Best Python code snippet using autotest_python
drone_manager.py
Source:drone_manager.py
...166 default=None)167 if allowed_users is not None:168 allowed_users = set(allowed_users.split())169 drone.allowed_users = allowed_users170 self._reorder_drone_queue() # max_processes may have changed171 def get_drones(self):172 return self._drones.itervalues()173 def _get_drone_for_process(self, process):174 return self._drones[process.hostname]175 def _get_drone_for_pidfile_id(self, pidfile_id):176 pidfile_contents = self.get_pidfile_contents(pidfile_id)177 assert pidfile_contents.process is not None178 return self._get_drone_for_process(pidfile_contents.process)179 def _drop_old_pidfiles(self):180 # use items() since the dict is modified in unregister_pidfile()181 for pidfile_id, info in self._registered_pidfile_info.items():182 if info.age > self._get_max_pidfile_refreshes():183 logging.warning('dropping leaked pidfile %s', pidfile_id)184 self.unregister_pidfile(pidfile_id)185 else:186 info.age += 1187 def _reset(self):188 self._process_set = set()189 self._pidfiles = {}190 self._pidfiles_second_read = {}191 self._drone_queue = []192 def _call_all_drones(self, method, *args, **kwargs):193 all_results = {}194 for drone in self.get_drones():195 all_results[drone] = drone.call(method, *args, **kwargs)196 return all_results197 def _parse_pidfile(self, drone, raw_contents):198 contents = PidfileContents()199 if not raw_contents:200 return contents201 lines = raw_contents.splitlines()202 if len(lines) > 3:203 return InvalidPidfile('Corrupt pid file (%d lines):\n%s' %204 (len(lines), lines))205 try:206 pid = int(lines[0])207 contents.process = Process(drone.hostname, pid)208 # if len(lines) == 2, assume we caught Autoserv between writing209 # exit_status and num_failed_tests, so just ignore it and wait for210 # the next cycle211 if len(lines) == 3:212 contents.exit_status = int(lines[1])213 contents.num_tests_failed = int(lines[2])214 except ValueError as exc:215 return InvalidPidfile('Corrupt pid file: ' + str(exc.args))216 return contents217 def _process_pidfiles(self, drone, pidfiles, store_in_dict):218 for pidfile_path, contents in pidfiles.iteritems():219 pidfile_id = PidfileId(pidfile_path)220 contents = self._parse_pidfile(drone, contents)221 store_in_dict[pidfile_id] = contents222 def _add_process(self, drone, process_info):223 process = Process(drone.hostname, int(process_info['pid']),224 int(process_info['ppid']))225 self._process_set.add(process)226 def _add_autoserv_process(self, drone, process_info):227 assert process_info['comm'] == 'autotest-remote'228 # only root autoserv processes have pgid == pid229 if process_info['pgid'] != process_info['pid']:230 return231 self._add_process(drone, process_info)232 def _enqueue_drone(self, drone):233 heapq.heappush(self._drone_queue, _DroneHeapWrapper(drone))234 def _reorder_drone_queue(self):235 heapq.heapify(self._drone_queue)236 def _compute_active_processes(self, drone):237 drone.active_processes = 0238 for pidfile_id, contents in self._pidfiles.iteritems():239 is_running = contents.exit_status is None240 on_this_drone = (contents.process and241 contents.process.hostname == drone.hostname)242 if is_running and on_this_drone:243 info = self._registered_pidfile_info[pidfile_id]244 if info.num_processes is not None:245 drone.active_processes += info.num_processes246 def refresh(self):247 """248 Called at the beginning of a scheduler cycle to refresh all process249 information.250 """251 self._reset()252 self._drop_old_pidfiles()253 pidfile_paths = [pidfile_id.path254 for pidfile_id in self._registered_pidfile_info]255 all_results = self._call_all_drones('refresh', pidfile_paths)256 for drone, results_list in all_results.iteritems():257 results = results_list[0]258 for process_info in results['autoserv_processes']:259 self._add_autoserv_process(drone, process_info)260 for process_info in results['parse_processes']:261 self._add_process(drone, process_info)262 self._process_pidfiles(drone, results['pidfiles'], self._pidfiles)263 self._process_pidfiles(drone, results['pidfiles_second_read'],264 self._pidfiles_second_read)265 self._compute_active_processes(drone)266 if drone.enabled:267 self._enqueue_drone(drone)268 def execute_actions(self):269 """270 Called at the end of a scheduler cycle to execute all queued actions271 on drones.272 """273 for drone in self._drones.values():274 drone.execute_queued_calls()275 try:276 self._results_drone.execute_queued_calls()277 except error.AutoservError:278 warning = ('Results repository failed to execute calls:\n' +279 traceback.format_exc())280 mail.manager.enqueue_admin('Results repository error', warning)281 self._results_drone.clear_call_queue()282 def get_orphaned_autoserv_processes(self):283 """284 Returns a set of Process objects for orphaned processes only.285 """286 return set(process for process in self._process_set287 if process.ppid == 1)288 def kill_process(self, process):289 """290 Kill the given process.291 """292 logging.info('killing %s', process)293 drone = self._get_drone_for_process(process)294 drone.queue_call('kill_process', process)295 def _ensure_directory_exists(self, path):296 if not os.path.exists(path):297 os.makedirs(path)298 def total_running_processes(self):299 return sum(drone.active_processes for drone in self.get_drones())300 def max_runnable_processes(self, username, drone_hostnames_allowed):301 """302 Return the maximum number of processes that can be run (in a single303 execution) given the current load on drones.304 :param username: login of user to run a process. may be None.305 :param drone_hostnames_allowed: list of drones that can be used. May be306 None307 """308 usable_drone_wrappers = [wrapper for wrapper in self._drone_queue309 if wrapper.drone.usable_by(username) and310 (drone_hostnames_allowed is None or311 wrapper.drone.hostname in312 drone_hostnames_allowed)]313 if not usable_drone_wrappers:314 # all drones disabled or inaccessible315 return 0316 runnable_processes = [317 wrapper.drone.max_processes - wrapper.drone.active_processes318 for wrapper in usable_drone_wrappers]319 return max([0] + runnable_processes)320 def _least_loaded_drone(self, drones):321 drone_to_use = drones[0]322 for drone in drones[1:]:323 if drone.used_capacity() < drone_to_use.used_capacity():324 drone_to_use = drone325 return drone_to_use326 def _choose_drone_for_execution(self, num_processes, username,327 drone_hostnames_allowed):328 # cycle through drones is order of increasing used capacity until329 # we find one that can handle these processes330 checked_drones = []331 usable_drones = []332 drone_to_use = None333 while self._drone_queue:334 drone = heapq.heappop(self._drone_queue).drone335 checked_drones.append(drone)336 logging.info('Checking drone %s', drone.hostname)337 if not drone.usable_by(username):338 continue339 drone_allowed = (drone_hostnames_allowed is None or340 drone.hostname in drone_hostnames_allowed)341 if not drone_allowed:342 logging.debug('Drone %s not allowed: ', drone.hostname)343 continue344 usable_drones.append(drone)345 if drone.active_processes + num_processes <= drone.max_processes:346 drone_to_use = drone347 break348 logging.info('Drone %s has %d active + %s requested > %s max',349 drone.hostname, drone.active_processes, num_processes,350 drone.max_processes)351 if not drone_to_use and usable_drones:352 drone_summary = ','.join('%s %s/%s' % (drone.hostname,353 drone.active_processes,354 drone.max_processes)355 for drone in usable_drones)356 logging.error('No drone has capacity to handle %d processes (%s) '357 'for user %s', num_processes, drone_summary, username)358 drone_to_use = self._least_loaded_drone(usable_drones)359 # refill _drone_queue360 for drone in checked_drones:361 self._enqueue_drone(drone)362 return drone_to_use363 def _substitute_working_directory_into_command(self, command,364 working_directory):365 for i, item in enumerate(command):366 if item is WORKING_DIRECTORY:367 command[i] = working_directory368 def execute_command(self, command, working_directory, pidfile_name,369 num_processes, log_file=None, paired_with_pidfile=None,370 username=None, drone_hostnames_allowed=None):371 """372 Execute the given command, taken as an argv list.373 :param command: command to execute as a list. if any item is374 WORKING_DIRECTORY, the absolute path to the working directory375 will be substituted for it.376 :param working_directory: directory in which the pidfile will be written377 :param pidfile_name: name of the pidfile this process will write378 :param num_processes: number of processes to account for from this379 execution380 :param log_file (optional): path (in the results repository) to hold381 command output.382 :param paired_with_pidfile (optional): a PidfileId for an383 already-executed process; the new process will execute on the384 same drone as the previous process.385 :param username (optional): login of the user responsible for this386 process.387 :param drone_hostnames_allowed (optional): hostnames of the drones that388 this command is allowed to389 execute on390 """391 abs_working_directory = self.absolute_path(working_directory)392 if not log_file:393 log_file = self.get_temporary_path('execute')394 log_file = self.absolute_path(log_file)395 self._substitute_working_directory_into_command(command,396 abs_working_directory)397 if paired_with_pidfile:398 drone = self._get_drone_for_pidfile_id(paired_with_pidfile)399 else:400 drone = self._choose_drone_for_execution(num_processes, username,401 drone_hostnames_allowed)402 if not drone:403 raise DroneManagerError('command failed; no drones available: %s'404 % command)405 logging.info("command = %s" % command)406 logging.info('log file = %s:%s' % (drone.hostname, log_file))407 self._write_attached_files(working_directory, drone)408 drone.queue_call('execute_command', command, abs_working_directory,409 log_file, pidfile_name)410 drone.active_processes += num_processes411 self._reorder_drone_queue()412 pidfile_path = os.path.join(abs_working_directory, pidfile_name)413 pidfile_id = PidfileId(pidfile_path)414 self.register_pidfile(pidfile_id)415 self._registered_pidfile_info[pidfile_id].num_processes = num_processes416 return pidfile_id417 def get_pidfile_id_from(self, execution_tag, pidfile_name):418 path = os.path.join(self.absolute_path(execution_tag), pidfile_name)419 return PidfileId(path)420 def register_pidfile(self, pidfile_id):421 """422 Indicate that the DroneManager should look for the given pidfile when423 refreshing.424 """425 if pidfile_id not in self._registered_pidfile_info:...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!