Best Python code snippet using autotest_python
topic_common.py
Source:topic_common.py
...114 'invalid': lambda flag: str(bool(not flag)),115 'only_if_needed': _int_2_bool_string,116 'platform': __convert_platform,117 'labels': lambda labels: ', '.join(labels)}118def _get_item_key(item, key):119 """Allow for lookups in nested dictionaries using '.'s within a key."""120 if key in item:121 return item[key]122 nested_item = item123 for subkey in key.split('.'):124 if not subkey:125 raise ValueError('empty subkey in %r' % key)126 try:127 nested_item = nested_item[subkey]128 except KeyError, e:129 raise KeyError('%r - looking up key %r in %r' %130 (e, key, nested_item))131 else:132 return nested_item133class CliError(Exception):134 pass135class item_parse_info(object):136 def __init__(self, attribute_name, inline_option='',137 filename_option='', use_leftover=False):138 """Object keeping track of the parsing options that will139 make up the content of the atest attribute:140 atttribute_name: the atest attribute name to populate (label)141 inline_option: the option containing the items (--label)142 filename_option: the option containing the filename (--blist)143 use_leftover: whether to add the leftover arguments or not."""144 self.attribute_name = attribute_name145 self.filename_option = filename_option146 self.inline_option = inline_option147 self.use_leftover = use_leftover148 def get_values(self, options, leftover=[]):149 """Returns the value for that attribute by accumualting all150 the values found through the inline option, the parsing of the151 file and the leftover"""152 def __get_items(input, split_spaces=True):153 """Splits a string of comma separated items. Escaped commas will not154 be split. I.e. Splitting 'a, b\,c, d' will yield ['a', 'b,c', 'd'].155 If split_spaces is set to False spaces will not be split. 
I.e.156 Splitting 'a b, c\,d, e' will yield ['a b', 'c,d', 'e']"""157 # Replace escaped slashes with null characters so we don't misparse158 # proceeding commas.159 input = input.replace(r'\\', '\0')160 # Split on commas which are not preceded by a slash.161 if not split_spaces:162 split = re.split(r'(?<!\\),', input)163 else:164 split = re.split(r'(?<!\\),|\s', input)165 # Convert null characters to single slashes and escaped commas to166 # just plain commas.167 return (item.strip().replace('\0', '\\').replace(r'\,', ',') for168 item in split if item.strip())169 if self.use_leftover:170 add_on = leftover171 leftover = []172 else:173 add_on = []174 # Start with the add_on175 result = set()176 for items in add_on:177 # Don't split on space here because the add-on178 # may have some spaces (like the job name)179 result.update(__get_items(items, split_spaces=False))180 # Process the inline_option, if any181 try:182 items = getattr(options, self.inline_option)183 result.update(__get_items(items))184 except (AttributeError, TypeError):185 pass186 # Process the file list, if any and not empty187 # The file can contain space and/or comma separated items188 try:189 flist = getattr(options, self.filename_option)190 file_content = []191 for line in open(flist).readlines():192 file_content += __get_items(line)193 if len(file_content) == 0:194 raise CliError("Empty file %s" % flist)195 result.update(file_content)196 except (AttributeError, TypeError):197 pass198 except IOError:199 raise CliError("Could not open file %s" % flist)200 return list(result), leftover201class atest(object):202 """Common class for generic processing203 Should only be instantiated by itself for usage204 references, otherwise, the <topic> objects should205 be used."""206 msg_topic = "[acl|host|job|label|atomicgroup|test|user]"207 usage_action = "[action]"208 msg_items = ''209 def invalid_arg(self, header, follow_up=''):210 twrap = textwrap.TextWrapper(initial_indent=' ',211 subsequent_indent=' ')212 
rest = twrap.fill(follow_up)213 if self.kill_on_failure:214 self.invalid_syntax(header + rest)215 else:216 # don't print the message here.217 # It will be displayed later by "show_all_failures"218 # header ends with ":\n"219 # so, strip it off220 msg = header.rstrip().rstrip(":")221 self.failure(rest, item="", what_failed=msg)222 def invalid_syntax(self, msg):223 print224 print >> sys.stderr, msg225 print226 print "usage:",227 print self._get_usage()228 print229 sys.exit(1)230 def generic_error(self, msg):231 if self.debug:232 traceback.print_exc()233 print >> sys.stderr, msg234 sys.exit(1)235 def parse_json_exception(self, full_error):236 """Parses the JSON exception to extract the bad237 items and returns them238 This is very kludgy for the moment, but we would need239 to refactor the exceptions sent from the front end240 to make this better"""241 errmsg = str(full_error).split('Traceback')[0].rstrip('\n')242 parts = errmsg.split(':')243 # Kludge: If there are 2 colons the last parts contains244 # the items that failed.245 if len(parts) != 3:246 return []247 return [item.strip() for item in parts[2].split(',') if item.strip()]248 def failure(self, full_error, item=None, what_failed='', fatal=False):249 """If kill_on_failure, print this error and die,250 otherwise, queue the error and accumulate all the items251 that triggered the same error."""252 if self.debug:253 errmsg = str(full_error)254 else:255 errmsg = str(full_error).split('Traceback')[0].rstrip('\n')256 if self.kill_on_failure or fatal:257 print >> sys.stderr, "%s\n %s" % (what_failed, errmsg)258 sys.exit(1)259 # Build a dictionary with the 'what_failed' as keys. 
The260 # values are dictionaries with the errmsg as keys and a set261 # of items as values.262 # self.failed =263 # {'Operation delete_host_failed': {'AclAccessViolation:264 # set('host0', 'host1')}}265 # Try to gather all the same error messages together,266 # even if they contain the 'item'267 if item and item in errmsg:268 errmsg = errmsg.replace(item, FAIL_TAG)269 if self.failed.has_key(what_failed):270 self.failed[what_failed].setdefault(errmsg, set()).add(item)271 else:272 self.failed[what_failed] = {errmsg: set([item])}273 def show_all_failures(self):274 if not self.failed:275 return 0276 for what_failed in self.failed.keys():277 print >> sys.stderr, what_failed + ':'278 for (errmsg, items) in self.failed[what_failed].iteritems():279 if len(items) == 0:280 print >> sys.stderr, errmsg281 elif items == set(['']):282 print >> sys.stderr, ' ' + errmsg283 elif len(items) == 1:284 # Restore the only item285 if FAIL_TAG in errmsg:286 errmsg = errmsg.replace(FAIL_TAG, items.pop())287 else:288 errmsg = '%s (%s)' % (errmsg, items.pop())289 print >> sys.stderr, ' ' + errmsg290 else:291 print >> sys.stderr, ' ' + errmsg + ' with <XYZ> in:'292 twrap = textwrap.TextWrapper(initial_indent=' ',293 subsequent_indent=' ')294 items = list(items)295 items.sort()296 print >> sys.stderr, twrap.fill(', '.join(items))297 return 1298 def __init__(self):299 """Setup the parser common options"""300 # Initialized for unit tests.301 self.afe = None302 self.failed = {}303 self.data = {}304 self.debug = False305 self.parse_delim = '|'306 self.kill_on_failure = False307 self.web_server = ''308 self.verbose = False309 self.username = ''310 self.topic_parse_info = item_parse_info(attribute_name='not_used')311 self.parser = optparse.OptionParser(self._get_usage())312 self.parser.add_option('-g', '--debug',313 help='Print debugging information',314 action='store_true', default=False)315 self.parser.add_option('--kill-on-failure',316 help='Stop at the first failure',317 action='store_true', 
default=False)318 self.parser.add_option('--parse',319 help='Print the output using | '320 'separated key=value fields',321 action='store_true', default=False)322 self.parser.add_option('--parse-delim',323 help='Delimiter to use to separate the '324 'key=value fields', default='|')325 self.parser.add_option('-v', '--verbose',326 action='store_true', default=False)327 self.parser.add_option('-w', '--web',328 help='Specify the autotest server '329 'to talk to',330 action='store', type='string',331 dest='web_server', default=None)332 self.parser.add_option('-Q', '--username',333 help='Specify the username to'334 'login with',335 action='store', type='string',336 dest='username', default=None)337 def _get_usage(self):338 return ("%s %s %s [options] %s" % (os.path.basename(sys.argv[0]),339 self.msg_topic.lower(),340 self.usage_action,341 self.msg_items))342 def backward_compatibility(self, action, argv):343 """To be overidden by subclass if their syntax changed"""344 return action345 def parse(self, parse_info=[], req_items=None):346 """parse_info is a list of item_parse_info objects347 There should only be one use_leftover set to True in the list.348 Also check that the req_items is not empty after parsing."""349 (options, leftover) = self.parse_global()350 all_parse_info = parse_info[:]351 all_parse_info.append(self.topic_parse_info)352 try:353 for item_parse_info in all_parse_info:354 values, leftover = item_parse_info.get_values(options,355 leftover)356 setattr(self, item_parse_info.attribute_name, values)357 except CliError, s:358 self.invalid_syntax(s)359 if (req_items and not getattr(self, req_items, None)):360 self.invalid_syntax('%s %s requires at least one %s' %361 (self.msg_topic,362 self.usage_action,363 self.msg_topic))364 return (options, leftover)365 def parse_global(self):366 """Parse the global arguments.367 It consumes what the common object needs to know, and368 let the children look at all the options. 
We could369 remove the options that we have used, but there is no370 harm in leaving them, and the children may need them371 in the future.372 Must be called from its children parse()"""373 (options, leftover) = self.parser.parse_args()374 # Handle our own options setup in __init__()375 self.debug = options.debug376 self.kill_on_failure = options.kill_on_failure377 if options.parse:378 suffix = '_parse'379 else:380 suffix = '_std'381 for func in ['print_fields', 'print_table',382 'print_by_ids', 'print_list']:383 setattr(self, func, getattr(self, func + suffix))384 self.parse_delim = options.parse_delim385 self.verbose = options.verbose386 self.web_server = options.web_server387 self.username = options.username388 try:389 self.afe = rpc.afe_comm(self.web_server, username=self.username)390 except rpc.AuthError, s:391 self.failure(str(s), fatal=True)392 return (options, leftover)393 def check_and_create_items(self, op_get, op_create,394 items, **data_create):395 """Create the items if they don't exist already"""396 for item in items:397 ret = self.execute_rpc(op_get, name=item)398 if len(ret) == 0:399 try:400 data_create['name'] = item401 self.execute_rpc(op_create, **data_create)402 except CliError:403 continue404 def execute_rpc(self, op, item='', **data):405 retry = 2406 while retry:407 try:408 return self.afe.run(op, **data)409 except urllib2.URLError, err:410 if hasattr(err, 'reason'):411 if 'timed out' not in err.reason:412 self.invalid_syntax('Invalid server name %s: %s' %413 (self.afe.web_server, err))414 if hasattr(err, 'code'):415 error_parts = [str(err)]416 if self.debug:417 error_parts.append(err.read()) # read the response body418 self.failure('\n\n'.join(error_parts), item=item,419 what_failed=("Error received from web server"))420 raise CliError("Error from web server")421 if self.debug:422 print 'retrying: %r %d' % (data, retry)423 retry -= 1424 if retry == 0:425 if item:426 myerr = '%s timed out for %s' % (op, item)427 else:428 myerr = '%s timed out' 
% op429 self.failure(myerr, item=item,430 what_failed=("Timed-out contacting "431 "the Autotest server"))432 raise CliError("Timed-out contacting the Autotest server")433 except mock.CheckPlaybackError:434 raise435 except Exception, full_error:436 # There are various exceptions throwns by JSON,437 # urllib & httplib, so catch them all.438 self.failure(full_error, item=item,439 what_failed='Operation %s failed' % op)440 raise CliError(str(full_error))441 # There is no output() method in the atest object (yet?)442 # but here are some helper functions to be used by its443 # children444 def print_wrapped(self, msg, values):445 if len(values) == 0:446 return447 elif len(values) == 1:448 print msg + ': '449 elif len(values) > 1:450 if msg.endswith('s'):451 print msg + ': '452 else:453 print msg + 's: '454 values.sort()455 if 'AUTOTEST_CLI_NO_WRAP' in os.environ:456 print '\n'.join(values)457 return458 twrap = textwrap.TextWrapper(initial_indent='\t',459 subsequent_indent='\t')460 print twrap.fill(', '.join(values))461 def __conv_value(self, type, value):462 return KEYS_CONVERT.get(type, str)(value)463 def print_fields_std(self, items, keys, title=None):464 """Print the keys in each item, one on each line"""465 if not items:466 return467 if title:468 print title469 for item in items:470 for key in keys:471 print '%s: %s' % (KEYS_TO_NAMES_EN[key],472 self.__conv_value(key,473 _get_item_key(item, key)))474 def print_fields_parse(self, items, keys, title=None):475 """Print the keys in each item as comma476 separated name=value"""477 for item in items:478 values = ['%s=%s' % (KEYS_TO_NAMES_EN[key],479 self.__conv_value(key,480 _get_item_key(item, key)))481 for key in keys482 if self.__conv_value(key,483 _get_item_key(item, key)) != '']484 print self.parse_delim.join(values)485 def __find_justified_fmt(self, items, keys):486 """Find the max length for each field."""487 lens = {}488 # Don't justify the last field, otherwise we have blank489 # lines when the max is overlaps but 
the current values490 # are smaller491 if not items:492 print "No results"493 return494 for key in keys[:-1]:495 lens[key] = max(len(self.__conv_value(key,496 _get_item_key(item, key)))497 for item in items)498 lens[key] = max(lens[key], len(KEYS_TO_NAMES_EN[key]))499 lens[keys[-1]] = 0500 return ' '.join(["%%-%ds" % lens[key] for key in keys])501 def print_table_std(self, items, keys_header, sublist_keys=()):502 """Print a mix of header and lists in a user readable503 format504 The headers are justified, the sublist_keys are wrapped."""505 if not items:506 return507 fmt = self.__find_justified_fmt(items, keys_header)508 header = tuple(KEYS_TO_NAMES_EN[key] for key in keys_header)509 print fmt % header510 for item in items:511 values = tuple(self.__conv_value(key,512 _get_item_key(item, key))513 for key in keys_header)514 print fmt % values515 if sublist_keys:516 for key in sublist_keys:517 self.print_wrapped(KEYS_TO_NAMES_EN[key],518 _get_item_key(item, key))519 print '\n'520 def print_table_parse(self, items, keys_header, sublist_keys=()):521 """Print a mix of header and lists in a user readable522 format"""523 for item in items:524 values = ['%s=%s' % (KEYS_TO_NAMES_EN[key],525 self.__conv_value(key, _get_item_key(item, key)))526 for key in keys_header527 if self.__conv_value(key,528 _get_item_key(item, key)) != '']529 if sublist_keys:530 [values.append('%s=%s' % (KEYS_TO_NAMES_EN[key],531 ','.join(_get_item_key(item, key))))532 for key in sublist_keys533 if len(_get_item_key(item, key))]534 print self.parse_delim.join(values)535 def print_by_ids_std(self, items, title=None, line_before=False):536 """Prints ID & names of items in a user readable form"""537 if not items:538 return539 if line_before:540 print541 if title:542 print title + ':'543 self.print_table_std(items, keys_header=['id', 'name'])544 def print_by_ids_parse(self, items, title=None, line_before=False):545 """Prints ID & names of items in a parseable format"""546 if not items:547 return548 if 
title:549 print title + '=',550 values = []551 for item in items:552 values += ['%s=%s' % (KEYS_TO_NAMES_EN[key],553 self.__conv_value(key,554 _get_item_key(item, key)))555 for key in ['id', 'name']556 if self.__conv_value(key,557 _get_item_key(item, key)) != '']558 print self.parse_delim.join(values)559 def print_list_std(self, items, key):560 """Print a wrapped list of results"""561 if not items:562 return563 print ' '.join(_get_item_key(item, key) for item in items)564 def print_list_parse(self, items, key):565 """Print a wrapped list of results"""566 if not items:567 return568 print '%s=%s' % (KEYS_TO_NAMES_EN[key],...
p3_plugin_manager.py
Source:p3_plugin_manager.py
# ... (the beginning of p3_plugin_manager.py is elided in this excerpt)
from yapsy.PluginFileLocator import PluginFileLocator, PluginFileAnalyzerMathingRegex
from yapsy.IPlugin import IPlugin
from . import p3_test_driver

# Maps '<class_type>:<class_name>' to the info dict a plugin reported for that
# class. Rebuilt from scratch on every call to scan_plugins().
_plugin_toc = {}


def _get_item_key(class_type, class_name):
    """Return the _plugin_toc key for a class: '<class_type>:<class_name>'."""
    return '%s:%s' % (class_type, class_name)


def _get_plugin_dirs():
    """Return the list of directories handed to the plugin manager.

    The list contains the 'plugins' directory located next to the
    p3_test_driver module and 'p3_test_driver_plugins' under the current
    working directory, in that order.
    """
    logging.info(p3_test_driver.__file__)
    maindir = os.path.dirname(os.path.realpath(p3_test_driver.__file__))
    return [
        os.path.join(maindir, 'plugins'),
        os.path.join(os.getcwd(), 'p3_test_driver_plugins'),
    ]


def scan_plugins():
    """Collect plugins and rebuild the module-level _plugin_toc table.

    A yapsy PluginManager is configured with a regex-based file analyzer and
    the directories from _get_plugin_dirs(). For every collected plugin,
    plugin_object.get_plugin_info() is iterated and each item is stored under
    the key built from its 'class_type' and 'class_name' entries; a later item
    with the same key replaces the earlier one.
    """
    global _plugin_toc
    _plugin_toc = {}

    # Load the plugins from the plugin directories.
    analyzer = PluginFileAnalyzerMathingRegex('', '.*\\.py$')
    plugin_locator = PluginFileLocator(analyzers=[analyzer])
    manager = PluginManager(plugin_locator=plugin_locator)
    plugin_dirs = _get_plugin_dirs()
    # Lazy %-args: the message is only formatted when the record is emitted.
    logging.info('Loading plugins in %s', plugin_dirs)
    manager.setPluginPlaces(plugin_dirs)
    manager.collectPlugins()

    # Register every class advertised by every collected plugin.
    for plugin in manager.getAllPlugins():
        plugin_info = plugin.plugin_object.get_plugin_info()
        for item in plugin_info:
            k = _get_item_key(item['class_type'], item['class_name'])
            _plugin_toc[k] = item
    logging.debug('p3_plugin_manager: _plugin_toc=%s', _plugin_toc)


def get_class_property(class_type, class_name, property_name, default=None):
    """Return one property of a registered plugin class.

    Args:
        class_type: type part of the registration key.
        class_name: name part of the registration key.
        property_name: key looked up in the class's info dict.
        default: value returned when the info dict has no such key.

    Returns:
        The stored property value, or `default` when the property is absent.

    Raises:
        KeyError: if no class is registered under class_type/class_name
            (`default` only covers a missing property, not a missing class).
    """
    # No `global` statement needed: _plugin_toc is only read here.
    k = _get_item_key(class_type, class_name)
    # Diagnostics go through logging, like the rest of this module, instead
    # of being printed unconditionally to stdout.
    logging.debug('p3_plugin_manager: k=%s, property_name=%s', k, property_name)
    value = _plugin_toc[k].get(property_name, default)
    logging.debug('p3_plugin_manager: value=%s', value)
    return value


def get_class(class_type, class_name):
    """Return the 'class' entry registered for class_type/class_name.

    Raises:
        KeyError: if the class is not registered or its info dict has no
            'class' entry.
    """
    k = _get_item_key(class_type, class_name)
    return _plugin_toc[k]['class']


class IP3Plugin(IPlugin):
    def get_plugin_info(self):
        ...  # method body is elided in this excerpt
redis.py
Source:redis.py
# ... (excerpt starts inside a class body; the class header and the lines
# before it are elided, so the methods below are shown without it)

@staticmethod
def _get_sub_key() -> str:
    """Return the fixed sub-key string 'history'."""
    return 'history'


@staticmethod
def _get_item_key(user_id: int, wak_id: int) -> str:
    """Build the per-item key '<user_id>-<wak_id>'."""
    return '{}-{}'.format(user_id, wak_id)


@classmethod
async def get_sale_reports(cls, user_id: int, wak_id: int) -> list:
    """Return what cls.get yields for the user/wak key, or [] if it is falsy.

    cls.get is defined outside this excerpt.
    """
    item_key = cls._get_item_key(user_id, wak_id)
    stored = await cls.get(item_key)
    return stored if stored else []


@classmethod
async def add_sale_reports(
    cls, user_id: int, wak_id: int, d_from: datetime.date, d_to: datetime.date, brands: list[str]
) -> list[dict]:
    """Move/append a report row to the end of the history and store the tail.

    The row holds the ISO-formatted date range and the brands. An equal row
    already present in the history is removed first, so the new row always
    ends up last. Only the trailing cls._MAX_RANGE rows are written back via
    cls.set (both are defined outside this excerpt).
    """
    entry: dict = {
        'date_from': d_from.isoformat(),
        'date_to': d_to.isoformat(),
        'brands': brands,
    }
    reports: list = await cls.get_sale_reports(user_id, wak_id)
    try:
        # Drop the first equal row, if any, before re-adding it at the end.
        reports.remove(entry)
    except ValueError:
        pass
    reports.append(entry)
    trimmed: list[dict] = reports[-cls._MAX_RANGE:]
    await cls.set(cls._get_item_key(user_id, wak_id), trimmed)
    # ... (the remainder of this method is elided in this excerpt)
Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!