Best Python code snippet using autotest_python
site_server_job.py
Source:site_server_job.py
1# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.2# Use of this source code is governed by a BSD-style license that can be3# found in the LICENSE file.4"""Site extensions to server_job. Adds distribute_across_machines()."""5import os, logging, multiprocessing6from autotest_lib.server import site_gtest_runner, site_server_job_utils7from autotest_lib.server import subcommand8from autotest_lib.server.server_job import base_server_job9import utils10def get_site_job_data(job):11 """Add custom data to the job keyval info.12 When multiple machines are used in a job, change the hostname to13 the platform of the first machine instead of machine1,machine2,... This14 makes the job reports easier to read and keeps the tko_machines table from15 growing too large.16 Args:17 job: instance of server_job.18 Returns:19 keyval dictionary with new hostname value, or empty dictionary.20 """21 site_job_data = {}22 # Only modify hostname on multimachine jobs. Assume all host have the same23 # platform.24 if len(job.machines) > 1:25 # Search through machines for first machine with a platform.26 for host in job.machines:27 keyval_path = os.path.join(job.resultdir, 'host_keyvals', host)28 keyvals = utils.read_keyval(keyval_path)29 host_plat = keyvals.get('platform', None)30 if not host_plat:31 continue32 site_job_data['hostname'] = host_plat33 break34 return site_job_data35class site_server_job(base_server_job):36 """Extend server_job adding distribute_across_machines."""37 def __init__(self, *args, **dargs):38 super(site_server_job, self).__init__(*args, **dargs)39 def run(self, *args, **dargs):40 """Extend server_job.run adding gtest_runner to the namespace."""41 gtest_run = {'gtest_runner': site_gtest_runner.gtest_runner()}42 # Namespace is the 5th parameter to run(). If args has 5 or more43 # entries in it then we need to fix-up this namespace entry.44 if len(args) >= 5:45 args[4].update(gtest_run)46 # Else, if present, namespace must be in dargs.47 else:48 dargs.setdefault('namespace', gtest_run).update(gtest_run)49 # Now call the original run() with the modified namespace containing a50 # gtest_runner51 super(site_server_job, self).run(*args, **dargs)52 def distribute_across_machines(self, tests, machines,53 continuous_parsing=False):54 """Run each test in tests once using machines.55 Instead of running each test on each machine like parallel_on_machines,56 run each test once across all machines. Put another way, the total57 number of tests run by parallel_on_machines is len(tests) *58 len(machines). The number of tests run by distribute_across_machines is59 len(tests).60 Args:61 tests: List of tests to run.62 machines: List of machines to use.63 continuous_parsing: Bool, if true parse job while running.64 """65 # The Queue is thread safe, but since a machine may have to search66 # through the queue to find a valid test the lock provides exclusive67 # queue access for more than just the get call.68 test_queue = multiprocessing.JoinableQueue()69 test_queue_lock = multiprocessing.Lock()70 unique_machine_attributes = []71 sub_commands = []72 work_dir = self.resultdir73 for machine in machines:74 if 'group' in self.resultdir:75 work_dir = os.path.join(self.resultdir, machine)76 mw = site_server_job_utils.machine_worker(self,77 machine,78 work_dir,79 test_queue,80 test_queue_lock,81 continuous_parsing)82 # Create the subcommand instance to run this machine worker.83 sub_commands.append(subcommand.subcommand(mw.run,84 [],85 work_dir))86 # To (potentially) speed up searching for valid tests create a list87 # of unique attribute sets present in the machines for this job. If88 # sets were hashable we could just use a dictionary for fast89 # verification. This at least reduces the search space from the90 # number of machines to the number of unique machines.91 if not mw.attribute_set in unique_machine_attributes:92 unique_machine_attributes.append(mw.attribute_set)93 # Only queue tests which are valid on at least one machine. Record94 # skipped tests in the status.log file using record_skipped_test().95 for test_entry in tests:96 # Check if it's an old style test entry.97 if len(test_entry) > 2 and not isinstance(test_entry[2], dict):98 test_attribs = {'include': test_entry[2]}99 if len(test_entry) > 3:100 test_attribs['exclude'] = test_entry[3]101 if len(test_entry) > 4:102 test_attribs['attributes'] = test_entry[4]103 test_entry = list(test_entry[:2])104 test_entry.append(test_attribs)105 ti = site_server_job_utils.test_item(*test_entry)106 machine_found = False107 for ma in unique_machine_attributes:108 if ti.validate(ma):109 test_queue.put(ti)110 machine_found = True111 break112 if not machine_found:113 self.record_skipped_test(ti)114 # Run valid tests and wait for completion.115 subcommand.parallel(sub_commands)116 def record_skipped_test(self, skipped_test, message=None):117 """Insert a failure record into status.log for this test."""118 msg = message119 if msg is None:120 msg = 'No valid machines found for test %s.' % skipped_test121 logging.info(msg)122 self.record('START', None, skipped_test.test_name)123 self.record('INFO', None, skipped_test.test_name, msg)...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!