Best Python code snippet using autotest_python
drone_manager.py
Source:drone_manager.py
...326 num_processes = self._extract_num_processes(command)327 drone = self._choose_drone_for_execution(num_processes)328 logging.info("command = %s" % command)329 logging.info('log file = %s:%s' % (drone.hostname, log_file))330 self._write_attached_files(command, drone)331 drone.queue_call('execute_command', command, working_directory,332 log_file, pidfile_name)333 pidfile_path = self.absolute_path(os.path.join(working_directory,334 pidfile_name))335 pidfile_id = PidfileId(pidfile_path)336 self.register_pidfile(pidfile_id)337 return pidfile_id338 def get_pidfile_id_from(self, execution_tag, pidfile_name):339 path = os.path.join(self.absolute_path(execution_tag), pidfile_name)340 return PidfileId(path)341 def register_pidfile(self, pidfile_id):342 """343 Indicate that the DroneManager should look for the given pidfile when344 refreshing.345 """346 self._pidfile_age[pidfile_id] = 0347 def get_pidfile_contents(self, pidfile_id, use_second_read=False):348 """349 Retrieve a PidfileContents object for the given pidfile_id. If350 use_second_read is True, use results that were read after the processes351 were checked, instead of before.352 """353 self.register_pidfile(pidfile_id)354 if use_second_read:355 pidfile_map = self._pidfiles_second_read356 else:357 pidfile_map = self._pidfiles358 return pidfile_map.get(pidfile_id, PidfileContents())359 def is_process_running(self, process):360 """361 Check if the given process is in the running process list.362 """363 return process in self._process_set364 def get_temporary_path(self, base_name):365 """366 Get a new temporary path guaranteed to be unique across all drones367 for this scheduler execution.368 """369 self._temporary_path_counter += 1370 return os.path.join(drone_utility._TEMPORARY_DIRECTORY,371 '%s.%s' % (base_name, self._temporary_path_counter))372 def absolute_path(self, path):373 return os.path.join(self._results_dir, path)374 def _copy_results_helper(self, process, source_path, destination_path,375 to_results_repository=False):376 full_source = self.absolute_path(source_path)377 full_destination = self.absolute_path(destination_path)378 source_drone = self._get_drone_for_process(process)379 if to_results_repository:380 source_drone.send_file_to(self._results_drone, full_source,381 full_destination, can_fail=True)382 else:383 source_drone.queue_call('copy_file_or_directory', full_source,384 full_destination)385 def copy_to_results_repository(self, process, source_path,386 destination_path=None):387 """388 Copy results from the given process at source_path to destination_path389 in the results repository.390 """391 if destination_path is None:392 destination_path = source_path393 self._copy_results_helper(process, source_path, destination_path,394 to_results_repository=True)395 def copy_results_on_drone(self, process, source_path, destination_path):396 """397 Copy a results directory from one place to another on the drone.398 """399 self._copy_results_helper(process, source_path, destination_path)400 def _write_attached_files(self, command, drone):401 execution_tag = self._extract_execution_tag(' '.join(command))402 attached_files = self._attached_files.pop(execution_tag, {})403 for file_path, contents in attached_files.iteritems():404 drone.queue_call('write_to_file', self.absolute_path(file_path),405 contents)406 def attach_file_to_execution(self, execution_tag, file_contents,407 file_path=None):408 """409 When the process for execution_tag is executed, the given file contents410 will be placed in a file on the drone. Returns the path at which the411 file will be placed.412 """413 if not file_path:414 file_path = self.get_temporary_path('attach')...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!