Best Python code snippet using autotest_python
rpc_interface.py
Source:rpc_interface.py
...316 exclude_only_if_needed_labels,317 exclude_atomic_group_hosts,318 valid_only, filter_data)319 return hosts.count()320def get_install_server_profiles():321 """322 Get install server profiles.323 :return: Sequence of profiles.324 """325 install_server = None326 install_server_info = get_install_server_info()327 install_server_type = install_server_info.get('type', None)328 install_server_url = install_server_info.get('xmlrpc_url', None)329 if install_server_type == 'cobbler' and install_server_url:330 install_server = xmlrpclib.ServerProxy(install_server_url)331 if install_server is None:332 return None333 return install_server.get_item_names('profile')334def get_profiles():335 """336 Get profiles.337 :return: Sequence of profiles.338 """339 error_encountered = True340 profile_dicts = []341 profiles = get_install_server_profiles()342 if profiles is not None:343 if len(profiles) < 1:344 msg = 'No profiles defined on install server'345 rpc_logger = logging.getLogger('rpc_logger')346 rpc_logger.info(msg)347 else:348 error_encountered = False349 # not sorted350 profiles.sort()351 profile_dicts.append(dict(name="Do_not_install"))352 for profile in profiles:353 profile_dicts.append(dict(name=profile))354 if error_encountered:355 profile_dicts.append(dict(name="N/A"))356 return rpc_utils.prepare_for_serialization(profile_dicts)357def get_num_profiles():358 """359 Get the number of profiles. Same parameters as get_profiles().360 :return: The number of defined profiles.361 """362 error_encountered = True363 profiles = get_install_server_profiles()364 if profiles is not None:365 if len(profiles) < 1:366 # 'N/A'367 return 1368 else:369 # include 'Do_not_install'370 return len(profiles) + 1371 if error_encountered:372 # 'N/A'373 return 1374def reserve_hosts(host_filter_data, username=None):375 """376 Reserve some hosts.377 :param host_filter_data: Filters out which hosts to reserve.378 :param username: login of the user reserving hosts379 :type username: str380 :return: None.381 """382 hosts = models.Host.query_objects(host_filter_data)383 reservations.create(hosts_to_reserve=[h.hostname for h in hosts],384 username=username)385def release_hosts(host_filter_data, username=None):386 """387 Release some hosts.388 :param host_filter_data: Filters out which hosts to release.389 :param username: login of the user reserving hosts390 :type username: str391 :return: None.392 """393 hosts = models.Host.query_objects(host_filter_data)394 reservations.release(hosts_to_release=[h.hostname for h in hosts],395 username=username)396def force_release_hosts(host_filter_data, username=None):397 """398 Force release some hosts (remove all ACLs).399 :param host_filter_data: Filters out which hosts to release.400 :param username: login of the user releasing hosts, which needs have elevated privileges401 :type username: str402 :return: None.403 """404 hosts = models.Host.query_objects(host_filter_data)405 reservations.force_release(hosts_to_release=[h.hostname for h in hosts],406 username=username)407# tests408def add_test(name, test_type, path, author=None, dependencies=None,409 experimental=True, run_verify=None, test_class=None,410 test_time=None, test_category=None, description=None,411 sync_count=1):412 """413 Add (create) test.414 :param name: Test name.415 :param test_type: Test type (Client or Server).416 :param path: Relative path to the test.417 :param author: The author of the test (optional).418 :param dependencies: Dependencies (optional).419 :param experimental: Experimental? (True or False) (optional).420 :param run_verify: Run verify? (True or False) (optional).421 :param test_class: Test class (optional).422 :param test_time: Test time (optional).423 :param test_category: Test category (optional).424 :param description: Description (optional).425 :param sync_count: Sync count (optional).426 :return: ID.427 """428 return models.Test.add_object(name=name, test_type=test_type, path=path,429 author=author, dependencies=dependencies,430 experimental=experimental,431 run_verify=run_verify, test_time=test_time,432 test_category=test_category,433 sync_count=sync_count,434 test_class=test_class,435 description=description).id436def modify_test(id, **data):437 """438 Modify (update) test.439 :param id: Test identification.440 :param data: Test data to modify.441 :return: None.442 """443 models.Test.smart_get(id).update_object(data)444def delete_test(id):445 """446 Delete test.447 :param id: Test identification.448 :return: None.449 """450 models.Test.smart_get(id).delete()451def get_tests(**filter_data):452 """453 Get tests.454 :param filter_data: Filters out which tests to get.455 :return: Sequence of tests.456 """457 return rpc_utils.prepare_for_serialization(458 models.Test.list_objects(filter_data))459# profilers460def add_profiler(name, description=None):461 """462 Add (create) profiler.463 :param name: The name of the profiler.464 :param description: Description (optional).465 :return: ID.466 """467 return models.Profiler.add_object(name=name, description=description).id468def modify_profiler(id, **data):469 """470 Modify (update) profiler.471 :param id: Profiler identification.472 :param data: Profiler data to modify.473 :return: None.474 """475 models.Profiler.smart_get(id).update_object(data)476def delete_profiler(id):477 """478 Delete profiler.479 :param id: Profiler identification.480 :return: None.481 """482 models.Profiler.smart_get(id).delete()483def get_profilers(**filter_data):484 """485 Get all profilers.486 :param filter_data: Filters out which profilers to get.487 :return: Sequence of profilers.488 """489 return rpc_utils.prepare_for_serialization(490 models.Profiler.list_objects(filter_data))491# users492def add_user(login, access_level=None):493 """494 Add (create) user.495 :param login: The login name.496 :param acess_level: Access level (optional).497 :return: ID.498 """499 return models.User.add_object(login=login, access_level=access_level).id500def modify_user(id, **data):501 """502 Modify (update) user.503 :param id: User identification.504 :param data: User data to modify.505 :return: None.506 """507 models.User.smart_get(id).update_object(data)508def delete_user(id):509 """510 Delete user.511 :param id: User identification.512 :return: None.513 """514 models.User.smart_get(id).delete()515def get_users(**filter_data):516 """517 Get users.518 :param filter_data: Filters out which users to get.519 :return: Sequence of users.520 """521 return rpc_utils.prepare_for_serialization(522 models.User.list_objects(filter_data))523# acl groups524def add_acl_group(name, description=None):525 """526 Add (create) ACL group.527 :param name: The name of the ACL group.528 :param description: Description (optional).529 :return: ID.530 """531 group = models.AclGroup.add_object(name=name, description=description)532 group.users.add(models.User.current_user())533 return group.id534def modify_acl_group(id, **data):535 """536 Modify (update) ACL group.537 :param id: ACL group identification.538 :param data: ACL group data to modify.539 :return: None.540 """541 group = models.AclGroup.smart_get(id)542 group.check_for_acl_violation_acl_group()543 group.update_object(data)544 group.add_current_user_if_empty()545def acl_group_add_users(id, users):546 """547 Add users to an ACL group.548 :param id: ACL group identification.549 :param users: Sequence of users.550 :return: None.551 """552 group = models.AclGroup.smart_get(id)553 group.check_for_acl_violation_acl_group()554 users = models.User.smart_get_bulk(users)555 group.users.add(*users)556def acl_group_remove_users(id, users):557 """558 Remove users from an ACL group.559 :param id: ACL group identification.560 :param users: Sequence of users.561 :return: None.562 """563 group = models.AclGroup.smart_get(id)564 group.check_for_acl_violation_acl_group()565 users = models.User.smart_get_bulk(users)566 group.users.remove(*users)567 group.add_current_user_if_empty()568def acl_group_add_hosts(id, hosts):569 """570 Add hosts to an ACL group.571 :param id: ACL group identification.572 :param hosts: Sequence of hosts to add.573 :return: None.574 """575 group = models.AclGroup.smart_get(id)576 group.check_for_acl_violation_acl_group()577 hosts = models.Host.smart_get_bulk(hosts)578 group.hosts.add(*hosts)579 group.on_host_membership_change()580def acl_group_remove_hosts(id, hosts):581 """582 Remove hosts from an ACL group.583 :param id: ACL group identification.584 :param hosts: Sequence of hosts to remove.585 :return: None.586 """587 group = models.AclGroup.smart_get(id)588 group.check_for_acl_violation_acl_group()589 hosts = models.Host.smart_get_bulk(hosts)590 group.hosts.remove(*hosts)591 group.on_host_membership_change()592def delete_acl_group(id):593 """594 Delete ACL group.595 :param id: ACL group identification.596 :return: None.597 """598 models.AclGroup.smart_get(id).delete()599def get_acl_groups(**filter_data):600 """601 Get ACL groups.602 :param filter_data: Filters out which ACL groups to get.603 :return: Sequence of ACL groups.604 """605 acl_groups = models.AclGroup.list_objects(filter_data)606 for acl_group in acl_groups:607 acl_group_obj = models.AclGroup.objects.get(id=acl_group['id'])608 acl_group['users'] = [user.login609 for user in acl_group_obj.users.all()]610 acl_group['hosts'] = [host.hostname611 for host in acl_group_obj.hosts.all()]612 return rpc_utils.prepare_for_serialization(acl_groups)613# jobs614def generate_control_file(tests=(), kernel=None, label=None, profilers=(),615 client_control_file='', use_container=False,616 profile_only=None, upload_kernel_config=False):617 """618 Generates a client-side control file to load a kernel and run tests.619 :param tests List of tests to run.620 :param kernel A list of kernel info dictionaries configuring which kernels621 to boot for this job and other options for them622 :param label Name of label to grab kernel config from.623 :param profilers List of profilers to activate during the job.624 :param client_control_file The contents of a client-side control file to625 run at the end of all tests. If this is supplied, all tests must be626 client side.627 TODO: in the future we should support server control files directly628 to wrap with a kernel. That'll require changing the parameter629 name and adding a boolean to indicate if it is a client or server630 control file.631 :param use_container unused argument today. TODO: Enable containers632 on the host during a client side test.633 :param profile_only A boolean that indicates what default profile_only634 mode to use in the control file. Passing None will generate a635 control file that does not explcitly set the default mode at all.636 :param upload_kernel_config: if enabled it will generate server control637 file code that uploads the kernel config file to the client and638 tells the client of the new (local) path when compiling the kernel;639 the tests must be server side tests640 :return: a dict with the following keys:641 control_file: str, The control file text.642 is_server: bool, is the control file a server-side control file?643 synch_count: How many machines the job uses per autoserv execution.644 synch_count == 1 means the job is asynchronous.645 dependencies: A list of the names of labels on which the job depends.646 """647 if not tests and not client_control_file:648 return dict(control_file='', is_server=False, synch_count=1,649 dependencies=[])650 cf_info, test_objects, profiler_objects, label = (651 rpc_utils.prepare_generate_control_file(tests, kernel, label,652 profilers))653 cf_info['control_file'] = control_file.generate_control(654 tests=test_objects, kernels=kernel, platform=label,655 profilers=profiler_objects, is_server=cf_info['is_server'],656 client_control_file=client_control_file, profile_only=profile_only,657 upload_kernel_config=upload_kernel_config)658 return cf_info659def create_parameterized_job(name, priority, test, parameters, kernel=None,660 label=None, profiles=[], profilers=(),661 profiler_parameters=None,662 use_container=False, profile_only=None,663 upload_kernel_config=False, hosts=[],664 meta_hosts=[], meta_host_profiles=[], one_time_hosts=[],665 atomic_group_name=None, synch_count=None,666 is_template=False, timeout=None,667 max_runtime_hrs=None, run_verify=True,668 email_list='', dependencies=(), reboot_before=None,669 reboot_after=None, parse_failed_repair=None,670 hostless=False, keyvals=None, drone_set=None,671 reserve_hosts=False):672 """673 Creates and enqueues a parameterized job.674 Most parameters a combination of the parameters for generate_control_file()675 and create_job(), with the exception of:676 :param test name or ID of the test to run677 :param parameters a map of parameter name ->678 tuple of (param value, param type)679 :param profiler_parameters a dictionary of parameters for the profilers:680 key: profiler name681 value: dict of param name -> tuple of682 (param value,683 param type)684 """685 # Save the values of the passed arguments here. What we're going to do with686 # them is pass them all to rpc_utils.get_create_job_common_args(), which687 # will extract the subset of these arguments that apply for688 # rpc_utils.create_job_common(), which we then pass in to that function.689 args = locals()690 # Set up the parameterized job configs691 test_obj = models.Test.smart_get(test)692 if test_obj.test_type == model_attributes.TestTypes.SERVER:693 control_type = models.Job.ControlType.SERVER694 else:695 control_type = models.Job.ControlType.CLIENT696 try:697 label = models.Label.smart_get(label)698 except models.Label.DoesNotExist:699 label = None700 kernel_objs = models.Kernel.create_kernels(kernel)701 profiler_objs = [models.Profiler.smart_get(profiler)702 for profiler in profilers]703 parameterized_job = models.ParameterizedJob.objects.create(704 test=test_obj, label=label, use_container=use_container,705 profile_only=profile_only,706 upload_kernel_config=upload_kernel_config)707 parameterized_job.kernels.add(*kernel_objs)708 for profiler in profiler_objs:709 parameterized_profiler = models.ParameterizedJobProfiler.objects.create(710 parameterized_job=parameterized_job,711 profiler=profiler)712 profiler_params = profiler_parameters.get(profiler.name, {})713 for name, (value, param_type) in profiler_params.iteritems():714 models.ParameterizedJobProfilerParameter.objects.create(715 parameterized_job_profiler=parameterized_profiler,716 parameter_name=name,717 parameter_value=value,718 parameter_type=param_type)719 try:720 for parameter in test_obj.testparameter_set.all():721 if parameter.name in parameters:722 param_value, param_type = parameters.pop(parameter.name)723 parameterized_job.parameterizedjobparameter_set.create(724 test_parameter=parameter, parameter_value=param_value,725 parameter_type=param_type)726 if parameters:727 raise Exception('Extra parameters remain: %r' % parameters)728 return rpc_utils.create_job_common(729 parameterized_job=parameterized_job.id,730 control_type=control_type,731 **rpc_utils.get_create_job_common_args(args))732 except Exception:733 parameterized_job.delete()734 raise735def create_job(name, priority, control_file, control_type,736 hosts=[], profiles=[], meta_hosts=[], meta_host_profiles=[],737 one_time_hosts=[], atomic_group_name=None, synch_count=None,738 is_template=False, timeout=None, max_runtime_hrs=None,739 run_verify=True, email_list='', dependencies=(), reboot_before=None,740 reboot_after=None, parse_failed_repair=None, hostless=False,741 keyvals=None, drone_set=None, reserve_hosts=False):742 """743 Create and enqueue a job.744 :param name: name of this job745 :param priority: Low, Medium, High, Urgent746 :param control_file: String contents of the control file.747 :param control_type: Type of control file, Client or Server.748 :param synch_count: How many machines the job uses per autoserv execution.749 synch_count == 1 means the job is asynchronous. If an750 atomic group is given this value is treated as a751 minimum.752 :param is_template: If true then create a template job.753 :param timeout: Hours after this call returns until the job times out.754 :param max_runtime_hrs: Hours from job starting time until job times out755 :param run_verify: Should the host be verified before running the test?756 :param email_list: String containing emails to mail when the job is done757 :param dependencies: List of label names on which this job depends758 :param reboot_before: Never, If dirty, or Always759 :param reboot_after: Never, If all tests passed, or Always760 :param parse_failed_repair: if true, results of failed repairs launched by761 this job will be parsed as part of the job.762 :param hostless: if true, create a hostless job763 :param keyvals: dict of keyvals to associate with the job764 :param hosts: List of hosts to run job on.765 :param profiles: List of profiles to use, in sync with @hosts list766 :param meta_hosts: List where each entry is a label name, and for each767 entry one host will be chosen from that label to run768 the job on.769 :param one_time_hosts: List of hosts not in the database to run the job on.770 :param atomic_group_name: name of an atomic group to schedule the job on.771 :param drone_set: The name of the drone set to run this test on.772 :param reserve_hosts: If set we will reseve the hosts that were allocated773 for this job774 :returns: The created Job id number.775 :rtype: integer776 """777 return rpc_utils.create_job_common(778 **rpc_utils.get_create_job_common_args(locals()))779def abort_host_queue_entries(**filter_data):780 """781 Abort a set of host queue entries.782 :param filter_data: Filters out which hosts.783 :return: None.784 """785 query = models.HostQueueEntry.query_objects(filter_data)786 query = query.filter(complete=False)787 models.AclGroup.check_abort_permissions(query)788 host_queue_entries = list(query.select_related())789 rpc_utils.check_abort_synchronous_jobs(host_queue_entries)790 for queue_entry in host_queue_entries:791 queue_entry.abort()792def reverify_hosts(**filter_data):793 """794 Schedules a set of hosts for verify.795 :param filter_data: Filters out which hosts.796 :return: A list of hostnames that a verify task was created for.797 """798 hosts = models.Host.query_objects(filter_data)799 models.AclGroup.check_for_acl_violation_hosts(hosts)800 for host in hosts:801 models.SpecialTask.schedule_special_task(host,802 models.SpecialTask.Task.VERIFY)803 return list(sorted(host.hostname for host in hosts))804def get_jobs(not_yet_run=False, running=False, finished=False, **filter_data):805 """806 Extra filter args for get_jobs:807 -not_yet_run: Include only jobs that have not yet started running.808 -running: Include only jobs that have start running but for which not809 all hosts have completed.810 -finished: Include only jobs for which all hosts have completed (or811 aborted).812 At most one of these three fields should be specified.813 """814 filter_data['extra_args'] = rpc_utils.extra_job_filters(not_yet_run,815 running,816 finished)817 job_dicts = []818 jobs = list(models.Job.query_objects(filter_data))819 models.Job.objects.populate_relationships(jobs, models.Label,820 'dependencies')821 models.Job.objects.populate_relationships(jobs, models.JobKeyval, 'keyvals')822 for job in jobs:823 job_dict = job.get_object_dict()824 job_dict['dependencies'] = ','.join(label.name825 for label in job.dependencies)826 job_dict['keyvals'] = dict((keyval.key, keyval.value)827 for keyval in job.keyvals)828 job_dicts.append(job_dict)829 return rpc_utils.prepare_for_serialization(job_dicts)830def get_num_jobs(not_yet_run=False, running=False, finished=False,831 **filter_data):832 """833 See get_jobs() for documentation of extra filter parameters.834 """835 filter_data['extra_args'] = rpc_utils.extra_job_filters(not_yet_run,836 running,837 finished)838 return models.Job.query_count(filter_data)839def get_jobs_summary(**filter_data):840 """841 Like get_jobs(), but adds a 'status_counts' field, which is a dictionary842 mapping status strings to the number of hosts currently with that843 status, i.e. {'Queued' : 4, 'Running' : 2}.844 """845 jobs = get_jobs(**filter_data)846 ids = [job['id'] for job in jobs]847 all_status_counts = models.Job.objects.get_status_counts(ids)848 for job in jobs:849 job['status_counts'] = all_status_counts[job['id']]850 return rpc_utils.prepare_for_serialization(jobs)851def get_info_for_clone(id, preserve_metahosts, queue_entry_filter_data=None):852 """853 Retrieves all the information needed to clone a job.854 """855 job = models.Job.objects.get(id=id)856 job_info = rpc_utils.get_job_info(job,857 preserve_metahosts,858 queue_entry_filter_data)859 host_dicts = []860 for host, profile in zip(job_info['hosts'], job_info['profiles']):861 host_dict = get_hosts(id=host.id)[0]862 other_labels = host_dict['labels']863 if host_dict['platform']:864 other_labels.remove(host_dict['platform'])865 host_dict['other_labels'] = ', '.join(other_labels)866 host_dict['profile'] = profile867 host_dicts.append(host_dict)868 for host in job_info['one_time_hosts']:869 host_dict = dict(hostname=host.hostname,870 id=host.id,871 platform='(one-time host)',872 locked_text='')873 host_dicts.append(host_dict)874 meta_host_dicts = []875 # convert keys from Label objects to strings (names of labels)876 meta_host_counts = dict((meta_host.name, count) for meta_host, count877 in job_info['meta_host_counts'].iteritems())878 for meta_host, meta_host_profile in zip(job_info['meta_hosts'], job_info['meta_host_profiles']):879 meta_host_dict = dict(name=meta_host.name, count=meta_host_counts[meta_host.name], profile=meta_host_profile)880 meta_host_dicts.append(meta_host_dict)881 info = dict(job=job.get_object_dict(),882 meta_hosts=meta_host_dicts,883 hosts=host_dicts)884 info['job']['dependencies'] = job_info['dependencies']885 if job_info['atomic_group']:886 info['atomic_group_name'] = (job_info['atomic_group']).name887 else:888 info['atomic_group_name'] = None889 info['hostless'] = job_info['hostless']890 info['drone_set'] = job.drone_set and job.drone_set.name891 return rpc_utils.prepare_for_serialization(info)892# host queue entries893def get_host_queue_entries(**filter_data):894 """895 :return: A sequence of nested dictionaries of host and job information.896 """897 return rpc_utils.prepare_rows_as_nested_dicts(898 models.HostQueueEntry.query_objects(filter_data),899 ('host', 'atomic_group', 'job'))900def get_num_host_queue_entries(**filter_data):901 """902 Get the number of host queue entries associated with this job.903 """904 return models.HostQueueEntry.query_count(filter_data)905def get_hqe_percentage_complete(**filter_data):906 """907 Computes the fraction of host queue entries matching the given filter data908 that are complete.909 """910 query = models.HostQueueEntry.query_objects(filter_data)911 complete_count = query.filter(complete=True).count()912 total_count = query.count()913 if total_count == 0:914 return 1915 return float(complete_count) / total_count916# special tasks917def get_special_tasks(**filter_data):918 return rpc_utils.prepare_rows_as_nested_dicts(919 models.SpecialTask.query_objects(filter_data),920 ('host', 'queue_entry'))921# support for host detail view922def get_host_queue_entries_and_special_tasks(hostname, query_start=None,923 query_limit=None):924 """925 :return: an interleaved list of HostQueueEntries and SpecialTasks,926 in approximate run order. each dict contains keys for type, host,927 job, status, started_on, execution_path, and ID.928 """929 total_limit = None930 if query_limit is not None:931 total_limit = query_start + query_limit932 filter_data = {'host__hostname': hostname,933 'query_limit': total_limit,934 'sort_by': ['-id']}935 queue_entries = list(models.HostQueueEntry.query_objects(filter_data))936 special_tasks = list(models.SpecialTask.query_objects(filter_data))937 interleaved_entries = rpc_utils.interleave_entries(queue_entries,938 special_tasks)939 if query_start is not None:940 interleaved_entries = interleaved_entries[query_start:]941 if query_limit is not None:942 interleaved_entries = interleaved_entries[:query_limit]943 return rpc_utils.prepare_for_serialization(interleaved_entries)944def get_num_host_queue_entries_and_special_tasks(hostname):945 filter_data = {'host__hostname': hostname}946 return (models.HostQueueEntry.query_count(filter_data) +947 models.SpecialTask.query_count(filter_data))948# recurring run949def get_recurring(**filter_data):950 """951 Return recurring jobs.952 :param filter_data: Filters out which recurring jobs to get.953 :return: Sequence of recurring jobs.954 """955 return rpc_utils.prepare_rows_as_nested_dicts(956 models.RecurringRun.query_objects(filter_data),957 ('job', 'owner'))958def get_num_recurring(**filter_data):959 """960 Get the number of recurring jobs.961 :param filter_data: Filters out which recurring jobs to get.962 :return: Number of recurring jobs.963 """964 return models.RecurringRun.query_count(filter_data)965def delete_recurring_runs(**filter_data):966 """967 Delete recurring jobs.968 :param filter_data: Filters out which recurring jobs to delete.969 :return: None.970 """971 to_delete = models.RecurringRun.query_objects(filter_data)972 to_delete.delete()973def create_recurring_run(job_id, start_date, loop_period, loop_count):974 """975 Create (add) a recurring job.976 :param job_id: Job identification.977 :param start_date: Start date.978 :param loop_period: Loop period.979 :param loop_count: Loo counter.980 :return: None.981 """982 owner = models.User.current_user().login983 job = models.Job.objects.get(id=job_id)984 return job.create_recurring_job(start_date=start_date,985 loop_period=loop_period,986 loop_count=loop_count,987 owner=owner)988# other989def echo(data=""):990 """991 Echo - for doing a basic test to see if RPC calls992 can successfully be made.993 :param data: Object to echo, it must be serializable.994 :return: Object echoed back.995 """996 return data997def get_motd():998 """999 Returns the message of the day (MOTD).1000 :return: String with MOTD.1001 """1002 return rpc_utils.get_motd()1003def get_static_data():1004 """1005 Returns a dictionary containing a bunch of data that shouldn't change1006 often and is otherwise inaccessible. This includes:1007 priorities: List of job priority choices.1008 default_priority: Default priority value for new jobs.1009 users: Sorted list of all users.1010 labels: Sorted list of all labels.1011 atomic_groups: Sorted list of all atomic groups.1012 tests: Sorted list of all tests.1013 profilers: Sorted list of all profilers.1014 current_user: Logged-in username.1015 host_statuses: Sorted list of possible Host statuses.1016 job_statuses: Sorted list of possible HostQueueEntry statuses.1017 job_timeout_default: The default job timeout length in hours.1018 parse_failed_repair_default: Default value for the parse_failed_repair job1019 option.1020 reboot_before_options: A list of valid RebootBefore string enums.1021 reboot_after_options: A list of valid RebootAfter string enums.1022 motd: Server's message of the day.1023 status_dictionary: A mapping from one word job status names to a more1024 informative description.1025 """1026 job_fields = models.Job.get_field_dict()1027 default_drone_set_name = models.DroneSet.default_drone_set_name()1028 drone_sets = ([default_drone_set_name] +1029 sorted(drone_set.name for drone_set in1030 models.DroneSet.objects.exclude(1031 name=default_drone_set_name)))1032 result = {}1033 result['priorities'] = models.Job.Priority.choices()1034 default_priority = job_fields['priority'].default1035 default_string = models.Job.Priority.get_string(default_priority)1036 result['default_priority'] = default_string1037 result['users'] = get_users(sort_by=['login'])1038 result['labels'] = get_labels(sort_by=['-platform', 'name'])1039 result['atomic_groups'] = get_atomic_groups(sort_by=['name'])1040 result['tests'] = get_tests(sort_by=['name'])1041 result['profilers'] = get_profilers(sort_by=['name'])1042 result['current_user'] = rpc_utils.prepare_for_serialization(1043 models.User.current_user().get_object_dict())1044 result['host_statuses'] = sorted(models.Host.Status.names)1045 result['job_statuses'] = sorted(models.HostQueueEntry.Status.names)1046 result['job_timeout_default'] = models.Job.DEFAULT_TIMEOUT1047 result['job_max_runtime_hrs_default'] = models.Job.DEFAULT_MAX_RUNTIME_HRS1048 result['parse_failed_repair_default'] = bool(1049 models.Job.DEFAULT_PARSE_FAILED_REPAIR)1050 result['reboot_before_options'] = model_attributes.RebootBefore.names1051 result['reboot_after_options'] = model_attributes.RebootAfter.names1052 result['motd'] = rpc_utils.get_motd()1053 result['drone_sets_enabled'] = models.DroneSet.drone_sets_enabled()1054 result['drone_sets'] = drone_sets1055 result['parameterized_jobs'] = models.Job.parameterized_jobs_enabled()1056 result['status_dictionary'] = {"Aborted": "Aborted",1057 "Verifying": "Verifying Host",1058 "Pending": "Waiting on other hosts",1059 "Running": "Running autoserv",1060 "Completed": "Autoserv completed",1061 "Failed": "Failed to complete",1062 "Queued": "Queued",1063 "Starting": "Next in host's queue",1064 "Stopped": "Other host(s) failed verify",1065 "Parsing": "Awaiting parse of final results",1066 "Gathering": "Gathering log files",1067 "Template": "Template job for recurring run",1068 "Waiting": "Waiting for scheduler action",1069 "Archiving": "Archiving results"}1070 return result1071def get_server_time():1072 """1073 Return server current time.1074 :return: Date string in format YYYY-MM-DD HH:MM1075 """1076 return datetime.datetime.now().strftime("%Y-%m-%d %H:%M")1077def get_version():1078 """1079 Return autotest version.1080 :return: String with version.1081 """1082 return version.get_version()1083def get_interface_version():1084 """1085 Return interface version.1086 :return: Sequence with year, month number, day.1087 """1088 return INTERFACE_VERSION1089def _get_logs_used_space():1090 """1091 (Internal) Return disk usage (percentage) for the results directory.1092 :return: Usage in percents (integer value).1093 """1094 logs_dir = settings.get_value('COMMON', 'test_output_dir', default=None)1095 autodir = os.path.abspath(os.path.join(os.path.dirname(__file__),1096 '..', '..'))1097 if logs_dir is None:1098 logs_dir = os.path.join(autodir, 'results')1099 usage = psutil.disk_usage(logs_dir)1100 return int(usage.percent)1101def _process_running(process_name):1102 """1103 (Internal) Return whether a given process name is running.1104 :param process_name: The name of the process.1105 :return: True (running) or False (no).1106 """1107 process_running = False1108 for p in psutil.process_iter():1109 for args in p.cmdline:1110 if os.path.basename(args) == process_name and p.is_running:1111 process_running = True1112 return process_running1113def get_server_status():1114 """1115 Get autotest server system information.1116 :return: Dict with keys:1117 * 'disk_space_percentage' Autotest log directory disk usage1118 * 'scheduler_running' Whether the autotest scheduler is running1119 * 'sheduler_watcher_running' Whether the scheduler watcher is1120 running1121 * 'concerns' Global evaluation of whether there are problems to1122 be addressed1123 """1124 server_status = {}1125 concerns = False1126 disk_treshold = int(settings.get_value('SERVER', 'logs_disk_usage_treshold',1127 default="80"))1128 used_space_logs = _get_logs_used_space()1129 if used_space_logs > disk_treshold:1130 concerns = True1131 server_status['used_space_logs'] = used_space_logs1132 scheduler_running = _process_running('autotest-scheduler')1133 if not scheduler_running:1134 concerns = True1135 server_status['scheduler_running'] = scheduler_running1136 watcher_running = _process_running('autotest-scheduler-watcher')1137 if not watcher_running:1138 concerns = True1139 server_status['scheduler_watcher_running'] = watcher_running1140 if settings.get_value('INSTALL_SERVER', 'xmlrpc_url', default=''):1141 install_server_running = get_install_server_profiles() is not None1142 if not install_server_running:1143 concerns = True1144 else:1145 install_server_running = False1146 server_status['install_server_running'] = install_server_running1147 server_status['concerns'] = concerns...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!