Best Python code snippet using autotest_python
rpc_utils.py
Source: rpc_utils.py
...
                        in models.Label.objects.filter(test__in=test_objects))
    cf_info = dict(is_server=is_server, synch_count=synch_count,
                   dependencies=list(dependencies))
    return cf_info, test_objects, profiler_objects, label


def check_job_dependencies(host_objects, job_dependencies):
    """
    Check that a set of machines satisfies a job's dependencies.

    host_objects: list of models.Host objects
    job_dependencies: list of names of labels
    """
    # check that hosts satisfy dependencies
    host_ids = [host.id for host in host_objects]
    hosts_in_job = models.Host.objects.filter(id__in=host_ids)
    ok_hosts = hosts_in_job
    for index, dependency in enumerate(job_dependencies):
        ok_hosts &= models.Host.objects.filter_custom_join(
            '_label%d' % index, labels__name=dependency)
    failing_hosts = (set(host.hostname for host in host_objects) -
                     set(host.hostname for host in ok_hosts))
    if failing_hosts:
        raise model_logic.ValidationError(
            {'hosts' : 'Host(s) failed to meet job dependencies: ' +
                       ', '.join(failing_hosts)})


def _execution_key_for(host_queue_entry):
    return (host_queue_entry.job.id, host_queue_entry.execution_subdir)


def check_abort_synchronous_jobs(host_queue_entries):
    # ensure user isn't aborting part of a synchronous autoserv execution
    count_per_execution = {}
    for queue_entry in host_queue_entries:
        key = _execution_key_for(queue_entry)
        count_per_execution.setdefault(key, 0)
        count_per_execution[key] += 1

    for queue_entry in host_queue_entries:
        if not queue_entry.execution_subdir:
            continue
        execution_count = count_per_execution[_execution_key_for(queue_entry)]
        if execution_count < queue_entry.job.synch_count:
            raise model_logic.ValidationError(
                {'' : 'You cannot abort part of a synchronous job execution '
                      '(%d/%s), %d included, %d expected'
                      % (queue_entry.job.id, queue_entry.execution_subdir,
                         execution_count, queue_entry.job.synch_count)})


def check_atomic_group_create_job(synch_count, host_objects, metahost_objects,
                                  dependencies, atomic_group, labels_by_name):
    """
    Attempt to reject create_job requests with an atomic group that
    will be impossible to schedule.  The checks are not perfect but
    should catch the most obvious issues.

    @param synch_count - The job's minimum synch count.
    @param host_objects - A list of models.Host instances.
    @param metahost_objects - A list of models.Label instances.
    @param dependencies - A list of job dependency label names.
    @param atomic_group - The models.AtomicGroup instance.
    @param labels_by_name - A dictionary mapping label names to models.Label
            instance.  Used to look up instances for dependencies.

    @raises model_logic.ValidationError - When an issue is found.
    """
    # If specific host objects were supplied with an atomic group, verify
    # that there are enough to satisfy the synch_count.
    minimum_required = synch_count or 1
    if (host_objects and not metahost_objects and
        len(host_objects) < minimum_required):
        raise model_logic.ValidationError(
                {'hosts':
                 'only %d hosts provided for job with synch_count = %d' %
                 (len(host_objects), synch_count)})

    # Check that the atomic group has a hope of running this job
    # given any supplied metahosts and dependancies that may limit.

    # Get a set of hostnames in the atomic group.
    possible_hosts = set()
    for label in atomic_group.label_set.all():
        possible_hosts.update(h.hostname for h in label.host_set.all())

    # Filter out hosts that don't match all of the job dependency labels.
    for label_name in set(dependencies):
        label = labels_by_name[label_name]
        hosts_in_label = (h.hostname for h in label.host_set.all())
        possible_hosts.intersection_update(hosts_in_label)

    if not host_objects and not metahost_objects:
        # No hosts or metahosts are required to queue an atomic group Job.
        # However, if they are given, we respect them below.
        host_set = possible_hosts
    else:
        host_set = set(host.hostname for host in host_objects)
        unusable_host_set = host_set.difference(possible_hosts)
        if unusable_host_set:
            raise model_logic.ValidationError(
                {'hosts': 'Hosts "%s" are not in Atomic Group "%s"' %
                 (', '.join(sorted(unusable_host_set)), atomic_group.name)})

    # Lookup hosts provided by each meta host and merge them into the
    # host_set for final counting.
    for meta_host in metahost_objects:
        meta_possible = possible_hosts.copy()
        hosts_in_meta_host = (h.hostname for h in meta_host.host_set.all())
        meta_possible.intersection_update(hosts_in_meta_host)

        # Count all hosts that this meta_host will provide.
        host_set.update(meta_possible)

    if len(host_set) < minimum_required:
        raise model_logic.ValidationError(
                {'atomic_group_name':
                 'Insufficient hosts in Atomic Group "%s" with the'
                 ' supplied dependencies and meta_hosts.' %
                 (atomic_group.name,)})


def get_motd():
    dirname = os.path.dirname(__file__)
    filename = os.path.join(dirname, "..", "..", "motd.txt")
    text = ''
    try:
        fp = open(filename, "r")
        try:
            text = fp.read()
        finally:
            fp.close()
    except:
        pass

    return text


def _get_metahost_counts(metahost_objects):
    metahost_counts = {}
    for metahost in metahost_objects:
        metahost_counts.setdefault(metahost, 0)
        metahost_counts[metahost] += 1
    return metahost_counts


def get_job_info(job, preserve_metahosts=False, queue_entry_filter_data=None):
    hosts = []
    one_time_hosts = []
    meta_hosts = []
    atomic_group = None

    queue_entries = job.hostqueueentry_set.all()
    if queue_entry_filter_data:
        queue_entries = models.HostQueueEntry.query_objects(
            queue_entry_filter_data, initial_query=queue_entries)

    for queue_entry in queue_entries:
        if (queue_entry.host and (preserve_metahosts or
                                  not queue_entry.meta_host)):
            if queue_entry.deleted:
                continue
            if queue_entry.host.invalid:
                one_time_hosts.append(queue_entry.host)
            else:
                hosts.append(queue_entry.host)
        else:
            meta_hosts.append(queue_entry.meta_host)
        if atomic_group is None:
            if queue_entry.atomic_group is not None:
                atomic_group = queue_entry.atomic_group
        else:
            assert atomic_group.name == queue_entry.atomic_group.name, (
                'DB inconsistency.  HostQueueEntries with multiple atomic'
                ' groups on job %s: %s != %s' % (
                    id, atomic_group.name, queue_entry.atomic_group.name))

    meta_host_counts = _get_metahost_counts(meta_hosts)

    info = dict(dependencies=[label.name for label
                              in job.dependency_labels.all()],
                hosts=hosts,
                meta_hosts=meta_hosts,
                meta_host_counts=meta_host_counts,
                one_time_hosts=one_time_hosts,
                atomic_group=atomic_group)
    return info


def create_new_job(owner, options, host_objects, metahost_objects,
                   atomic_group=None):
    labels_by_name = dict((label.name, label)
                          for label in models.Label.objects.all())
    all_host_objects = host_objects + metahost_objects
    metahost_counts = _get_metahost_counts(metahost_objects)
    dependencies = options.get('dependencies', [])
    synch_count = options.get('synch_count')

    # check that each metahost request has enough hosts under the label
    for label, requested_count in metahost_counts.iteritems():
        available_count = label.host_set.count()
        if requested_count > available_count:
            error = ("You have requested %d %s's, but there are only %d."
                     % (requested_count, label.name, available_count))
            raise model_logic.ValidationError({'meta_hosts' : error})

    if atomic_group:
        check_atomic_group_create_job(
                synch_count, host_objects, metahost_objects,
                dependencies, atomic_group, labels_by_name)
    else:
        if synch_count is not None and synch_count > len(all_host_objects):
            raise model_logic.ValidationError(
                    {'hosts':
                     'only %d hosts provided for job with synch_count = %d' %
                     (len(all_host_objects), synch_count)})
        atomic_hosts = models.Host.objects.filter(
                id__in=[host.id for host in host_objects],
                labels__atomic_group=True)
        unusable_host_names = [host.hostname for host in atomic_hosts]
        if unusable_host_names:
            raise model_logic.ValidationError(
                    {'hosts':
                     'Host(s) "%s" are atomic group hosts but no '
                     'atomic group was specified for this job.' %
                     (', '.join(unusable_host_names),)})

    check_job_dependencies(host_objects, dependencies)
    options['dependencies'] = [labels_by_name[label_name]
                               for label_name in dependencies]

    for label in metahost_objects + options['dependencies']:
        if label.atomic_group and not atomic_group:
            raise model_logic.ValidationError(
                    {'atomic_group_name':
                     'Dependency %r requires an atomic group but no '
                     'atomic_group_name or meta_host in an atomic group was '
                     'specified for this job.' % label.name})
        elif (label.atomic_group and
              label.atomic_group.name != atomic_group.name):
            raise model_logic.ValidationError(
                    {'atomic_group_name':
                     'meta_hosts or dependency %r requires atomic group '
...
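Note: the dependency check in check_job_dependencies() above boils down to repeated set intersection: start from the hosts assigned to the job, keep only those that carry every requested label, and report whatever falls out. Below is a minimal sketch of that narrowing step using plain Python sets instead of the autotest Django querysets; the failing_hosts_for helper and the sample host/label data are hypothetical, added purely for illustration.

def failing_hosts_for(hosts_to_labels, requested_hosts, job_dependencies):
    """Return the requested hosts that do not carry every dependency label."""
    ok_hosts = set(requested_hosts)
    for dependency in job_dependencies:
        # Hosts that carry this label; mirrors the queryset "&=" join above.
        hosts_with_label = {host for host, labels in hosts_to_labels.items()
                            if dependency in labels}
        ok_hosts &= hosts_with_label
    return set(requested_hosts) - ok_hosts


hosts_to_labels = {'host1': {'x86_64', 'rack1'},
                   'host2': {'x86_64'},
                   'host3': {'arm', 'rack1'}}
print(failing_hosts_for(hosts_to_labels, ['host1', 'host2'], ['x86_64', 'rack1']))
# -> {'host2'}: it lacks the 'rack1' dependency label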
test_function_bag.py
Source: test_function_bag.py
...
                     job_type_id=JOB_TYPE_DICT['csv_record_validation'], file_type_id=FILE_TYPE_DICT['award'])
    sess.add_all([sub, job])
    sess.commit()

    with pytest.raises(ValueError):
        check_job_dependencies(job.job_id)


@pytest.mark.usefixtures("job_constants")
def test_check_job_dependencies_has_unfinished_dependencies(database):
    """ Tests check_job_dependencies with a job that isn't finished """
    sess = database.session
    sub = SubmissionFactory(submission_id=1)
    job = JobFactory(submission_id=sub.submission_id, job_status_id=JOB_STATUS_DICT['finished'],
                     job_type_id=JOB_TYPE_DICT['csv_record_validation'], file_type_id=FILE_TYPE_DICT['award'],
                     number_of_errors=0)
    job_2 = JobFactory(submission_id=sub.submission_id, job_status_id=JOB_STATUS_DICT['waiting'],
                       job_type_id=JOB_TYPE_DICT['csv_record_validation'], file_type_id=FILE_TYPE_DICT['award'])
    job_3 = JobFactory(submission_id=sub.submission_id, job_status_id=JOB_STATUS_DICT['waiting'],
                       job_type_id=JOB_TYPE_DICT['csv_record_validation'], file_type_id=FILE_TYPE_DICT['award'],
                       number_of_errors=0)
    sess.add_all([sub, job, job_2, job_3])
    sess.commit()

    # Job 1 finished, it is a prerequisite for job 2 (waiting)
    job_dep = JobDependency(job_id=job_2.job_id, prerequisite_id=job.job_id)
    # Job 3 is also a prerequisite of job 2, it's not done, job 2 should stay in "waiting"
    job_dep_2 = JobDependency(job_id=job_2.job_id, prerequisite_id=job_3.job_id)
    sess.add_all([job_dep, job_dep_2])
    sess.commit()

    check_job_dependencies(job.job_id)

    assert job_2.job_status_id == JOB_STATUS_DICT['waiting']


@pytest.mark.usefixtures("job_constants")
def test_check_job_dependencies_prior_dependency_has_errors(database):
    """ Tests check_job_dependencies with a job that is finished but has errors """
    sess = database.session
    sub = SubmissionFactory(submission_id=1)
    job = JobFactory(submission_id=sub.submission_id, job_status_id=JOB_STATUS_DICT['finished'],
                     job_type_id=JOB_TYPE_DICT['csv_record_validation'], file_type_id=FILE_TYPE_DICT['award'],
                     number_of_errors=3)
    job_2 = JobFactory(submission_id=sub.submission_id, job_status_id=JOB_STATUS_DICT['waiting'],
                       job_type_id=JOB_TYPE_DICT['csv_record_validation'], file_type_id=FILE_TYPE_DICT['award'])
    sess.add_all([sub, job, job_2])
    sess.commit()

    # Job 1 finished, it is a prerequisite for job 2 (waiting) but it has errors
    job_dep = JobDependency(job_id=job_2.job_id, prerequisite_id=job.job_id)
    sess.add(job_dep)
    sess.commit()

    check_job_dependencies(job.job_id)

    assert job_2.job_status_id == JOB_STATUS_DICT['waiting']


@patch('dataactcore.interfaces.function_bag.sqs_queue')
@pytest.mark.usefixtures("job_constants")
def test_check_job_dependencies_ready(mock_sqs_queue, database):
    """ Tests check_job_dependencies with a job that can be set to ready """
    # Mock so it always returns the mock queue for the test
    mock_sqs_queue.return_value = SQSMockQueue
    sess = database.session
    sub = SubmissionFactory(submission_id=1)
    job = JobFactory(submission_id=sub.submission_id, job_status_id=JOB_STATUS_DICT['finished'],
                     job_type_id=JOB_TYPE_DICT['csv_record_validation'], file_type_id=FILE_TYPE_DICT['award'],
                     number_of_errors=0)
    job_2 = JobFactory(submission_id=sub.submission_id, job_status_id=JOB_STATUS_DICT['waiting'],
                       job_type_id=JOB_TYPE_DICT['csv_record_validation'], file_type_id=FILE_TYPE_DICT['award'])
    sess.add_all([sub, job, job_2])
    sess.commit()

    # Job 1 finished, it is a prerequisite for job 2 (waiting)
    job_dep = JobDependency(job_id=job_2.job_id, prerequisite_id=job.job_id)
    sess.add(job_dep)
    sess.commit()

    check_job_dependencies(job.job_id)

    assert job_2.job_status_id == JOB_STATUS_DICT['ready']


def test_get_certification_deadline(database):
    """ Tests get_certification_deadline with subs """
    sess = database.session
    quart_sub = SubmissionFactory(submission_id=1, reporting_fiscal_year=2020, reporting_fiscal_period=6,
                                  is_fabs=False, is_quarter_format=True)
    month_sub = SubmissionFactory(submission_id=2, reporting_fiscal_year=2020, reporting_fiscal_period=10,
                                  is_fabs=False, is_quarter_format=False)
    fail_sub = SubmissionFactory(submission_id=3, reporting_fiscal_year=2020, reporting_fiscal_period=9,
                                 is_fabs=False, is_quarter_format=False)
    fabs_sub = SubmissionFactory(submission_id=4, reporting_fiscal_year=2020, reporting_fiscal_period=6,
                                 is_fabs=True, is_quarter_format=False)
    q2 = SubmissionWindowScheduleFactory(period=6, year=2020)
    p10 = SubmissionWindowScheduleFactory(period=10, year=2020)
...
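Note: taken together, these tests pin down the contract of check_job_dependencies(job_id) in the broker's function_bag module: a dependent job stays in 'waiting' until every prerequisite is finished with zero errors, and only then is it flipped to 'ready' (the mocked SQS queue in the last test suggests the real implementation also enqueues the ready job). Here is a rough sketch of that rule using plain dictionaries in place of the SQLAlchemy models; every name below is an illustrative stand-in, not the project's API.

def advance_dependent_jobs(jobs, dependencies, finished_job_id):
    """Mark a waiting job 'ready' once all of its prerequisites finished cleanly."""
    dependents = [d['job_id'] for d in dependencies
                  if d['prerequisite_id'] == finished_job_id]
    for job_id in dependents:
        prereqs = [jobs[d['prerequisite_id']] for d in dependencies
                   if d['job_id'] == job_id]
        if all(p['status'] == 'finished' and p['number_of_errors'] == 0
               for p in prereqs):
            jobs[job_id]['status'] = 'ready'   # the real code would also queue the job


jobs = {1: {'status': 'finished', 'number_of_errors': 0},
        2: {'status': 'waiting', 'number_of_errors': 0}}
dependencies = [{'job_id': 2, 'prerequisite_id': 1}]
advance_dependent_jobs(jobs, dependencies, finished_job_id=1)
print(jobs[2]['status'])  # 'ready'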
test_job_configuration.py
Source: test_job_configuration.py
...
    params = SubmitterParams(hpc_config=hpc_config)
    job = config.get_job("1")
    job.blocked_by.add("10")
    with pytest.raises(InvalidConfiguration):
        config.check_job_dependencies(params)

    # While we have this setup, verify that submit-jobs calls this function.
    config.dump(CONFIG_FILE)
    cmd = f"{SUBMIT_JOBS} {CONFIG_FILE} --output={OUTPUT} " "--poll-interval=.1 "
    ret = run_command(cmd)
    assert ret != 0


def test_job_configuration__check_job_dependencies_estimate(job_fixture):
    with open(TEST_FILENAME, "w") as f_out:
        f_out.write("echo hello world\n")
    inputs = GenericCommandInputs(TEST_FILENAME)
    config = GenericCommandConfiguration(job_inputs=inputs)
    for job_param in inputs.iter_jobs():
        config.add_job(job_param)
    assert config.get_num_jobs() == 1
    hpc_config = HpcConfig(**load_data(FAKE_HPC_CONFIG))
    params = SubmitterParams(hpc_config=hpc_config, per_node_batch_size=0)
    with pytest.raises(InvalidConfiguration):
        config.check_job_dependencies(params)


def test_job_configuration__shuffle_jobs(job_fixture):
    num_jobs = 10
    with open(TEST_FILENAME, "w") as f_out:
        for i in range(num_jobs):
            f_out.write("echo hello world\n")
    inputs = GenericCommandInputs(TEST_FILENAME)
    config = GenericCommandConfiguration(job_inputs=inputs)
    for job_param in inputs.iter_jobs():
        config.add_job(job_param)
    assert config.get_num_jobs() == num_jobs
    assert [x.name for x in config.iter_jobs()] == [str(x) for x in range(1, num_jobs + 1)]
    config.shuffle_jobs()
    assert [x.name for x in config.iter_jobs()] != [str(x) for x in range(1, num_jobs + 1)]


def test_job_configuration__custom_names(job_fixture):
...
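Note: the first test above adds a dependency on a job name ("10") that does not exist in the configuration and expects check_job_dependencies() to raise InvalidConfiguration. Below is a self-contained sketch of that validation idea; the dict-based job records, the standalone check_job_dependencies function, and the exception class are stand-ins, not the project's real API.

class InvalidConfiguration(Exception):
    """Raised when a job configuration cannot possibly be scheduled."""


def check_job_dependencies(jobs):
    """Reject a configuration in which a job is blocked by an unknown job."""
    known = {job['name'] for job in jobs}
    for job in jobs:
        missing = set(job.get('blocked_by', ())) - known
        if missing:
            raise InvalidConfiguration(
                f"job {job['name']} is blocked by unknown jobs: {sorted(missing)}")


jobs = [{'name': '1', 'blocked_by': {'10'}}]
try:
    check_job_dependencies(jobs)
except InvalidConfiguration as exc:
    print(exc)  # job 1 is blocked by unknown jobs: ['10']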