Best Python code snippet using gabbi_python
project.py
Source:project.py
...32 reader = csv.DictReader(f, delimiter=delimiter)33 for r in reader:34 flist[r[key]] = r35 return flist36def _url_replacer(url, params, query_mode=False):37 """Replaces urp params"""38 if query_mode:39 query_char = '?'40 splitter = '&'41 else:42 splitter = PARAM_SPLITTER43 query_char = PARAM_SPLITTER44 parsed = urlparse(url)45 finalparams = []46 for k, v in params.items():47 finalparams.append('%s=%s' % (str(k), str(v)))48 return parsed.geturl() + query_char + splitter.join(finalparams)49def load_json_file(filename, default={}):50 """Loads JSON file and return it as dict"""51 if os.path.exists(filename):52 f = open(filename, 'r', encoding='utf8')53 data = json.load(f)54 f.close()55 else:56 data = default57 return data58class ProjectBuilder:59 """Project builder"""60 def __init__(self, project_path=None):61 self.http = requests.Session()62 self.project_path = os.getcwd() if project_path is None else project_path63 self.config_filename = os.path.join(self.project_path, 'apibackuper.cfg')64 self.__read_config(self.config_filename)65 self.enable_logging()66 def enable_logging(self):67 """Enable logging to file and StdErr"""68 logFormatter = logging.Formatter("%(asctime)s [%(threadName)-12.12s] [%(levelname)-5.5s] %(message)s")69 rootLogger = logging.getLogger()70 fileHandler = logging.FileHandler("{0}".format(self.logfile))71 fileHandler.setFormatter(logFormatter)72 rootLogger.addHandler(fileHandler)73 consoleHandler = logging.StreamHandler()74 consoleHandler.setFormatter(logFormatter)75 rootLogger.addHandler(consoleHandler)76 def __read_config(self, filename):77 self.config = None78 if os.path.exists(self.config_filename):79 conf = configparser.ConfigParser()80 conf.read(filename, encoding='utf8')81 self.config = conf82 storagedir = conf.get('storage', 'storage_path') if conf.has_option('storage',83 'storage_path') else 'storage'84 self.storagedir = os.path.join(self.project_path, storagedir)85 self.field_splitter = conf.get('settings', 'splitter') if 
conf.has_option('settings',86 'splitter') else FIELD_SPLITTER87 self.id = conf.get('settings', 'id') if conf.has_option('settings', 'id') else None88 self.name = conf.get('settings', 'name')89 self.logfile = conf.get('settings', 'logfile') if conf.has_option('settings', 'logfile') else "apibackuper.log"90 self.data_key = conf.get('data', 'data_key')91 self.storage_type = conf.get('storage', 'storage_type')92 self.http_mode = conf.get('project', 'http_mode')93 self.description = conf.get('project', 'description') if conf.has_option('project', 'description') else None94 self.start_url = conf.get('project', 'url')95 self.page_limit = conf.getint('params', 'page_size_limit')96 self.resp_type = conf.get('project', 'resp_type') if conf.has_option('project', 'resp_type') else 'json'97 self.iterate_by = conf.get('project', 'iterate_by') if conf.has_option('project', 'iterate_by') else 'page'98 self.default_delay = conf.getint('project', 'default_delay') if conf.has_option('project', 'default_delay') else DEFAULT_DELAY99 self.retry_delay = conf.getint('project', 'retry_delay') if conf.has_option('project',100 'retry_delay') else RETRY_DELAY101 self.force_retry = conf.getboolean('project', 'force_retry') if conf.has_option('project', 'force_retry') else False102 self.retry_count = conf.getint('project', 'retry_count') if conf.has_option('project',103 'retry_count') else DEFAULT_RETRY_COUNT104 self.start_page = conf.getint('params', 'start_page') if conf.has_option('params', 'start_page') else 1105 self.query_mode = conf.get('params', 'query_mode') if conf.has_option('params', 'query_mode') else "query"106 self.flat_params = conf.getboolean('params', 'force_flat_params') if conf.has_option('params', 'force_flat_params') else False107 self.total_number_key = conf.get('data', 'total_number_key') if conf.has_option('data', 'total_number_key') else ''108 self.pages_number_key = conf.get('data', 'pages_number_key') if conf.has_option('data', 'pages_number_key') else ''109 
self.page_number_param = conf.get('params', 'page_number_param') if conf.has_option('params', 'page_number_param') else None110 self.count_skip_param = conf.get('params', 'count_skip_param') if conf.has_option('params', 'count_skip_param') else None111 self.count_from_param = conf.get('params', 'count_from_param') if conf.has_option('params', 'count_from_param') else None112 self.count_to_param = conf.get('params', 'count_to_param') if conf.has_option('params', 'count_to_param') else None113 self.page_size_param = conf.get('params', 'page_size_param') if conf.has_option('params', 'page_size_param') else None114 self.storage_file = os.path.join(self.storagedir, 'storage.zip')115 self.details_storage_file = os.path.join(self.storagedir, 'details.zip')116 if conf.has_section('follow'):117 self.follow_data_key = conf.get('follow', 'follow_data_key') if conf.has_option('follow',118 'follow_data_key') else None119 self.follow_item_key = conf.get('follow', 'follow_item_key') if conf.has_option('follow',120 'follow_item_key') else None121 self.follow_mode = conf.get('follow', 'follow_mode') if conf.has_option('follow',122 'follow_mode') else None123 self.follow_http_mode = conf.get('follow', 'follow_http_mode') if conf.has_option('follow',124 'follow_http_mode') else 'GET'125 self.follow_param = conf.get('follow', 'follow_param') if conf.has_option('follow',126 'follow_param') else None127 self.follow_pattern = conf.get('follow', 'follow_pattern') if conf.has_option('follow',128 'follow_pattern') else None129 self.follow_url_key = conf.get('follow', 'follow_url_key') if conf.has_option('follow',130 'follow_url_key') else None131 if conf.has_section('files'):132 self.fetch_mode = conf.get('files', 'fetch_mode')133 self.default_ext = conf.get('files', 'default_ext') if conf.has_option('files', 'default_ext') else None134 self.files_keys = conf.get('files', 'keys').split(',')135 self.root_url = conf.get('files', 'root_url')136 self.storage_mode = conf.get('files', 
'storage_mode') if conf.has_option('files', 'storage_mode') else 'filepath'137 self.file_storage_type = conf.get('files', 'file_storage_type') if conf.has_option('files', 'file_storage_type') else 'zip'138 self.use_aria2 = conf.get('files', 'use_aria2') if conf.has_option('files', 'use_aria2') else 'False'139 def _single_request(self, url, headers, params, flatten=None):140 """Single http/https request"""141 if self.http_mode == 'GET':142 if self.flat_params and len(params.keys()) > 0:143 s = []144 for k, v in flatten.items():145 s.append('%s=%s' % (k, v.replace("'", '"').replace('True', 'true')))146 logging.info('url: %s' % (url + '?' + '&'.join(s)))147 if headers:148 response = self.http.get(url + '?' + '&'.join(s), headers=headers, verify=False)149 else:150 response = self.http.get(url + '?' + '&'.join(s), verify=False)151 else:152 logging.info('url: %s, params: %s' % (url, str(params)))153 if headers:154 response = self.http.get(url, params=params, headers=headers, verify=False)155 else:156 response = self.http.get(url, params=params, verify=False)157 else:158 logging.debug('Request %s, params %s, headers %s' % (url, str(params), str(headers)))159 if headers:160 response = self.http.post(url, json=params, headers=headers, verify=False)161 else:162 response = self.http.post(url, json=params, verify=False)163 return response164 @staticmethod165 def create(name):166 """Create new project"""167 if not os.path.exists(name):168 os.mkdir(name)169 config_filename = 'apibackuper.cfg'170 config_path = os.path.join(name, config_filename)171 if os.path.exists(config_path):172 print('Project already exists')173 else:174 config = configparser.ConfigParser()175 config['settings'] = {'initialized': False, 'name': name}176 f = open(config_path, 'w', encoding='utf8')177 config.write(f)178 f.close()179 print('Projects %s created' % (name))180 def init(self, url, pagekey, pagesize, datakey, itemkey, changekey, iterateby, http_mode, work_modes):181 """[TBD] Unfinished method. 
Don't use it please"""182 conf = self.__read_config(self.config_filename)183 if conf is None:184 print('Config file not found. Please run in project directory')185 return186 pass187 def export(self, format, filename):188 """Exports data as JSON lines, BSON and other formats"""189 if self.config is None:190 print('Config file not found. Please run in project directory')191 return192 if format == 'jsonl':193 outfile = open(filename, 'w', encoding='utf8')194 elif format == 'gzip':195 outfile = gzip.open(filename, mode='wt', encoding='utf8')196 pass197 else:198 print("Only 'jsonl' format supported for now.")199 return200 details_file = os.path.join(self.storagedir, 'details.zip')201 if self.config.has_section('follow') and os.path.exists(details_file):202 mzip = ZipFile(details_file, mode='r', compression=ZIP_DEFLATED)203 for fname in mzip.namelist():204 tf = mzip.open(fname, 'r')205 logging.info('Loading %s' % (fname))206 data = json.load(tf)207 tf.close()208 try:209 if self.follow_data_key:210 follow_data = get_dict_value(data, self.follow_data_key, splitter=self.field_splitter)211 if isinstance(follow_data, dict):212 outfile.write(json.dumps(follow_data, ensure_ascii=False) + '\n')213 else:214 for item in follow_data:215 outfile.write(json.dumps(item, ensure_ascii=False) + '\n')216 else:217 outfile.write(json.dumps(data, ensure_ascii=False) + '\n')218 except KeyError:219 logging.info('Data key: %s not found' % (self.data_key))220 else:221 storage_file = os.path.join(self.storagedir, 'storage.zip')222 if not os.path.exists(storage_file):223 print('Storage file not found %s' % (storage_file))224 return225 mzip = ZipFile(storage_file, mode='r', compression=ZIP_DEFLATED)226 for fname in mzip.namelist():227 tf = mzip.open(fname, 'r')228 try:229 data = json.load(tf)230 except:231 continue232 finally:233 tf.close()234 try:235 if self.data_key:236 for item in get_dict_value(data, self.data_key, splitter=self.field_splitter):237 outfile.write(json.dumps(item, 
ensure_ascii=False) + '\n')238 else:239 for item in data:240 outfile.write(json.dumps(item, ensure_ascii=False) + '\n')241 except KeyError:242 logging.info('Data key: %s not found' % (self.data_key))243 outfile.close()244 logging.info('Data exported to %s' % (filename))245 def run(self, mode):246 """Run data collection"""247 if self.config is None:248 print('Config file not found. Please run in project directory')249 return250 if not os.path.exists(self.storagedir):251 os.mkdir(self.storagedir)252 if self.storage_type != 'zip':253 print('Only zip storage supported right now')254 return255 storage_file = os.path.join(self.storagedir, 'storage.zip')256 if mode == 'full':257 mzip = ZipFile(storage_file, mode='w', compression=ZIP_DEFLATED)258 else:259 mzip = ZipFile(storage_file, mode='a', compression=ZIP_DEFLATED)260 start = timer()261 headers= load_json_file(os.path.join(self.project_path, 'headers.json'), default={})262 params = load_json_file(os.path.join(self.project_path, 'params.json'), default={})263 if self.flat_params:264 flatten = {}265 for k, v in params.items():266 flatten[k] = str(v)267 else:268 flatten = None269 url_params = load_json_file(os.path.join(self.project_path, 'url_params.json'), default=None)270 if self.query_mode == 'params':271 url = _url_replacer(self.start_url, url_params)272 elif self.query_mode == 'mixed':273 url = _url_replacer(self.start_url, url_params, query_mode=True)274 else:275 url = self.start_url276 response = self._single_request(url, headers, params, flatten)277 if self.resp_type == 'json':278 start_page_data = response.json()279 else:280 start_page_data = xmltodict.parse(response.content)281# print(json.dumps(start_page_data, ensure_ascii=False))282 end = timer()283 if len(self.total_number_key) > 0:284 total = get_dict_value(start_page_data, self.total_number_key, splitter=self.field_splitter)285 nr = 1 if total % self.page_limit > 0 else 0286 num_pages = (total / self.page_limit) + nr287 elif len(self.pages_number_key) > 
0:288 num_pages = int(get_dict_value(start_page_data, self.pages_number_key, splitter=self.field_splitter))289 total = num_pages * self.page_limit290 else:291 num_pages = None292 total = None293 logging.info('Total pages %d, records %d' % (num_pages, total))294 num_pages = int(num_pages)295 change_params = {}296 # By default it's "full" mode with start_page and end_page as full list of pages297 start_page = self.start_page298 end_page = num_pages + self.start_page299 # if "continue" mode is set, shift start_page to last page saved. Force rewriting last page and continue300 if mode == 'continue':301 logging.debug('Continue mode enabled, looking for last saved page')302 pagenames = mzip.namelist()303 for page in range(self.start_page, num_pages):304 if 'page_%d.json' % (page) not in pagenames:305 if page > self.start_page:306 start_page = page307 else:308 start_page = page309 break310 logging.debug('Start page number %d' % (start_page))311 for page in range(start_page, end_page):312 if self.page_size_param and len(self.page_size_param) > 0:313 change_params[self.page_size_param] = self.page_limit314 if self.iterate_by == 'page':315 change_params[self.page_number_param] = page316 elif self.iterate_by == 'skip':317 change_params[self.count_skip_param] = (page-1) * self.page_limit318 elif self.iterate_by == 'range':319 change_params[self.count_from_param] = (page-1) * self.page_limit320 change_params[self.count_to_param] = page * self.page_limit321 url = self.start_url if self.query_mode != 'params' else _url_replacer(self.start_url, url_params)322 if self.query_mode in ['params', 'mixed']:323 url_params.update(change_params)324 else:325# print(params, change_params)326 params = update_dict_values(params, change_params)327 if self.flat_params and len(params.keys()) > 0:328 for k, v in params.items():329 flatten[k] = str(v)330 if self.query_mode == 'params':331 url = _url_replacer(self.start_url, url_params)332 elif self.query_mode == 'mixed':333 url = 
_url_replacer(self.start_url, url_params, query_mode=True)334 else:335 url = self.start_url336 response = self._single_request(url, headers, params, flatten)337 time.sleep(self.default_delay)338 if response.status_code in DEFAULT_ERROR_STATUS_CODES:339 rc = 0340 for rc in range(1, self.retry_count, 1):341 logging.info('Retry attempt %d of %d, delay %d' % (rc, self.retry_count, self.retry_delay))342 time.sleep(self.retry_delay)343 response = self._single_request(url, headers, params, flatten)344 if response.status_code not in DEFAULT_ERROR_STATUS_CODES:345 logging.info('Looks like finally we have proper response on %d attempt' %(rc))346 break347 if response.status_code not in DEFAULT_ERROR_STATUS_CODES:348 if num_pages is not None:349 logging.info('Saving page %d of %d' % (page, num_pages))350 else:351 logging.info('Saving page %d' % (page))352 if self.resp_type == 'json':353 outdata = response.content354 elif self.resp_type == 'xml':355 outdata = json.dump(xmltodict.parse(response.content))356 mzip.writestr('page_%d.json' % (page), outdata)357 else:358 logging.info('Errors persist on page %d. Stopped' % (page))359 break360 mzip.close()361 # pass362 def follow(self, mode='item'):363 """Collects data about each data using additional requests"""364 if self.config is None:365 print('Config file not found. 
Please run in project directory')366 return367 if not os.path.exists(self.storagedir):368 os.mkdir(self.storagedir)369 if self.storage_type != 'zip':370 print('Only zip storage supported right now')371 return372 if not os.path.exists(self.storage_file):373 print('Storage file not found')374 return375 params = None376 params_file = os.path.join(self.project_path, 'follow_params.json')377 if os.path.exists(params_file):378 f = open(params_file, 'r', encoding='utf8')379 params = json.load(f)380 f.close()381 else:382 params = {}383 if self.flat_params:384 flatten = {}385 for k, v in params.items():386 flatten[k] = str(v)387 headers_file = os.path.join(self.project_path, 'headers.json')388 if os.path.exists(headers_file):389 f = open(headers_file, 'r', encoding='utf8')390 headers = json.load(f)391 f.close()392 else:393 headers = {}394 mzip = ZipFile(self.storage_file, mode='r', compression=ZIP_DEFLATED)395 if self.follow_mode == 'item':396 allkeys = []397 logging.info('Extract unique key values from downloaded data')398 for fname in mzip.namelist():399 tf = mzip.open(fname, 'r')400 data = json.load(tf)401 tf.close()402 try:403 for item in get_dict_value(data, self.data_key, splitter=self.field_splitter):404 allkeys.append(item[self.follow_item_key])405# except KeyError:406 except KeyboardInterrupt:407 logging.info('Data key: %s not found' % (self.data_key))408 logging.info('%d allkeys to process' % (len(allkeys)))409 if mode == 'full':410 mzip = ZipFile(self.details_storage_file, mode='w', compression=ZIP_DEFLATED)411 finallist = allkeys412 elif mode == 'continue':413 mzip = ZipFile(self.details_storage_file, mode='a', compression=ZIP_DEFLATED)414 keys = []415 filenames = mzip.namelist()416 for name in filenames:417 keys.append(int(name.rsplit('.', 1)[0]))418 logging.info('%d filenames in zip file' % (len(keys)))419 finallist = list(set(allkeys) - set(keys))420 logging.info('%d keys in final list' % (len(finallist)))421 n = 0422 total = len(finallist)423 for key in 
finallist:424 n += 1425 change_params = {}426 change_params[self.follow_param] = key427 params = update_dict_values(params, change_params)428 if self.follow_http_mode == 'GET':429 if headers:430 response = self.http.get(self.follow_pattern, params=params, headers=headers)431 else:432 response = self.http.get(self.follow_pattern, params=params)433 else:434 if headers:435 response = self.http.post(self.follow_pattern, params=params, headers=headers)436 else:437 response = self.http.post(self.follow_pattern, params=params)438 logging.info('Saving object with id %s. %d of %d' % (key, n, total))439 mzip.writestr('%s.json' % (key), response.content)440 time.sleep(DEFAULT_DELAY)441 mzip.close()442 elif self.follow_mode == 'url':443 allkeys = {}444 logging.info('Extract urls to follow from downloaded data')445 for fname in mzip.namelist():446 tf = mzip.open(fname, 'r')447 data = json.load(tf)448 tf.close()449# logging.info(str(data))450 try:451 for item in get_dict_value(data, self.data_key, splitter=self.field_splitter):452 id = item[self.follow_item_key]453 allkeys[id] = get_dict_value(item, self.follow_url_key, splitter=self.field_splitter)454 except KeyError:455 logging.info('Data key: %s not found' % (self.data_key))456 if mode == 'full':457 mzip = ZipFile(self.details_storage_file, mode='w', compression=ZIP_DEFLATED)458 finallist = allkeys459 n = 0460 elif mode == 'continue':461 mzip = ZipFile(self.details_storage_file, mode='a', compression=ZIP_DEFLATED)462 keys = []463 filenames = mzip.namelist()464 for name in filenames:465 keys.append(int(name.rsplit('.', 1)[0]))466 finallist = list(set(allkeys.keys()) - set(keys))467 n = len(keys)468 total = len(allkeys.keys())469 for key in finallist:470 n += 1471 url = allkeys[key]472 if headers:473 response = self.http.get(url, params=params, headers=headers)474 else:475 response = self.http.get(url, params=params)476 # else:477 # if http_mode == 'GET':478 # response = self.http.post(start_url, json=params)479 
logging.info('Saving object with id %s. %d of %d' % (key, n, total))480 mzip.writestr('%s.json' % (key), response.content)481 time.sleep(DEFAULT_DELAY)482 mzip.close()483 elif self.follow_mode == 'drilldown':484 pass485 elif self.follow_mode == 'prefix':486 allkeys = []487 logging.info('Extract unique key values from downloaded data')488 for fname in mzip.namelist():489 tf = mzip.open(fname, 'r')490 data = json.load(tf)491 tf.close()492 try:493 for item in get_dict_value(data, self.data_key, splitter=self.field_splitter):494 allkeys.append(item[self.follow_item_key])495 except KeyboardInterrupt:496 logging.info('Data key: %s not found' % (self.data_key))497 if mode == 'full':498 mzip = ZipFile(self.details_storage_file, mode='w', compression=ZIP_DEFLATED)499 finallist = allkeys500 elif mode == 'continue':501 mzip = ZipFile(self.details_storage_file, mode='a', compression=ZIP_DEFLATED)502 keys = []503 filenames = mzip.namelist()504 for name in filenames:505 keys.append(name.rsplit('.', 1)[0])506 finallist = list(set(allkeys) - set(keys))507 n = 0508 total = len(finallist)509 for key in finallist:510 n += 1511 url = self.follow_pattern + str(key)512# print(url)513 response = self.http.get(url)514 logging.info('Saving object with id %s. %d of %d' % (key, n, total))515 mzip.writestr('%s.json' % (key), response.content)516 time.sleep(DEFAULT_DELAY)517 mzip.close()518 else:519 print('Follow section not configured. Please update config file')520 def getfiles(self, be_careful=False):521 """Downloads all files associated with this API data"""522 if self.config is None:523 print('Config file not found. 
Please run in project directory')524 return525 if not os.path.exists(self.storagedir):526 os.mkdir(self.storagedir)527 if self.storage_type != 'zip':528 print('Only zip storage supported right now')529 return530 storage_file = os.path.join(self.storagedir, 'storage.zip')531 if not os.path.exists(storage_file):532 print('Storage file not found')533 return534 uniq_ids = set()535 allfiles_name = os.path.join(self.storagedir, 'allfiles.csv')536 if not os.path.exists(allfiles_name):537 if not self.config.has_section('follow'):538 logging.info('Extract file urls from downloaded data')539 mzip = ZipFile(storage_file, mode='r', compression=ZIP_DEFLATED)540 n = 0541 for fname in mzip.namelist():542 n += 1543 if n % 10 == 0:544 logging.info('Processed %d files, uniq ids %d' % (n, len(uniq_ids)))545 tf = mzip.open(fname, 'r')546 data = json.load(tf)547 tf.close()548 try:549 if self.data_key:550 iterate_data = get_dict_value(data, self.data_key, splitter=self.field_splitter)551 else:552 iterate_data = data553 for item in iterate_data:554 if item:555 for key in self.files_keys:556 file_data = get_dict_value(item, key, as_array=True, splitter=self.field_splitter)557 if file_data:558 for uniq_id in file_data:559 if uniq_id is not None:560 if isinstance(uniq_id, list):561 uniq_ids.update(set(uniq_id))562 else:563 uniq_ids.add(uniq_id)564 except KeyError:565 logging.info('Data key: %s not found' % (str(self.data_key)))566 else:567 details_storage_file = os.path.join(self.storagedir, 'details.zip')568 mzip = ZipFile(details_storage_file, mode='r', compression=ZIP_DEFLATED)569 n = 0570 for fname in mzip.namelist():571 n += 1572 if n % 1000 == 0:573 logging.info('Processed %d records' % (n))574 tf = mzip.open(fname, 'r')575 data = json.load(tf)576 tf.close()577 items = []578 if self.follow_data_key:579 for item in get_dict_value(data, self.follow_data_key, splitter=self.field_splitter):580 items.append(item)581 else:582 items = [data, ]583 for item in items:584 for key in 
self.files_keys:585 urls = get_dict_value(item, key, as_array=True, splitter=self.field_splitter)586 if urls is not None:587 for uniq_id in urls:588 if uniq_id is not None and len(uniq_id.strip()) > 0:589 uniq_ids.append(uniq_id)590 mzip.close()591 logging.info('Storing all filenames')592 f = open(allfiles_name, 'w', encoding='utf8')593 for u in uniq_ids:594 f.write(str(u) + '\n')595 f.close()596 else:597 logging.info('Load all filenames')598 uniq_ids = load_file_list(allfiles_name)599 # Start download600 processed_files = []601 skipped_files_dict = {}602 files_storage_file = os.path.join(self.storagedir, 'files.zip')603 files_list_storage = os.path.join(self.storagedir, 'files.list')604 files_skipped = os.path.join(self.storagedir, 'files_skipped.list')605 if os.path.exists(files_list_storage):606 processed_files = load_file_list(files_list_storage, encoding='utf8')607 list_file = open(files_list_storage, 'a', encoding='utf8')608 else:609 list_file = open(files_list_storage, 'w', encoding='utf8')610 if os.path.exists(files_skipped):611 skipped_files_dict = load_csv_data(files_skipped, key='filename', encoding='utf8')612 skipped_file = open(files_skipped, 'a', encoding='utf8')613 skipped = csv.DictWriter(skipped_file, delimiter=';', fieldnames=['filename', 'filesize', 'reason'])614 else:615 skipped_files_dict = {}616 skipped_file = open(files_skipped, 'w', encoding='utf8')617 skipped = csv.DictWriter(skipped_file, delimiter=';', fieldnames=['filename', 'filesize', 'reason'])618 skipped.writeheader()619 use_aria2 = True if self.use_aria2 == 'True' else False620 if use_aria2:621 aria2 = aria2p.API(622 aria2p.Client(623 host="http://localhost",624 port=6800,625 secret=""626 )627 )628 else:629 aria2 = None630 if self.file_storage_type == 'zip':631 fstorage = ZipFileStorage(files_storage_file, mode='a', compression=ZIP_DEFLATED)632 elif self.file_storage_type == 'filesystem':633 fstorage = FilesystemStorage(os.path.join('storage', 'files'))634 n = 0635 for uniq_id in 
uniq_ids:636 if self.fetch_mode == 'prefix':637 url = self.root_url + str(uniq_id)638 elif self.fetch_mode == 'pattern':639 url = self.root_url.format(uniq_id)640 n += 1641 if n % 50 == 0:642 logging.info('Downloaded %d files' % (n))643# if url in processed_files:644# continue645 if be_careful:646 r = self.http.head(url, timeout=DEFAULT_TIMEOUT, verify=False)647 if 'content-disposition' in r.headers.keys() and self.storage_mode == 'filepath':648 filename = r.headers['content-disposition'].rsplit('filename=', 1)[-1].strip('"')649 elif self.default_ext is not None:650 filename = uniq_id + '.' + self.default_ext651 else:652 filename = uniq_id653# if not 'content-length' in r.headers.keys():654# logging.info('File %s skipped since content-length not found in headers' % (url))655# record = {'filename' : filename, 'filesize' : "0", 'reason' : 'Content-length not set in headers'}656# skipped_files_dict[uniq_id] = record657# skipped.writerow(record)658# continue659 if 'content-length' in r.headers.keys() and int(r.headers['content-length']) > FILE_SIZE_DOWNLOAD_LIMIT and self.file_storage_type == 'zip':660 logging.info('File skipped with size %d and name %s' % (int(r.headers['content-length']) , url))661 record = {'filename' : filename, 'filesize' : str(r.headers['content-length']), 'reason' : 'File too large. More than %d bytes' % (FILE_SIZE_DOWNLOAD_LIMIT)}662 skipped_files_dict[uniq_id] = record663 skipped.writerow(record)664 continue665 else:666 if self.default_ext is not None:667 filename = str(uniq_id) + '.' 
+ self.default_ext668 else:669 filename = str(uniq_id)670 if self.storage_mode == 'filepath':671 filename = urlparse(url).path672 logging.info('Processing %s as %s' % (url, filename))673 if fstorage.exists(filename):674 logging.info('File %s already stored' % (filename))675 continue676 if not use_aria2:677 response = self.http.get(url, timeout=DEFAULT_TIMEOUT, verify=False)678 fstorage.store(filename, response.content)679 list_file.write(url + '\n')680 else:681 aria2.add_uris(uris=[url,], options={'out': filename, 'dir' : os.path.abspath(os.path.join('storage', 'files'))})682 fstorage.close()683 list_file.close()684 skipped_file.close()685 def estimate(self, mode):686 """Measures time, size and count of records"""687 if self.config is None:688 print('Config file not found. Please run in project directory')689 return690 data = []691 params = {}692 data_size = 0693 headers = None694 headers_file = os.path.join(self.project_path, 'headers.json')695 if os.path.exists(headers_file):696 f = open(headers_file, 'r', encoding='utf8')697 headers = json.load(f)698 f.close()699 else:700 headers = {}701 params_file = os.path.join(self.project_path, 'params.json')702 if os.path.exists(params_file):703 f = open(params_file, 'r', encoding='utf8')704 params = json.load(f)705 f.close()706 if self.flat_params:707 flatten = {}708 for k, v in params.items():709 flatten[k] = str(v)710 params = flatten711 url_params = None712 params_file = os.path.join(self.project_path, 'url_params.json')713 if os.path.exists(params_file):714 f = open(params_file, 'r', encoding='utf8')715 url_params = json.load(f)716 f.close()717 if len(self.total_number_key) > 0:718 start = timer()719 if self.query_mode == 'params':720 url = _url_replacer(self.start_url, url_params)721 elif self.query_mode == 'mixed':722 url = _url_replacer(self.start_url, url_params, query_mode=True)723 else:724 url = self.start_url725 if self.http_mode == 'GET':726 if self.flat_params and len(params.keys()) > 0:727 s = []728 for k, v 
in params.items():729 s.append('%s=%s' % (k, v.replace("'", '"').replace('True', 'true')))730 if headers:731 start_page_data = self.http.get(url + '?' + '&'.join(s), headers=headers).json()732 else:733 start_page_data = self.http.get(url + '?' + '&'.join(s)).json()734 else:735 logging.debug('Start request params: %s headers: %s' %(str(params), str(headers)))736 if headers and len(headers.keys()) > 0:...
case.py
Source:case.py
...290 return re.sub(self._simple_replacer_regex('URL'),291 self._regex_replacer(self._url_replacer,292 escape_regex),293 message)294 def _url_replacer(self, match):295 """Replace a regex match with the value of a previous url."""296 case = match.group('case')297 if case:298 referred_case = self.history[case]299 else:300 referred_case = self.prior301 return referred_case.url302 def _location_replace(self, message, escape_regex=False):303 """Replace $LOCATION in a message.304 With the location header from a previous request.305 """306 return re.sub(self._simple_replacer_regex('LOCATION'),307 self._regex_replacer(self._location_replacer,308 escape_regex),...
Learn to execute automation testing from scratch with LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!