Best Python code snippet using stestr_python
test_processor.py
Source:test_processor.py
...222 run_proc.stdin.close()223 return [run_proc]224 # If there is a worker path, use that to get worker groups225 elif self.worker_path:226 test_id_groups = scheduler.generate_worker_partitions(227 test_ids, self.worker_path, self.repository,228 self._group_callback, self.randomize)229 # If we have multiple workers partition the tests and recursively230 # create single worker TestProcessorFixtures for each worker231 else:232 test_id_groups = scheduler.partition_tests(test_ids,233 self.concurrency,234 self.repository,235 self._group_callback)236 for test_ids in test_id_groups:237 if not test_ids:238 # No tests in this partition239 continue240 fixture = self.useFixture(...
test_scheduler.py
Source:test_scheduler.py
...104 self.assertTrue('TestCase4.test' in partitions[1])105 if 'testdir.testfile.TestCase5.test' not in partitions[0]:106 self.assertTrue('testdir.testfile.TestCase5.test' in partitions[1])107 @mock.patch('builtins.open', mock.mock_open(), create=True)108 def test_generate_worker_partitions(self):109 test_ids = ['test_a', 'test_b', 'your_test']110 fake_worker_yaml = [111 {'worker': ['test_']},112 {'worker': ['test']},113 ]114 with mock.patch('yaml.safe_load', return_value=fake_worker_yaml):115 groups = scheduler.generate_worker_partitions(test_ids, 'fakepath')116 expected_grouping = [117 ['test_a', 'test_b'],118 ['test_a', 'test_b', 'your_test'],119 ]120 self.assertEqual(expected_grouping, groups)121 @mock.patch('builtins.open', mock.mock_open(), create=True)122 def test_generate_worker_partitions_group_without_list(self):123 test_ids = ['test_a', 'test_b', 'your_test']124 fake_worker_yaml = [125 {'worker': ['test_']},126 {'worker': 'test'},127 ]128 with mock.patch('yaml.safe_load', return_value=fake_worker_yaml):129 self.assertRaises(TypeError, scheduler.generate_worker_partitions,130 test_ids, 'fakepath')131 @mock.patch('builtins.open', mock.mock_open(), create=True)132 def test_generate_worker_partitions_no_worker_tag(self):133 test_ids = ['test_a', 'test_b', 'your_test']134 fake_worker_yaml = [135 {'worker-foo': ['test_']},136 {'worker': ['test']},137 ]138 with mock.patch('yaml.safe_load', return_value=fake_worker_yaml):139 self.assertRaises(TypeError, scheduler.generate_worker_partitions,140 test_ids, 'fakepath')141 @mock.patch('builtins.open', mock.mock_open(), create=True)142 def test_generate_worker_partitions_group_without_match(self):143 test_ids = ['test_a', 'test_b', 'your_test']144 fake_worker_yaml = [145 {'worker': ['test_']},146 {'worker': ['test']},147 {'worker': ['foo']}148 ]149 with mock.patch('yaml.safe_load', return_value=fake_worker_yaml):150 groups = scheduler.generate_worker_partitions(test_ids, 'fakepath')151 expected_grouping = [152 ['test_a', 'test_b'],153 ['test_a', 'test_b', 'your_test'],154 ]155 self.assertEqual(expected_grouping, groups)156 @mock.patch('builtins.open', mock.mock_open(), create=True)157 def test_generate_worker_partitions_with_count(self):158 test_ids = ['test_a', 'test_b', 'your_test', 'a_thing1', 'a_thing2']159 fake_worker_yaml = [160 {'worker': ['test_']},161 {'worker': ['test']},162 {'worker': ['a_thing'], 'concurrency': 2},163 ]164 with mock.patch('yaml.safe_load', return_value=fake_worker_yaml):165 groups = scheduler.generate_worker_partitions(test_ids, 'fakepath')166 expected_grouping = [167 ['test_a', 'test_b'],168 ['test_a', 'test_b', 'your_test'],169 ['a_thing1'],170 ['a_thing2'],171 ]172 for worker in expected_grouping:173 self.assertIn(worker, groups)174 @mock.patch('builtins.open', mock.mock_open(), create=True)175 def test_generate_worker_partitions_with_count_1(self):176 test_ids = ['test_a', 'test_b', 'your_test']177 fake_worker_yaml = [178 {'worker': ['test_']},179 {'worker': ['test'], 'count': 1},180 ]181 with mock.patch('yaml.safe_load', return_value=fake_worker_yaml):182 groups = scheduler.generate_worker_partitions(test_ids, 'fakepath')183 expected_grouping = [184 ['test_a', 'test_b'],185 ['test_a', 'test_b', 'your_test'],186 ]...
scheduler.py
Source:scheduler.py
...117 return multiprocessing.cpu_count()118 except NotImplementedError:119 # No concurrency logic known.120 return None121def generate_worker_partitions(ids, worker_path, repository=None,122 group_callback=None, randomize=False):123 """Parse a worker yaml file and generate test groups124 :param list ids: A list of test ids too be partitioned125 :param path worker_path: The path to a worker file126 :param repository: A repository object that will be used for looking up127 timing data. This is optional, and also will only be used for128 scheduling if there is a count field on a worker.129 :param group_callback: A callback function that is used as a scheduler130 hint to group test_ids together and treat them as a single unit for131 scheduling. This function expects a single test_id parameter and it132 will return a group identifier. Tests_ids that have the same group133 identifier will be kept on the same worker. This is optional and134 also will only be used for scheduling if there is a count field on a135 worker....
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!