Best Python code snippet using autotest_python
luciferlib.py
Source:luciferlib.py
# NOTE: This excerpt begins part-way through a function whose ``def`` line
# and docstring opening are not visible.  The visible tail is preserved
# below as a comment so that the rest of the excerpt parses on its own.
#
#     """
#     hqe = models.HostQueueEntry.objects.get(id=hqe_id)
#     hqes = hqe.job.hostqueueentry_set.all()
#     try:
#         _get_consistent_execution_path(hqes)
#     except _ExecutionPathError:
#         return True
#     return False


# TODO(crbug.com/748234): This is temporary to enable toggling
# lucifer rollouts with an option.
def spawn_starting_job_handler(manager, job):
    """Spawn job_reporter to handle a job.

    Pass all arguments by keyword.

    @param manager: scheduler.drone_manager.DroneManager instance
    @param job: Job instance
    @returns: Drone instance
    @raises NotImplementedError: always; this handler has no implementation
            in this excerpt.
    """
    raise NotImplementedError


# TODO(crbug.com/748234): This is temporary to enable toggling
# lucifer rollouts with an option.
def spawn_gathering_job_handler(manager, job, autoserv_exit, pidfile_id=None):
    """Spawn job_reporter to handle a job.

    Pass all arguments by keyword.

    @param manager: scheduler.drone_manager.DroneManager instance
    @param job: Job instance
    @param autoserv_exit: autoserv exit status
    @param pidfile_id: PidfileId instance
    @returns: Drone instance
    """
    manager = _DroneManager(manager)
    drone = _pick_drone(manager, pidfile_id)
    results_dir = _results_dir(manager, job)
    # NOTE(review): this lookup runs even when pidfile_id is None (the
    # default); confirm the underlying manager accepts a None pidfile id.
    num_tests_failed = manager.get_num_tests_failed(pidfile_id)
    args = [
            _JOB_REPORTER_PATH,

            # General configuration
            '--jobdir', _get_jobdir(),
            '--run-job-path', _get_run_job_path(),
            '--watcher-path', _get_watcher_path(),

            # Job specific
            '--job-id', str(job.id),
            '--lucifer-level', 'GATHERING',
            '--autoserv-exit', str(autoserv_exit),
            '--need-gather',
            '--num-tests-failed', str(num_tests_failed),
            '--results-dir', results_dir,
    ]
    _spawn_job_reporter(drone, args, results_dir)
    return drone


# TODO(crbug.com/748234): This is temporary to enable toggling
# lucifer rollouts with an option.
def spawn_parsing_job_handler(manager, job, autoserv_exit, pidfile_id=None):
    """Spawn job_reporter to handle a job.

    Pass all arguments by keyword.

    @param manager: scheduler.drone_manager.DroneManager instance
    @param job: Job instance
    @param autoserv_exit: autoserv exit status
    @param pidfile_id: PidfileId instance
    @returns: Drone instance
    """
    manager = _DroneManager(manager)
    drone = _pick_drone(manager, pidfile_id)
    results_dir = _results_dir(manager, job)
    args = [
            _JOB_REPORTER_PATH,

            # General configuration
            '--jobdir', _get_jobdir(),
            '--run-job-path', _get_run_job_path(),
            '--watcher-path', _get_watcher_path(),

            # Job specific
            '--job-id', str(job.id),
            # NOTE(review): same level as the gathering handler, but without
            # --need-gather / --num-tests-failed; presumably that is how a
            # parse-only run is requested -- confirm against job_reporter.
            '--lucifer-level', 'GATHERING',
            '--autoserv-exit', str(autoserv_exit),
            '--results-dir', results_dir,
    ]
    _spawn_job_reporter(drone, args, results_dir)
    return drone


def _pick_drone(manager, pidfile_id):
    """Return the drone tied to pidfile_id, or any usable drone if None.

    @param manager: _DroneManager instance
    @param pidfile_id: PidfileId instance or None
    @returns: Drone instance
    """
    if pidfile_id is None:
        return manager.pick_drone_to_use()
    return manager.get_drone_for_pidfile(pidfile_id)


def _spawn_job_reporter(drone, args, results_dir):
    """Spawn job_reporter on drone, logging into results_dir.

    If GCP credentials are configured, a GOOGLE_APPLICATION_CREDENTIALS
    assignment is placed in front of the command arguments.

    @param drone: Drone instance to spawn on
    @param args: job_reporter command line as a list of strings
    @param results_dir: results directory; the output log is written there
    """
    # Read the config value once rather than once for the check and once
    # for the formatting.
    gcp_creds = _get_gcp_creds()
    if gcp_creds:
        args = [
                'GOOGLE_APPLICATION_CREDENTIALS=%s' % pipes.quote(gcp_creds),
        ] + args
    output_file = os.path.join(results_dir, 'job_reporter_output.log')
    drone.spawn(_ENV, args, output_file=output_file)


def _get_jobdir():
    """Return the 'jobdir' value from the config section."""
    return _config.get_config_value(_SECTION, 'jobdir')


def _get_run_job_path():
    """Return the path to lucifer_run_job inside the binaries dir."""
    return os.path.join(_get_binaries_path(), 'lucifer_run_job')


def _get_watcher_path():
    """Return the path to lucifer_watcher inside the binaries dir."""
    return os.path.join(_get_binaries_path(), 'lucifer_watcher')


def _get_binaries_path():
    """Get binaries dir path from config."""
    return _config.get_config_value(_SECTION, 'binaries_path')


def _get_gcp_creds():
    """Return path to GCP service account credentials.

    This is the empty string by default, if no credentials will be used.
    """
    return _config.get_config_value(_SECTION, 'gcp_creds', default='')


class _DroneManager(object):
    """Simplified drone API."""

    def __init__(self, old_manager):
        """Initialize instance.

        @param old_manager: old style DroneManager
        """
        self._manager = old_manager

    def get_num_tests_failed(self, pidfile_id):
        """Return the number of tests failed for autoserv by pidfile.

        @param pidfile_id: PidfileId instance.
        @returns: int (-1 if missing)
        """
        state = self._manager.get_pidfile_contents(pidfile_id)
        if state.num_tests_failed is None:
            return -1
        return state.num_tests_failed

    def get_drone_for_pidfile(self, pidfile_id):
        """Return a drone to use from a pidfile.

        @param pidfile_id: PidfileId instance.
        """
        return _wrap_drone(self._manager.get_drone_for_pidfile_id(pidfile_id))

    def pick_drone_to_use(self, num_processes=1, prefer_ssp=False):
        """Return a drone to use.

        Various options can be passed to optimize drone selection.

        @param num_processes: number of processes the drone is intended
            to run
        @param prefer_ssp: indicates whether drones supporting
            server-side packaging should be preferred.  The returned
            drone is not guaranteed to support it.
        """
        old_drone = self._manager.pick_drone_to_use(
                num_processes=num_processes,
                prefer_ssp=prefer_ssp,
        )
        return _wrap_drone(old_drone)

    def absolute_path(self, path):
        """Return absolute path for drone results.

        The returned path might be remote.
        """
        return self._manager.absolute_path(path)


def _wrap_drone(old_drone):
    """Wrap an old style drone.

    @param old_drone: old style drone; its _host attribute selects the
            wrapper type
    @returns: LocalDrone or RemoteDrone instance
    @raises TypeError: if the host is neither a LocalHost nor an SSHHost
    """
    host = old_drone._host
    if isinstance(host, local_host.LocalHost):
        return LocalDrone()
    elif isinstance(host, ssh_host.SSHHost):
        return RemoteDrone(host)
    else:
        raise TypeError('Drone has an unknown host type')


def _results_dir(manager, job):
    """Return results dir for a job.

    Path may be on a remote host.
    """
    return manager.absolute_path(_working_directory(job))


def _working_directory(job):
    """Return the execution path shared by all of the job's queue entries."""
    return _get_consistent_execution_path(job.hostqueueentry_set.all())


def _get_consistent_execution_path(execution_entries):
    """Return the execution path shared by every entry.

    @param execution_entries: indexable, sliceable collection of objects
            that provide execution_path()
    @returns: the execution path of the first entry
    @raises _ExecutionPathError: if a later entry's execution path differs
            from the first entry's
    """
    # Fetch the first entry and each path once, and reuse them for the error
    # message instead of indexing and calling again.
    first_entry = execution_entries[0]
    first_execution_path = first_entry.execution_path()
    for execution_entry in execution_entries[1:]:
        execution_path = execution_entry.execution_path()
        if execution_path != first_execution_path:
            raise _ExecutionPathError(
                    '%s (%s) != %s (%s)'
                    % (execution_path,
                       execution_entry,
                       first_execution_path,
                       first_entry))
    return first_execution_path


class _ExecutionPathError(Exception):
    """Raised by _get_consistent_execution_path()."""


class Drone(object):
    """Simplified drone API."""

    def hostname(self):
        """Return the hostname of the drone."""

    def spawn(self, path, args, output_file):
        """Spawn an independent process.

        path must be an absolute path.  path may be on a remote machine.
        args is a list of arguments.

        The process is spawned in its own session.  It should not try to
        obtain a controlling terminal.

        The new process will have stdin opened to /dev/null and stdout,
        stderr opened to output_file.

        output_file is a pathname, but how it is interpreted is
        implementation defined, e.g., it may be a remote file.
        """
        # NOTE(review): the excerpt is cut off inside the docstring above;
        # anything that followed it in the original file is not visible here.
Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!