Best Python code snippet using localstack_python
test_dataset_eval.py
Source:test_dataset_eval.py
# ... (excerpt begins partway through a test method; the preceding lines are not shown)
# NOTE: these are methods of a test class whose header lies outside this excerpt.
dataset_eval.validate_dataset_contents(self.test_data)
self.assertEqual(str(e.exception), "Can't find low-level data for recording: 0dad432b-16cc-4bf0-8961-fd31d124b01b")


def test_create_job_nonormalize(self):
    """A job created without dataset normalization is pending and stores normalize=False."""
    job_id = dataset_eval._create_job(self.conn, self.test_dataset_id, False, dataset_eval.EVAL_LOCAL,
                                      c_value=[1, 2, 3], gamma_value=[4, 5, 6], preprocessing_values=["basic"],
                                      filter_type=None)
    job = dataset_eval.get_job(job_id)
    self.assertIsNotNone(job)
    self.assertEqual(job["status"], dataset_eval.STATUS_PENDING)
    self.assertEqual(job["options"]["normalize"], False)


def test_create_job_normalize(self):
    """A job created with dataset normalization is pending and stores normalize=True."""
    job_id = dataset_eval._create_job(self.conn, self.test_dataset_id, True, dataset_eval.EVAL_LOCAL,
                                      c_value=[1, 2, 3], gamma_value=[4, 5, 6], preprocessing_values=["basic"],
                                      filter_type=None)
    job = dataset_eval.get_job(job_id)
    self.assertIsNotNone(job)
    self.assertEqual(job["status"], dataset_eval.STATUS_PENDING)
    self.assertEqual(job["options"]["normalize"], True)


def test_create_job_artistfilter(self):
    """Artist filtering passed as an option is stored as filter_type "artist"."""
    job_id = dataset_eval._create_job(self.conn, self.test_dataset_id, False, dataset_eval.EVAL_LOCAL,
                                      c_value=[1, 2, 3], gamma_value=[4, 5, 6], preprocessing_values=["basic"],
                                      filter_type=dataset_eval.FILTER_ARTIST)
    job = dataset_eval.get_job(job_id)
    self.assertIsNotNone(job)
    self.assertEqual(job["status"], dataset_eval.STATUS_PENDING)
    self.assertEqual(job["options"]["filter_type"], "artist")


def test_create_job_svm_params(self):
    """The C, gamma and preprocessing values given at creation are stored in the job options."""
    job_id = dataset_eval._create_job(self.conn, self.test_dataset_id, True, dataset_eval.EVAL_LOCAL,
                                      c_value=[1, 2, 3], gamma_value=[4, 5, 6], preprocessing_values=["basic"],
                                      filter_type=dataset_eval.FILTER_ARTIST)
    job = dataset_eval.get_job(job_id)
    self.assertIsNotNone(job)
    self.assertEqual(job["status"], dataset_eval.STATUS_PENDING)
    self.assertEqual(job["options"]["c_values"], [1, 2, 3])
    self.assertEqual(job["options"]["gamma_values"], [4, 5, 6])
    self.assertEqual(job["options"]["preprocessing_values"], ["basic"])


def test_create_job_badfilter(self):
    """An unknown filter type is rejected with ValueError."""
    with self.assertRaises(ValueError):
        dataset_eval._create_job(self.conn, self.test_dataset_id, True, dataset_eval.EVAL_LOCAL,
                                 c_value=[1, 2, 3], gamma_value=[4, 5, 6], preprocessing_values=["basic"],
                                 filter_type="test")


def test_create_job_badlocation(self):
    """An invalid eval_location is rejected with ValueError."""
    with self.assertRaises(ValueError):
        dataset_eval._create_job(self.conn, self.test_dataset_id, True, "not_a_location",
                                 c_value=[1, 2, 3], gamma_value=[4, 5, 6], preprocessing_values=["basic"],
                                 filter_type=None)


def test_job_exists(self):
    """job_exists() is False before a job is created for the dataset and True afterwards."""
    self.assertFalse(dataset_eval.job_exists(self.test_dataset_id))
    dataset_eval._create_job(self.conn, self.test_dataset_id, True, dataset_eval.EVAL_LOCAL,
                             c_value=[1, 2, 3], gamma_value=[4, 5, 6], preprocessing_values=["basic"],
                             filter_type=None)
    self.assertTrue(dataset_eval.job_exists(self.test_dataset_id))


def test_get_job(self):
    """get_job() finds the job that was created and returns None for an unrelated id."""
    job_id = dataset_eval._create_job(self.conn, self.test_dataset_id, True, dataset_eval.EVAL_LOCAL,
                                      c_value=[1, 2, 3], gamma_value=[4, 5, 6], preprocessing_values=["basic"],
                                      filter_type=None)
    # Positive path: without this the created job was never looked up in this test.
    self.assertIsNotNone(dataset_eval.get_job(job_id))
    random_id = "f47ac10b-58cc-4372-a567-0e02b2c3d479"
    # just in case
    self.assertNotEqual(random_id, job_id)
    self.assertIsNone(dataset_eval.get_job(random_id))


def test_set_job_result(self):
    """A result passed to set_job_result() as a JSON string is read back from the job as a dict."""
    job_id = dataset_eval._create_job(self.conn, self.test_dataset_id, True, dataset_eval.EVAL_LOCAL,
                                      c_value=[1, 2, 3], gamma_value=[4, 5, 6], preprocessing_values=["basic"],
                                      filter_type=None)
    result = {
        u"accuracy": 1,
        u"parameters": {},
        u"confusion_matrix": {},
    }
    dataset_eval.set_job_result(
        job_id=job_id,
        result=json.dumps(result),
    )
    job = dataset_eval.get_job(job_id)
    self.assertEqual(job["result"], result)


def test_set_job_status(self):
    """A new job starts as pending; set_job_status() changes the stored status."""
    job_id = dataset_eval._create_job(self.conn, self.test_dataset_id, True, dataset_eval.EVAL_LOCAL,
                                      c_value=[1, 2, 3], gamma_value=[4, 5, 6], preprocessing_values=["basic"],
                                      filter_type=None)
    job = dataset_eval.get_job(job_id)
    self.assertEqual(job["status"], dataset_eval.STATUS_PENDING)
    dataset_eval.set_job_status(
        job_id=job_id,
        status=dataset_eval.STATUS_FAILED,
    )
    job = dataset_eval.get_job(job_id)
    self.assertEqual(job["status"], dataset_eval.STATUS_FAILED)


def test_get_next_pending_job(self):
    """The first-created pending job is returned first; once it is marked failed the second one is."""
    job1_id = dataset_eval._create_job(self.conn, self.test_dataset_id, True, dataset_eval.EVAL_LOCAL,
                                       c_value=[1, 2, 3], gamma_value=[4, 5, 6], preprocessing_values=["basic"],
                                       filter_type=None)
    job1 = dataset_eval.get_job(job1_id)
    job2_id = dataset_eval._create_job(self.conn, self.test_dataset_id, True, dataset_eval.EVAL_LOCAL,
                                       c_value=[1, 2, 3], gamma_value=[4, 5, 6], preprocessing_values=["basic"],
                                       filter_type=None)
    job2 = dataset_eval.get_job(job2_id)
    next_pending = dataset_eval.get_next_pending_job()
    self.assertEqual(job1, next_pending)
    dataset_eval.set_job_status(
        job_id=job1_id,
        status=dataset_eval.STATUS_FAILED,
    )
    next_pending = dataset_eval.get_next_pending_job()
    self.assertEqual(job2, next_pending)


def test_get_next_pending_job_remote(self):
    """A pending remote job is skipped: the local job is returned even though the remote one was created first."""
    # If we have a remote pending job with the most recent timestamp, skip it
    # The remote job is created only for its side effect; its details are never compared.
    dataset_eval._create_job(self.conn, self.test_dataset_id, True, dataset_eval.EVAL_REMOTE,
                             c_value=[1, 2, 3], gamma_value=[4, 5, 6], preprocessing_values=["basic"],
                             filter_type=None)
    job2_id = dataset_eval._create_job(self.conn, self.test_dataset_id, True, dataset_eval.EVAL_LOCAL,
                                       c_value=[1, 2, 3], gamma_value=[4, 5, 6], preprocessing_values=["basic"],
                                       filter_type=None)
    job2 = dataset_eval.get_job(job2_id)
    next_pending = dataset_eval.get_next_pending_job()
    self.assertEqual(job2, next_pending)


def test_delete_job(self):
    """Deleting an unknown job raises JobNotFoundException; deleting a real job removes it and its snapshot."""
    with self.assertRaises(dataset_eval.JobNotFoundException):
        dataset_eval.delete_job(self.test_uuid)
    job_id = dataset_eval._create_job(self.conn, self.test_dataset_id, True, dataset_eval.EVAL_LOCAL,
                                      c_value=[1, 2, 3], gamma_value=[4, 5, 6], preprocessing_values=["basic"],
                                      filter_type=None)
    snapshots = dataset.get_snapshots_for_dataset(self.test_dataset_id)
    self.assertEqual(len(snapshots), 1)
    self.assertIsNotNone(dataset_eval.get_job(job_id))
    dataset_eval.delete_job(job_id)
    snapshots = dataset.get_snapshots_for_dataset(self.test_dataset_id)
    self.assertEqual(len(snapshots), 0)
    self.assertIsNone(dataset_eval.get_job(job_id))


def test_eval_job_location(self):
    """The eval_location given at creation (remote or local) is stored on the job."""
    job1_id = dataset_eval._create_job(self.conn, self.test_dataset_id, True, dataset_eval.EVAL_REMOTE,
                                       c_value=[1, 2, 3], gamma_value=[4, 5, 6], preprocessing_values=["basic"],
                                       filter_type=None)
    job1 = dataset_eval.get_job(job1_id)
    self.assertEqual(job1["eval_location"], dataset_eval.EVAL_REMOTE)
    job2_id = dataset_eval._create_job(self.conn, self.test_dataset_id, True, dataset_eval.EVAL_LOCAL,
                                       c_value=[1, 2, 3], gamma_value=[4, 5, 6], preprocessing_values=["basic"],
                                       filter_type=None)
    job2 = dataset_eval.get_job(job2_id)
    self.assertEqual(job2["eval_location"], dataset_eval.EVAL_LOCAL)


def test_get_remote_pending_jobs_for_user(self):
    """ Check that we get remote pending jobs for a user """
    job_id = dataset_eval._create_job(self.conn, self.test_dataset_id, True, dataset_eval.EVAL_REMOTE,
                                      c_value=[1, 2, 3], gamma_value=[4, 5, 6], preprocessing_values=["basic"],
                                      filter_type=None)
    job_details = db.dataset_eval.get_job(job_id)
    response = dataset_eval.get_remote_pending_jobs_for_user(self.test_user_id)
    expected_response = [{
        'job_id': str(job_id),
        'dataset_name': self.test_data['name'],
        'created': job_details['created'],
    }]
    self.assertEqual(response, expected_response)


def test_get_pending_jobs_for_user_local(self):
    """ Check that a local eval dataset for this user doesn't show """
    # Only the job's existence matters here, so its id and details are not kept.
    dataset_eval._create_job(self.conn, self.test_dataset_id, True, dataset_eval.EVAL_LOCAL,
                             c_value=[1, 2, 3], gamma_value=[4, 5, 6], preprocessing_values=["basic"],
                             filter_type=None)
    response = dataset_eval.get_remote_pending_jobs_for_user(self.test_user_id)
    self.assertEqual(response, [])


def test_get_pending_jobs_for_user_other_user(self):
    """ Check that a remote eval job for another user doesn't show """
    another_user_id = user.create("another user")
    another_dataset_id = dataset.create_from_dict(self.test_data, author_id=another_user_id)
    dataset_eval._create_job(self.conn, another_dataset_id, True, dataset_eval.EVAL_REMOTE,
                             c_value=[1, 2, 3], gamma_value=[4, 5, 6], preprocessing_values=["basic"],
                             filter_type=None)
    response = dataset_eval.get_remote_pending_jobs_for_user(self.test_user_id)
    self.assertEqual(response, [])


def test_get_pending_jobs_for_user_done(self):
    """ Check that a remote eval job with a done status doesn't show """
    job_id = dataset_eval._create_job(self.conn, self.test_dataset_id, True, dataset_eval.EVAL_REMOTE,
                                      c_value=[1, 2, 3], gamma_value=[4, 5, 6], preprocessing_values=["basic"],
                                      filter_type=None)
    db.dataset_eval.set_job_status(job_id, db.dataset_eval.STATUS_DONE)
    response = dataset_eval.get_remote_pending_jobs_for_user(self.test_user_id)
    self.assertEqual(response, [])


@mock.patch('db.dataset_eval.validate_dataset_contents')
def test_evaluate_dataset(self, validate_dataset_contents):
    """Test that a dataset can be submitted for evaluation if it is complete"""
    dataset_eval.evaluate_dataset(self.test_dataset_id, False, 'local')
    self.assertTrue(dataset_eval.job_exists(self.test_dataset_id))
    # Evaluate a second time it will raise
    with self.assertRaises(dataset_eval.JobExistsException):
        dataset_eval.evaluate_dataset(self.test_dataset_id, False, 'local')
    # We validate after checking if the dataset exists, so this will only...
test_jobs.py
Source:test_jobs.py
# ... (excerpt begins inside an earlier definition; the preceding lines are not shown)
# NOTE: these are methods of a test class whose header lies outside this excerpt.
cls.job = {
    'job_type': 'Pig',
    'mains': [job_binary_id]
}


def _create_job(self, job_name=None):
    """Create a job, optionally under a caller-supplied name.

    When ``job_name`` is falsy a random name is generated instead. The name
    reported in the creation response is asserted to match the requested one.

    Returns:
        A ``(job_id, job_name)`` tuple for the created job.
    """
    # fall back to a generated name when the caller did not supply one
    name = job_name or data_utils.rand_name('sahara-job')
    created = self.create_job(name, **self.job)
    # the response must echo the name that was requested
    self.assertEqual(name, created['name'])
    return created['id'], name


@test.attr(type='smoke')
def test_job_create(self):
    """Creating a job succeeds; the checks live in _create_job."""
    self._create_job()


@test.attr(type='smoke')
def test_job_list(self):
    """A newly created job appears in the client's job listing."""
    expected = self._create_job()
    _, listed = self.client.list_jobs()
    # compare as (id, name) pairs, the same shape _create_job returns
    id_name_pairs = [(entry['id'], entry['name']) for entry in listed]
    self.assertIn(expected, id_name_pairs)


@test.attr(type='smoke')
def test_job_get(self):
    """Fetching a job by id returns the job with the expected name."""
    created_id, created_name = self._create_job()
    _, fetched = self.client.get_job(created_id)
    self.assertEqual(created_name, fetched['name'])


@test.attr(type='smoke')
def test_job_delete(self):
    job_id, _ = self._create_job()
    # delete the job by id...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!