Best Python code snippet using avocado_python
ozon_spider.py
Source:ozon_spider.py
import logging
import scrapy
import json
import traceback
import re
from pprint import pprint
from wildsearch_crawler.tools import DeepDict, find_keys, find_value
from urllib.parse import quote
from .base_spider import BaseSpider
from wildsearch_crawler.settings import ERROR_TRACE_LEVEL
from wildsearch_crawler.db.ozon import CatalogModel, ItemModel, get_elements, Session, get_end_points_by_top_of_bush

logger = logging.getLogger('main')


class WildberriesSpider(BaseSpider):
    name = "oz"
    overwrite = False
    api_url = 'https://www.ozon.ru/api/composer-api.bx/page/json/v2?url='
    base_url = 'https://www.ozon.ru'
    custom_settings = {
        'AUTOTHROTTLE_ENABLED': True,
        'AUTOTHROTTLE_START_DELAY': 1.0,
        'AUTOTHROTTLE_TARGET_CONCURRENCY': 1.0,
        'ITEM_PIPELINES': {
            'wildsearch_crawler.ozon_pipelines.OzonGoodPipeline': 300,
        }
    }

    def convert_category_url_to_api(self, url):
        """Simple trick to get JSON with LOTS of data instead of HTML: convert the URL as follows.
        From /category/utyugi-10680/?layout_container=categoryMegapagination&layout_page_index=9&page=9
        To https://www.ozon.ru/api/composer-api.bx/page/json/v2?url=%2Fcategory%2Futyugi-10680%2F%3Flayout_container%3DcategoryMegapagination%26layout_page_index%3D8%26page%3D8
        """
        return f'{self.api_url}{quote(url)}'

    def convert_api_to_seo_url(self, url):
        return url.replace(self.api_url, self.base_url)

    def convert_seo_to_api_url(self, url):
        return url.replace(self.base_url, self.api_url)

    def add_pagination_params(self, url):
        """Trick to get the first result page with pagination and the 'nextPage' param."""
        return f'{url}?layout_container=categorySearchMegapagination&layout_page_index=1&page=1'

    def start_requests(self):
        try:
            item_id = getattr(self, 'item_id', None)
            self.limit = getattr(self, 'limit', None)
            item_objects = []
            if item_id:
                item_objects.extend(get_elements(item_id, ItemModel))
            item_cat_id = getattr(self, 'item_cat_id', None)
            if item_cat_id:
                item_objects.extend(get_elements(item_cat_id,
                                                 ItemModel, CatalogModel.id,
                                                 ItemModel.categories))
            item_art = getattr(self, 'item_art', None)
            if item_art:
                item_objects.extend(get_elements(item_art,
                                                 ItemModel, ItemModel.art))
            if item_objects:
                self.skip_variants = True
                for i, el in enumerate(item_objects):
                    if i == self.limit:
                        return
                    yield scrapy.Request(self.convert_seo_to_api_url(el.url),
                                         self.parse_good,
                                         cb_kwargs={'iter_variants': False,
                                                    'iter_options': False})
                return
            cat_id = getattr(self, 'cat_id', None)
            if cat_id:
                objects = get_elements(cat_id, CatalogModel)
                # if cat_id == 'endpoints':
                #     objects = Session().query(CatalogModel
                #         ).filter_by(end_point=True).all()
                # else:
                #     objects = get_elements(cat_id, CatalogModel)
                # objects = get_end_points_by_top_of_bush(cat_id)
                for i, el in enumerate(objects):
                    if i == self.limit:
                        return
                    yield scrapy.Request(self.convert_seo_to_api_url(el.url),
                                         self.parse_category,
                                         cb_kwargs={'category_url': el.url})
                return
        except Exception as e:
            logger.error(traceback.format_exc(ERROR_TRACE_LEVEL))
            raise

    def get_good_data(self, data_text):
        data = json.loads(data_text)
        widget_data = data.get('widgetStates')

        def get_variants(widget_data):
            target_keys = find_keys('webAspects', widget_data)
            target_keys = find_value('aspects', widget_data, target_keys)

            def get_option(var_list):
                if var_list:
                    for var in var_list:
                        if var.get('active'):
                            d_var = DeepDict(var)
                            l_textRs = d_var.get('data.textRs')
                            if l_textRs:
                                return next(l_textRs).get('content')

            def split_variants(aspects):
                variants_ind = 0
                options_ind = 1
                for i, el in enumerate(aspects):
                    if el.get('type') in ['apparelPics']:
                        variants_ind = i
                        options_ind = 1 if i == 0 else 0
                return aspects[variants_ind], aspects[options_ind]

            for key in target_keys:
                out = {}
                data_str = widget_data.get(key)
                data = json.loads(data_str)
                aspects = data.get('aspects')
                # textBar, sizes
                # apparelPics
                if len(aspects) == 2:
                    variants, options = split_variants(aspects)
                    out['variants'] = variants.get('variants')
                    out['options'] = options.get('variants')
                    out['variant'] = get_option(out['variants'])
                    out['option'] = get_option(out['options'])
                else:
                    pass
                return out

        def get_characteristics(widget_data):
            target_keys = find_keys('characteristics', widget_data)
            target_keys = find_value('characteristics', widget_data, target_keys)
            out = {}
            for key in target_keys:
                list_ = []
                data_str = widget_data.get(key)
                data = json.loads(data_str)
                char_list = data.get('characteristics', [])
                if len(char_list) == 1:
                    list_.extend(char_list[0].get('short'))
                elif len(char_list) > 1:
                    list_.extend(char_list[1].get('short'))
                for el in list_:
                    key = el.pop('key')
                    out[key] = el
            return out

        def get_review(widget_data):
            target_keys = find_keys('reviewProductScore', widget_data)
            for key in target_keys:
                data_str = widget_data.get(key)
                return json.loads(data_str)

        def get_main_info(widget_data):
            target_keys = find_keys('webProductMainWidget', widget_data)
            for key in target_keys:
                data_str = widget_data.get(key)
                data = json.loads(data_str)
                return data.get('cellTrackingInfo', {}).get('product')

        def get_images(widget_data):
            target_keys = find_keys('webGallery', widget_data)
            for key in target_keys:
                data_str = widget_data.get(key)
                return json.loads(data_str).get('images')

        return {
            'main': get_main_info(widget_data),
            'images': get_images(widget_data),
            'review': get_review(widget_data),
            'characteristics': get_characteristics(widget_data),
            'variants': get_variants(widget_data),
            'overwrite': self.overwrite
        }

    def parse_good(self, response, parent_item=None,
                   iter_options=True, iter_variants=True, category_url=None):
        try:
            data = self.get_good_data(response.text)
            data['category_url'] = category_url
            variants = data.get('variants')
            if not parent_item:
                if data:
                    parent_item = data.get('main', {}).get('id')
            else:
                data['parent_item'] = parent_item
            if variants:
                if iter_variants and variants.get('variants'):
                    for el in variants.get('variants'):
                        if not el.get('active'):
                            # logger.debug(f'>>>>> try variant {el.get("link")}')
                            url = self.convert_category_url_to_api(el.get('link'))
                            yield scrapy.Request(url, self.parse_good,
                                                 cb_kwargs={'iter_variants': False,
                                                            'parent_item': parent_item})
                if iter_options and variants.get('options'):
                    for el in variants.get('options'):
                        if not el.get('active'):
                            # logger.debug(f'>>>>> try option {el.get("link")}')
                            url = self.convert_category_url_to_api(el.get('link'))
                            yield scrapy.Request(url, self.parse_good,
                                                 cb_kwargs={'iter_variants': False,
                                                            'iter_options': False,
                                                            'parent_item': parent_item})
            yield data
        except Exception as e:
            logger.error(traceback.format_exc(ERROR_TRACE_LEVEL))

    def parse_category(self, response, category_url):
        try:
            def find_goods_items(data):
                """We search for the following patterns in JSON keys:
                searchResultsV2-226897-default-1
                searchResultsV2-193750-categorySearchMegapagination-2
                """
                for idx, val in data['widgetStates'].items():
                    if 'searchResultsV2' in idx:
                        return val

            def get_next_page(data):
                url = data.get('pageInfo', {}).get('url')
                if url:
                    url = url.split('?')[0]
                shared_str = data.get('shared')
                shared = json.loads(shared_str)
                current_page = shared.get('catalog', {}).get('currentPage', 0)
                total_page = shared.get('catalog', {}).get('totalPages', 0)
                # current_page = current_page if current_page > 0 else 1
                next_page = current_page + 1
                if next_page <= total_page:
                    return f'{url}?page={next_page}'

            category_position = int(response.meta['current_position']) if 'current_position' in response.meta else 1
            category_data = json.loads(response.text)
            items_raw = find_goods_items(category_data)
            items = json.loads(items_raw)
            items_count = len(items['items'])
            current_position = category_position
            for i, item in enumerate(items['items']):
                # logger.debug(f'>>>>> {i} {item["link"]}')
                url = self.convert_category_url_to_api(item['link'])
                yield scrapy.Request(url, self.parse_good, cb_kwargs={'category_url': category_url})
                current_position += 1
            # follow pagination
            next_url = get_next_page(category_data)
            # print('\n\n\n', 'next_url', next_url, '\n\n\n')
            if next_url:
                next_url = self.convert_category_url_to_api(next_url)
                logger.debug(f'>>> try nextPage {next_url}')
                yield scrapy.Request(
                    next_url,
                    self.parse_category, cb_kwargs={'category_url': category_url},
                    meta={
                        # 'category_url': category_url,
                        # Key must match the 'current_position' read at the top of parse_category.
                        'current_position': category_position + items_count
                    }
                )
        except Exception as e:
            logger.error(traceback.format_exc(ERROR_TRACE_LEVEL))
...
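The whole spider hinges on the composer-api trick documented in convert_category_url_to_api: fetching the same path through the JSON endpoint returns the widgetStates blob instead of HTML. Below is a minimal standalone sketch of that conversion, runnable outside Scrapy. Note it uses quote(..., safe='') so the output matches the fully-escaped form shown in the docstring example; the spider's own call relies on quote's default, which leaves slashes unescaped.

from urllib.parse import quote

API_URL = 'https://www.ozon.ru/api/composer-api.bx/page/json/v2?url='

def category_url_to_api(url):
    # Percent-encode the SEO path and append it to the JSON endpoint.
    return f'{API_URL}{quote(url, safe="")}'

print(category_url_to_api('/category/utyugi-10680/?page=2'))
# https://www.ozon.ru/api/composer-api.bx/page/json/v2?url=%2Fcategory%2Futyugi-10680%2F%3Fpage%3D2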
import.py
Source:import.py
...
from ..services.notifier import UpdateNotifier

logging.basicConfig(format="%(levelname)s (%(name)s %(lineno)s): %(message)s")
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)


def iter_variants(filename):
    with gzip.open(filename, 'rt') as ifp:
        for row in DictReader(ifp, dialect='excel-tab'):
            yield row


def did_variant_category_change(old_doc, new_doc):
    old_category = get_variant_category(old_doc)
    new_category = get_variant_category(new_doc)
    return old_category != new_category


def iter_variant_updates(db, variants):
    for variant in variants:
        new_doc = build_variant_doc(DEFAULT_GENOME_BUILD, **variant)
        doc_id = new_doc['_id']
        old_doc = db.variants.find_one({'_id': doc_id})
        if did_variant_category_change(old_doc, new_doc):
            yield (old_doc, new_doc)


def main(clinvar_filename):
    db = connect_db()
    notifier = UpdateNotifier(db, app.config)
    started_at = datetime.utcnow()
    task_list = []
    variant_iterator = iter_variants(clinvar_filename)
    for i, (old_doc, new_doc) in enumerate(iter_variant_updates(db, variant_iterator)):
        if i % 10000 == 0:
            logger.debug('Processed {} variants'.format(i))
        if old_doc:
            # Variant is already known, either:
            # - someone subscribed before it was added to clinvar, or
            # - it was already in clinvar, and we might have new annotations
            task = update_variant_task(db, old_doc, new_doc)
        else:
            # Add clinvar annotations with empty subscriber data
            task = create_variant_task(db, new_doc)
        task_list.append(task)
    results = run_variant_tasks(db, task_list, notifier=notifier)
    logger.debug('Variants updated. Results: {}'.format(results))
...
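iter_variants above defines the script's entire input contract: a gzipped, tab-separated file whose header row supplies the DictReader keys. Here is a self-contained sketch of that contract with a throwaway fixture; the column names are made-up placeholders, since the real ClinVar dump's schema is whatever build_variant_doc expects.

import gzip
from csv import DictReader

def iter_variants(filename):
    with gzip.open(filename, 'rt') as ifp:
        for row in DictReader(ifp, dialect='excel-tab'):
            yield row

# Write a two-line fixture, then stream it back as dicts.
with gzip.open('/tmp/clinvar_sample.tsv.gz', 'wt') as ofp:
    ofp.write('chrom\tpos\tref\talt\n')
    ofp.write('1\t12345\tA\tG\n')

for row in iter_variants('/tmp/clinvar_sample.tsv.gz'):
    print(row)  # {'chrom': '1', 'pos': '12345', 'ref': 'A', 'alt': 'G'}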
test_certbot_dns_glesys.py
Source:test_certbot_dns_glesys.py
...
def test_domain_parts_init(full_domain):
    d = DomainParts(full_domain)
    assert d.domain == full_domain
    assert d.subdomain is None


def test_domain_parts_iter_variants():
    d = DomainParts("*.runfalk.se")
    expected_variants = {
        d,
        DomainParts("runfalk.se", "*"),
        DomainParts("se", "*.runfalk"),
    }
    assert set(d.iter_variants()) == expected_variants


def test_domain_parts_iter_variants_complex():
    d = DomainParts("acme-v02.api.letsencrypt.org")
    expected_variants = {
        d,
        DomainParts("api.letsencrypt.org", "acme-v02"),
        DomainParts("letsencrypt.org", "acme-v02.api"),
        DomainParts("org", "acme-v02.api.letsencrypt"),
    }
    assert set(d.iter_variants()) == expected_variants


def test_perform_cleanup_cycle():
    domain = "*.runfalk.se"  # Unused
    validation_domain = "_acme-challenge.runfalk.se"
    validation_key = "thisgoesinthetetxtrecord"
    glesys_mock = MagicMock()

    def split_domain(d):
        assert d == validation_domain
        return DomainParts("runfalk.se", "_acme-challenge")

    glesys_mock.split_domain.side_effect = split_domain
    auth = GlesysTestAuthenticator(glesys_mock)
    auth._perform(domain, validation_domain, validation_key)
    glesys_mock.add_record.assert_called_with(
        domain="runfalk.se",
        subdomain="_acme-challenge",
...
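The two iter_variants tests pin the expected behaviour down precisely: yield the unsplit name first, then every split that shifts leading labels into the subdomain while at least one label remains in the domain. Below is a sketch of a DomainParts consistent with those assertions; the real class ships with certbot-dns-glesys and may differ in detail, and namedtuple is used here only to get the value equality and hashability that the set() comparisons rely on.

from collections import namedtuple

class DomainParts(namedtuple("DomainParts", ["domain", "subdomain"])):
    def __new__(cls, domain, subdomain=None):
        # Default the subdomain to None so DomainParts("runfalk.se") works.
        return super().__new__(cls, domain, subdomain)

    def iter_variants(self):
        # The unsplit name itself is always the first candidate.
        yield self
        labels = self.domain.split(".")
        prefix = self.subdomain.split(".") if self.subdomain else []
        # Shift one leading label at a time from the domain into the subdomain.
        for i in range(1, len(labels)):
            yield DomainParts(".".join(labels[i:]),
                              ".".join(prefix + labels[:i]))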