Best Python code snippet using autotest_python
server_job.py
Source:server_job.py
...280 if not self._using_parser:281 return282 final_tests = self.parser.end()283 for test in final_tests:284 self.__insert_test(test)285 self._using_parser = False286 def verify(self):287 if not self.machines:288 raise error.AutoservError('No machines specified to verify')289 if self.resultdir:290 os.chdir(self.resultdir)291 try:292 namespace = {'machines': self.machines, 'job': self,293 'ssh_user': self._ssh_user,294 'ssh_port': self._ssh_port,295 'ssh_pass': self._ssh_pass}296 self._execute_code(VERIFY_CONTROL_FILE, namespace, protect=False)297 except Exception as e:298 msg = ('Verify failed\n' + str(e) + '\n' + traceback.format_exc())299 self.record('ABORT', None, None, msg)300 raise301 def repair(self, host_protection):302 if not self.machines:303 raise error.AutoservError('No machines specified to repair')304 if self.resultdir:305 os.chdir(self.resultdir)306 namespace = {'machines': self.machines, 'job': self,307 'ssh_user': self._ssh_user, 'ssh_port': self._ssh_port,308 'ssh_pass': self._ssh_pass,309 'protection_level': host_protection}310 self._execute_code(REPAIR_CONTROL_FILE, namespace, protect=False)311 def precheck(self):312 """313 perform any additional checks in derived classes.314 """315 pass316 def enable_external_logging(self):317 """318 Start or restart external logging mechanism.319 """320 pass321 def disable_external_logging(self):322 """323 Pause or stop external logging mechanism.324 """325 pass326 def use_external_logging(self):327 """328 Return True if external logging should be used.329 """330 return False331 def _make_parallel_wrapper(self, function, machines, log):332 """Wrap function as appropriate for calling by parallel_simple."""333 is_forking = not (len(machines) == 1 and self.machines == machines)334 if self._parse_job and is_forking and log:335 def wrapper(machine):336 self._parse_job += "/" + machine337 self._using_parser = True338 self.machines = [machine]339 self.push_execution_context(machine)340 os.chdir(self.resultdir)341 
utils.write_keyval(self.resultdir, {"hostname": machine})342 self.init_parser()343 result = function(machine)344 self.cleanup_parser()345 return result346 elif len(machines) > 1 and log:347 def wrapper(machine):348 self.push_execution_context(machine)349 os.chdir(self.resultdir)350 machine_data = {'hostname': machine,351 'status_version': str(self._STATUS_VERSION)}352 utils.write_keyval(self.resultdir, machine_data)353 result = function(machine)354 return result355 else:356 wrapper = function357 return wrapper358 def parallel_simple(self, function, machines, log=True, timeout=None,359 return_results=False):360 """361 Run 'function' using parallel_simple, with an extra wrapper to handle362 the necessary setup for continuous parsing, if possible. If continuous363 parsing is already properly initialized then this should just work.364 :param function: A callable to run in parallel given each machine.365 :param machines: A list of machine names to be passed one per subcommand366 invocation of function.367 :param log: If True, output will be written to output in a subdirectory368 named after each machine.369 :param timeout: Seconds after which the function call should timeout.370 :param return_results: If True instead of an AutoServError being raised371 on any error a list of the results|exceptions from the function372 called on each arg is returned. 
[default: False]373 :raise error.AutotestError: If any of the functions failed.374 """375 wrapper = self._make_parallel_wrapper(function, machines, log)376 return subcommand.parallel_simple(wrapper, machines,377 log=log, timeout=timeout,378 return_results=return_results)379 def parallel_on_machines(self, function, machines, timeout=None):380 """381 :param function: Called in parallel with one machine as its argument.382 :param machines: A list of machines to call function(machine) on.383 :param timeout: Seconds after which the function call should timeout.384 :return: A list of machines on which function(machine) returned385 without raising an exception.386 """387 results = self.parallel_simple(function, machines, timeout=timeout,388 return_results=True)389 success_machines = []390 for result, machine in itertools.izip(results, machines):391 if not isinstance(result, Exception):392 success_machines.append(machine)393 return success_machines394 _USE_TEMP_DIR = object()395 def run(self, cleanup=False, install_before=False, install_after=False,396 collect_crashdumps=True, namespace={}, control=None,397 control_file_dir=None, only_collect_crashinfo=False):398 # for a normal job, make sure the uncollected logs file exists399 # for a crashinfo-only run it should already exist, bail out otherwise400 created_uncollected_logs = False401 if self.resultdir and not os.path.exists(self._uncollected_log_file):402 if only_collect_crashinfo:403 # if this is a crashinfo-only run, and there were no existing404 # uncollected logs, just bail out early405 logging.info("No existing uncollected logs, "406 "skipping crashinfo collection")407 return408 else:409 log_file = open(self._uncollected_log_file, "w")410 pickle.dump([], log_file)411 log_file.close()412 created_uncollected_logs = True413 # use a copy so changes don't affect the original dictionary414 namespace = namespace.copy()415 machines = self.machines416 if control is None:417 if self.control is None:418 control = ''419 
else:420 control = self._load_control_file(self.control)421 if control_file_dir is None:422 control_file_dir = self.resultdir423 self.aborted = False424 namespace['machines'] = machines425 namespace['args'] = self.args426 namespace['job'] = self427 namespace['ssh_user'] = self._ssh_user428 namespace['ssh_port'] = self._ssh_port429 namespace['ssh_pass'] = self._ssh_pass430 test_start_time = int(time.time())431 if self.resultdir:432 os.chdir(self.resultdir)433 # touch status.log so that the parser knows a job is running here434 open(self.get_status_log_path(), 'a').close()435 self.enable_external_logging()436 collect_crashinfo = True437 temp_control_file_dir = None438 try:439 try:440 if install_before and machines:441 self._execute_code(INSTALL_CONTROL_FILE, namespace)442 if only_collect_crashinfo:443 return444 # determine the dir to write the control files to445 cfd_specified = (control_file_dir and control_file_dir is not446 self._USE_TEMP_DIR)447 if cfd_specified:448 temp_control_file_dir = None449 else:450 temp_control_file_dir = tempfile.mkdtemp(451 suffix='temp_control_file_dir')452 control_file_dir = temp_control_file_dir453 server_control_file = os.path.join(control_file_dir,454 self._control_filename)455 client_control_file = os.path.join(control_file_dir,456 CLIENT_CONTROL_FILENAME)457 if self._client:458 namespace['control'] = control459 utils.open_write_close(client_control_file, control)460 shutil.copyfile(CLIENT_WRAPPER_CONTROL_FILE,461 server_control_file)462 else:463 utils.open_write_close(server_control_file, control)464 logging.info("Processing control file")465 self._execute_code(server_control_file, namespace)466 logging.info("Finished processing control file")467 # no error occurred, so we don't need to collect crashinfo468 collect_crashinfo = False469 except Exception as e:470 try:471 logging.exception(472 'Exception escaped control file, job aborting:')473 self.record('INFO', None, None, str(e),474 {'job_abort_reason': str(e)})475 except 
Exception:476 pass # don't let logging exceptions here interfere477 raise478 finally:479 if temp_control_file_dir:480 # Clean up temp directory used for copies of the control files481 try:482 shutil.rmtree(temp_control_file_dir)483 except Exception as e:484 logging.warn('Could not remove temp directory %s: %s',485 temp_control_file_dir, e)486 if machines and (collect_crashdumps or collect_crashinfo):487 namespace['test_start_time'] = test_start_time488 if collect_crashinfo:489 # includes crashdumps490 self._execute_code(CRASHINFO_CONTROL_FILE, namespace)491 else:492 self._execute_code(CRASHDUMPS_CONTROL_FILE, namespace)493 if self._uncollected_log_file and created_uncollected_logs:494 os.remove(self._uncollected_log_file)495 self.disable_external_logging()496 if cleanup and machines:497 self._execute_code(CLEANUP_CONTROL_FILE, namespace)498 if install_after and machines:499 self._execute_code(INSTALL_CONTROL_FILE, namespace)500 def run_test(self, url, *args, **dargs):501 """502 Summon a test object and run it.503 tag504 tag to add to testname505 url506 url of the test to run507 """508 group, testname = self.pkgmgr.get_package_name(url, 'test')509 testname, subdir, tag = self._build_tagged_test_name(testname, dargs)510 outputdir = self._make_test_outputdir(subdir)511 def group_func():512 try:513 test.runtest(self, url, tag, args, dargs)514 except error.TestBaseException as e:515 self.record(e.exit_status, subdir, testname, str(e))516 raise517 except Exception as e:518 info = str(e) + "\n" + traceback.format_exc()519 self.record('FAIL', subdir, testname, info)520 raise521 else:522 self.record('GOOD', subdir, testname, 'completed successfully')523 result, exc_info = self._run_group(testname, subdir, group_func)524 if exc_info and isinstance(exc_info[1], error.TestBaseException):525 return False526 elif exc_info:527 raise exc_info[0](exc_info[1])528 else:529 return True530 def _run_group(self, name, subdir, function, *args, **dargs):531 """532 Underlying method for 
running something inside of a group.533 """534 result, exc_info = None, None535 try:536 self.record('START', subdir, name)537 result = function(*args, **dargs)538 except error.TestBaseException as e:539 self.record("END %s" % e.exit_status, subdir, name)540 exc_info = sys.exc_info()541 except Exception as e:542 err_msg = str(e) + '\n'543 err_msg += traceback.format_exc()544 self.record('END ABORT', subdir, name, err_msg)545 raise error.JobError(name + ' failed\n' + traceback.format_exc())546 else:547 self.record('END GOOD', subdir, name)548 return result, exc_info549 def run_group(self, function, *args, **dargs):550 """551 function:552 subroutine to run553 *args:554 arguments for the function555 """556 name = function.__name__557 # Allow the tag for the group to be specified.558 tag = dargs.pop('tag', None)559 if tag:560 name = tag561 return self._run_group(name, None, function, *args, **dargs)[0]562 def run_reboot(self, reboot_func, get_kernel_func):563 """564 A specialization of run_group meant specifically for handling565 a reboot. Includes support for capturing the kernel version566 after the reboot.567 reboot_func: a function that carries out the reboot568 get_kernel_func: a function that returns a string569 representing the kernel version.570 """571 try:572 self.record('START', None, 'reboot')573 reboot_func()574 except Exception as e:575 err_msg = str(e) + '\n' + traceback.format_exc()576 self.record('END FAIL', None, 'reboot', err_msg)577 raise578 else:579 kernel = get_kernel_func()580 self.record('END GOOD', None, 'reboot',581 optional_fields={"kernel": kernel})582 def run_control(self, path):583 """Execute a control file found at path (relative to the autotest584 path). 
Intended for executing a control file within a control file,585 not for running the top-level job control file."""586 path = os.path.join(self.autodir, path)587 control_file = self._load_control_file(path)588 self.run(control=control_file, control_file_dir=self._USE_TEMP_DIR)589 def add_sysinfo_command(self, command, logfile=None, on_every_test=False):590 self._add_sysinfo_loggable(sysinfo.command(command, logf=logfile),591 on_every_test)592 def add_sysinfo_logfile(self, file, on_every_test=False):593 self._add_sysinfo_loggable(sysinfo.logfile(file), on_every_test)594 def _add_sysinfo_loggable(self, loggable, on_every_test):595 if on_every_test:596 self.sysinfo.test_loggables.add(loggable)597 else:598 self.sysinfo.boot_loggables.add(loggable)599 def _read_warnings(self):600 """Poll all the warning loggers and extract any new warnings that have601 been logged. If the warnings belong to a category that is currently602 disabled, this method will discard them and they will no longer be603 retrievable.604 Returns a list of (timestamp, message) tuples, where timestamp is an605 integer epoch timestamp."""606 warnings = []607 while True:608 # pull in a line of output from every logger that has609 # output ready to be read610 loggers, _, _ = select.select(self.warning_loggers, [], [], 0)611 closed_loggers = set()612 for logger in loggers:613 line = logger.readline()614 # record any broken pipes (aka line == empty)615 if len(line) == 0:616 closed_loggers.add(logger)617 continue618 # parse out the warning619 timestamp, msgtype, msg = line.split('\t', 2)620 timestamp = int(timestamp)621 # if the warning is valid, add it to the results622 if self.warning_manager.is_valid(timestamp, msgtype):623 warnings.append((timestamp, msg.strip()))624 # stop listening to loggers that are closed625 self.warning_loggers -= closed_loggers626 # stop if none of the loggers have any output left627 if not loggers:628 break629 # sort into timestamp order630 warnings.sort()631 return warnings632 def 
_unique_subdirectory(self, base_subdirectory_name):633 """Compute a unique results subdirectory based on the given name.634 Appends base_subdirectory_name with a number as necessary to find a635 directory name that doesn't already exist.636 """637 subdirectory = base_subdirectory_name638 counter = 1639 while os.path.exists(os.path.join(self.resultdir, subdirectory)):640 subdirectory = base_subdirectory_name + '.' + str(counter)641 counter += 1642 return subdirectory643 def get_record_context(self):644 """Returns an object representing the current job.record context.645 The object returned is an opaque object with a 0-arg restore method646 which can be called to restore the job.record context (i.e. indentation)647 to the current level. The intention is that it should be used when648 something external which generate job.record calls (e.g. an autotest649 client) can fail catastrophically and the server job record state650 needs to be reset to its original "known good" state.651 :return: A context object with a 0-arg restore() method."""652 return self._indenter.get_context()653 def record_summary(self, status_code, test_name, reason='', attributes=None,654 distinguishing_attributes=(), child_test_ids=None):655 """Record a summary test result.656 :param status_code: status code string, see657 shared.log.is_valid_status()658 :param test_name: name of the test659 :param reason: (optional) string providing detailed reason for test660 outcome661 :param attributes: (optional) dict of string keyvals to associate with662 this result663 :param distinguishing_attributes: (optional) list of attribute names664 that should be used to distinguish identically-named test665 results. These attributes should be present in the attributes666 parameter. 
This is used to generate user-friendly subdirectory667 names.668 :param child_test_ids: (optional) list of test indices for test results669 used in generating this result.670 """671 subdirectory_name_parts = [test_name]672 for attribute in distinguishing_attributes:673 assert attributes674 assert attribute in attributes, '%s not in %s' % (attribute,675 attributes)676 subdirectory_name_parts.append(attributes[attribute])677 base_subdirectory_name = '.'.join(subdirectory_name_parts)678 subdirectory = self._unique_subdirectory(base_subdirectory_name)679 subdirectory_path = os.path.join(self.resultdir, subdirectory)680 os.mkdir(subdirectory_path)681 self.record(status_code, subdirectory, test_name,682 status=reason, optional_fields={'is_summary': True})683 if attributes:684 utils.write_keyval(subdirectory_path, attributes)685 if child_test_ids:686 ids_string = ','.join(str(test_id) for test_id in child_test_ids)687 summary_data = {'child_test_ids': ids_string}688 utils.write_keyval(os.path.join(subdirectory_path, 'summary_data'),689 summary_data)690 def disable_warnings(self, warning_type):691 self.warning_manager.disable_warnings(warning_type)692 self.record("INFO", None, None,693 "disabling %s warnings" % warning_type,694 {"warnings.disable": warning_type})695 def enable_warnings(self, warning_type):696 self.warning_manager.enable_warnings(warning_type)697 self.record("INFO", None, None,698 "enabling %s warnings" % warning_type,699 {"warnings.enable": warning_type})700 def get_status_log_path(self, subdir=None):701 """Return the path to the job status log.702 :param subdir - Optional parameter indicating that you want the path703 to a subdirectory status log.704 :return: The path where the status log should be.705 """706 if self.resultdir:707 if subdir:708 return os.path.join(self.resultdir, subdir, "status.log")709 else:710 return os.path.join(self.resultdir, "status.log")711 else:712 return None713 def _update_uncollected_logs_list(self, update_func):714 """Updates 
the uncollected logs list in a multi-process safe manner.715 :param update_func - a function that updates the list of uncollected716 logs. Should take one parameter, the list to be updated.717 """718 if self._uncollected_log_file:719 log_file = open(self._uncollected_log_file, "r+")720 fcntl.flock(log_file, fcntl.LOCK_EX)721 try:722 uncollected_logs = pickle.load(log_file)723 update_func(uncollected_logs)724 log_file.seek(0)725 log_file.truncate()726 pickle.dump(uncollected_logs, log_file)727 log_file.flush()728 finally:729 fcntl.flock(log_file, fcntl.LOCK_UN)730 log_file.close()731 def add_client_log(self, hostname, remote_path, local_path):732 """Adds a new set of client logs to the list of uncollected logs,733 to allow for future log recovery.734 :param host - the hostname of the machine holding the logs735 :param remote_path - the directory on the remote machine holding logs736 :param local_path - the local directory to copy the logs into737 """738 def update_func(logs_list):739 logs_list.append((hostname, remote_path, local_path))740 self._update_uncollected_logs_list(update_func)741 def remove_client_log(self, hostname, remote_path, local_path):742 """Removes a set of client logs from the list of uncollected logs,743 to allow for future log recovery.744 :param host - the hostname of the machine holding the logs745 :param remote_path - the directory on the remote machine holding logs746 :param local_path - the local directory to copy the logs into747 """748 def update_func(logs_list):749 logs_list.remove((hostname, remote_path, local_path))750 self._update_uncollected_logs_list(update_func)751 def get_client_logs(self):752 """Retrieves the list of uncollected logs, if it exists.753 :return: A list of (host, remote_path, local_path) tuples. 
Returns754 an empty list if no uncollected logs file exists.755 """756 log_exists = (self._uncollected_log_file and757 os.path.exists(self._uncollected_log_file))758 if log_exists:759 return pickle.load(open(self._uncollected_log_file))760 else:761 return []762 def _fill_server_control_namespace(self, namespace, protect=True):763 """764 Prepare a namespace to be used when executing server control files.765 This sets up the control file API by importing modules and making them766 available under the appropriate names within namespace.767 For use by _execute_code().768 Args:769 namespace: The namespace dictionary to fill in.770 protect: Boolean. If True (the default) any operation that would771 clobber an existing entry in namespace will cause an error.772 Raises:773 error.AutoservError: When a name would be clobbered by import.774 """775 def _import_names(module_name, names=()):776 """777 Import a module and assign named attributes into namespace.778 Args:779 module_name: The string module name.780 names: A limiting list of names to import from module_name. If781 empty (the default), all names are imported from the module782 similar to a "from foo.bar import *" statement.783 Raises:784 error.AutoservError: When a name being imported would clobber785 a name already in namespace.786 """787 module = __import__(module_name, {}, {}, names)788 # No names supplied? 
Import * from the lowest level module.789 # (Ugh, why do I have to implement this part myself?)790 if not names:791 for submodule_name in module_name.split('.')[1:]:792 module = getattr(module, submodule_name)793 if hasattr(module, '__all__'):794 names = getattr(module, '__all__')795 else:796 names = dir(module)797 # Install each name into namespace, checking to make sure it798 # doesn't override anything that already exists.799 for name in names:800 # Check for conflicts to help prevent future problems.801 if name in namespace and protect:802 if namespace[name] is not getattr(module, name):803 raise error.AutoservError('importing name '804 '%s from %s %r would override %r' %805 (name, module_name, getattr(module, name),806 namespace[name]))807 else:808 # Encourage cleanliness and the use of __all__ for a809 # more concrete API with less surprises on '*' imports.810 warnings.warn('%s (%r) being imported from %s for use '811 'in server control files is not the '812 'first occurrence of that import.' %813 (name, namespace[name], module_name))814 namespace[name] = getattr(module, name)815 # This is the equivalent of prepending a bunch of import statements to816 # the front of the control script.817 namespace.update(os=os, sys=sys, logging=logging)818 _import_names('autotest.server',819 ('hosts', 'autotest_remote', 'standalone_profiler',820 'source_kernel', 'rpm_kernel', 'deb_kernel', 'git_kernel'))821 _import_names('autotest.server.subcommand',822 ('parallel', 'parallel_simple', 'subcommand'))823 _import_names('autotest.server.utils',824 ('run', 'get_tmp_dir', 'sh_escape', 'parse_machine'))825 _import_names('autotest.client.shared.error')826 _import_names('autotest.client.shared.barrier', ('barrier',))827 # Inject ourself as the job object into other classes within the API.828 # (Yuck, this injection is a gross thing be part of a public API. -gps)829 #830 # XXX Base & SiteAutotest do not appear to use .job. 
Who does?831 namespace['autotest_remote'].Autotest.job = self832 # server.hosts.base_classes.Host uses .job.833 namespace['hosts'].Host.job = self834 namespace['hosts'].factory.ssh_user = self._ssh_user835 namespace['hosts'].factory.ssh_port = self._ssh_port836 namespace['hosts'].factory.ssh_pass = self._ssh_pass837 def _execute_code(self, code_file, namespace, protect=True):838 """839 Execute code using a copy of namespace as a server control script.840 Unless protect_namespace is explicitly set to False, the dict will not841 be modified.842 Args:843 code_file: The filename of the control file to execute.844 namespace: A dict containing names to make available during execution.845 protect: Boolean. If True (the default) a copy of the namespace dict846 is used during execution to prevent the code from modifying its847 contents outside of this function. If False the raw dict is848 passed in and modifications will be allowed.849 """850 if protect:851 namespace = namespace.copy()852 self._fill_server_control_namespace(namespace, protect=protect)853 # TODO: Simplify and get rid of the special cases for only 1 machine.854 if len(self.machines) > 1:855 machines_text = '\n'.join(self.machines) + '\n'856 # Only rewrite the file if it does not match our machine list.857 try:858 machines_f = open(MACHINES_FILENAME, 'r')859 existing_machines_text = machines_f.read()860 machines_f.close()861 except EnvironmentError:862 existing_machines_text = None863 if machines_text != existing_machines_text:864 utils.open_write_close(MACHINES_FILENAME, machines_text)865 execfile(code_file, namespace, namespace)866 def _parse_status(self, new_line):867 if not self._using_parser:868 return869 new_tests = self.parser.process_lines([new_line])870 for test in new_tests:871 self.__insert_test(test)872 def __insert_test(self, test):873 """874 An internal method to insert a new test result into the875 database. 
This method will not raise an exception, even if an876 error occurs during the insert, to avoid failing a test877 simply because of unexpected database issues."""878 self.num_tests_run += 1879 if status_lib.is_worse_than_or_equal_to(test.status, 'FAIL'):880 self.num_tests_failed += 1881 try:882 dbutils.insert_test(self.job_model, test)883 except Exception:884 msg = ("WARNING: An unexpected error occurred while "885 "inserting test results into the database. "886 "Ignoring error.\n" + traceback.format_exc())...
views.py
Source:views.py
...94 obj = GFactory(self.gdrive, {'mimeType': u'application/vnd.google-apps.folder'})95 assert isinstance(obj, GFolder), 'Factory test failed on GFolder'96 obj = GFactory(self.gdrive, {'mimeType': u'application/vnd.google-apps.random'})97 assert isinstance(obj, GFile), 'Factory test failed on GFile'98 def __insert_test(self):99 file_to_insert = GSheet(self.gdrive, {'title': 'test'})100 inserted = file_to_insert.save()101 msg = 'Inserted file is not instance of GSheet as expected'102 assert isinstance(inserted, GSheet), msg103 msg = 'Inserted file title and origin title are different'104 assert inserted.title == file_to_insert.title, msg105 return inserted106 def __update_test(self, file_to_update):107 file_to_update.description = 'hello world'108 updated = file_to_update.save()109 msg = 'Updated file and origin file are different'110 assert file_to_update == updated, msg111 msg = 'Updated file description and origin description are different'112 assert file_to_update.description == updated.description, msg113 return updated114 def __delete_test(self, file_to_delete):115 deleted = file_to_delete.delete()116 msg = 'Delete test faild.'117 assert deleted == '', msg118 def __get_worksheets_test(self, f):119 assert f._worksheets is None, 'Lazy worksheets is not None'120 assert f.get_worksheets()[0].title == u'ÐиÑÑ1', 'Worksheets init faild'121 assert f._worksheets == f.get_worksheets(), 'Lazy retrieving does not work'122 def __save_worksheet_test(self, f):123 w = GWorkSheet(f, {'title': 'test1', 'col_count': 10, 'row_count': 10})124 w.save()125 assert w._id is not None, 'Add worksheet faild'126 w = GWorkSheet(f, {'title': 'test1', 'col_count': 10, 'row_count': 10})127 try:128 w.save()129 assert False, 'Two GWorkSheets with the same title can not be saved'130 except GError, e:131 pass132 # w.title = 'test2'133 # w.col_count = 12134 # w.save()135 def get(self, request):136 self.__prepare_tests(request)137 try:138 self.__factory_test()139 file_inserted = 
self.__insert_test()140 file_updated = self.__update_test(file_inserted)141 self.__get_worksheets_test(file_updated)142 self.__save_worksheet_test(file_updated)143 # self.__delete_test(file_updated)144 except AssertionError, error:145 self.errors += [str(error)]...
test.py
Source:test.py
...128 self.__tests = TestGroup("")129 def load(self, raw_yaml, src_path):130 data = yaml.safe_load(raw_yaml)131 for test in data:132 self.__insert_test(data[test], test, src_path.parts)133 def run(self, p, r):134 return self.__tests.run(p, r, True)135 def __insert_test(self, steps, name, groups_path):136 group = self.__tests137 for subgroup in groups_path:138 group = group.subgroup(subgroup)139 group.add(SingleTest(name, [self.__parse_step(s) for s in steps]))140 def __parse_step(self, data):141 for type_key in ["get", "compile", "run"]:142 if type_key in data:143 return (type_key, data)144 @property145 def count_total(self):146 return self.__tests.count_tests()147"""148â â ⬠â149â â â â...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!