Best Python code snippet using lemoncheesecake
test_workflow_geoprocessing_director.py
Source:test_workflow_geoprocessing_director.py
...28 super(WorkflowGeoprocessingDirectorTests, self).doCleanups()29 dependencies.clear()30 # ________________________________________________ run tests _____________________________________________________ #31 def test_run(self):32 self._director._initialize_result().AndReturn(None)33 self._director._update_task_start().AndReturn(None)34 self._director._find_all_trade_areas_that_need_gp().AndReturn(None)35 self._director._spawn_geoprocessing_workers().AndReturn(None)36 self._director._set_result().AndReturn(None)37 self._director._update_task_end().AndReturn(None)38 self.mox.ReplayAll()39 result = GeoprocessingDirector.run(self._director)40 self.assertEqual(result, {'worker_task_ids': {}})41 def test_initialize_result(self):42 GeoprocessingDirector._initialize_result(self._director)43 self.assertEqual(self._director.result['worker_task_ids'], {'do_the_hustle': [], 'oh_yeah': []})44 def test_set_result__success(self):45 start_time = datetime.datetime.utcnow()46 self._director._status = 'success'47 self._director._start_time = start_time48 end_time = datetime.datetime.utcnow()49 self.mox.StubOutWithMock(datetime, 'datetime')50 datetime.datetime.utcnow().AndReturn(end_time)51 self.mox.ReplayAll()52 GeoprocessingDirector._set_result(self._director)53 expected_result = {54 'start_time': self._director._start_time,55 'duration_seconds': (end_time - start_time).total_seconds(),56 'worker_task_ids': {}57 }58 self.assertEqual(expected_result, self._director.result)59 def test_set_result__failed(self):60 start_time = datetime.datetime.utcnow()61 self._director._status = 'failed'62 self._director._start_time = start_time63 self._director._exception = 'There is a dolphin in your computer'64 end_time = datetime.datetime.utcnow()65 self.mox.StubOutWithMock(datetime, 'datetime')66 datetime.datetime.utcnow().AndReturn(end_time)67 self.mox.ReplayAll()68 GeoprocessingDirector._set_result(self._director)69 expected_result = {70 'start_time': self._director._start_time,71 
'duration_seconds': (end_time - start_time).total_seconds(),72 'worker_task_ids': {},73 'exception': 'There is a dolphin in your computer'74 }75 self.assertEqual(expected_result, self._director.result)76 # __________________________________________ spawn worker tests __________________________________________________#77 def test_find_all_trade_areas_that_need_gp(self):78 expected_query = {79 '$or': [80 {81 'data.geoprocessing.needs_gp.do_the_hustle': True,82 'data.geoprocessing.latest_attempt.do_the_hustle.result': {'$ne': 'in_progress'}83 },84 {85 'data.geoprocessing.needs_gp.oh_yeah': True,86 'data.geoprocessing.latest_attempt.oh_yeah.result': {'$ne': 'in_progress'}87 }88 ]89 }90 expected_sort = [["data.geoprocessing.latest_attempt.find_competition.start_timestamp", 1]]91 self._director._main_params.mds.create_params(resource = 'find_entities_raw',92 entity_fields = ['_id', 'data'],93 query = expected_query,94 sort = expected_sort,95 # throttled for now96 limit = 20000).AndReturn({'params': 'params'})97 self._director._main_access.mds.call_find_entities_raw('trade_area', 'params').AndReturn('bob')98 self.mox.ReplayAll()99 GeoprocessingDirector._find_all_trade_areas_that_need_gp(self._director)100 self.assertEqual(self._director._trade_areas, 'bob')101 def test_spawn_geoprocessing_workers(self):102 ta_1 = {103 'data': {104 'geoprocessing': {105 'needs_gp': {106 'do_the_hustle': True,107 'oh_yeah': True108 }109 },110 'trade_area_threshold': 'chicken'111 },112 '_id': 1,113 }114 ta_2 = {115 'data': {116 'geoprocessing': {117 'needs_gp': {118 'do_the_hustle': True,119 'oh_yeah': False120 }121 },122 'trade_area_threshold': 'woot'123 },124 '_id': 2,125 }126 ta_3 = {127 'data': {128 'geoprocessing': {129 'needs_gp': {130 'do_the_hustle': False,131 'oh_yeah': True132 }133 },134 'trade_area_threshold': 'sauce'135 },136 '_id': 3,137 }138 self._director._trade_areas = [ta_1, ta_2, ta_3]139 expected_task_rec_struct = {140 'input': {141 'geoprocessing_method': None,142 
'entity_id': None,143 'entity_type': 'trade_area'144 },145 'meta': {146 'async': True147 },148 'task_status': {149 'status': 'in_progress'150 }151 }152 expected_task_rec_1 = deepcopy(expected_task_rec_struct)153 expected_task_rec_1['input']['geoprocessing_method'] = 'do_the_hustle'154 expected_task_rec_1['input']['entity_id'] = '1'155 expected_task_rec_3 = deepcopy(expected_task_rec_struct)156 expected_task_rec_3['input']['geoprocessing_method'] = 'do_the_hustle'157 expected_task_rec_3['input']['entity_id'] = '2'158 expected_task_rec_4 = deepcopy(expected_task_rec_struct)159 expected_task_rec_4['input']['geoprocessing_method'] = 'oh_yeah'160 expected_task_rec_4['input']['entity_id'] = '3'161 tasks = {162 'do_the_hustle': [expected_task_rec_1, expected_task_rec_3],163 'oh_yeah': [expected_task_rec_4]164 }165 for method in tasks:166 self._director._main_access.wfs.call_task_batch_new('retail_analytics',167 'geoprocessing',168 'geoprocess',169 tasks[method],170 self._director._context).AndReturn([{'_id': '%s_tasks_launched' % method}])171 self.mox.ReplayAll()172 GeoprocessingDirector._initialize_result(self._director)173 GeoprocessingDirector._spawn_geoprocessing_workers(self._director)174 self.assertEqual({175 'worker_task_ids': {176 'do_the_hustle': ['do_the_hustle_tasks_launched'],177 'oh_yeah': ['oh_yeah_tasks_launched']178 }...
eval.py
Source:eval.py
...33 model.eval()34 batch_size = EVALUATION_BATCH_SIZE35 if batch_size > self._data.get_size():36 batch_size = self._data.get_size()37 result = self._initialize_result()38 for i in range(self._data.get_num_batches(batch_size)):39 result = self._aggregate_batch(result, self._run_batch(model, self._data.get_batch(i, batch_size, return_indices=self._batch_indices)))40 final_batch = self._data.get_final_batch(batch_size, return_indices=self._batch_indices)41 if final_batch is not None:42 result = self._aggregate_batch(result, self._run_batch(model, final_batch))43 model.train()44 return self._finalize_result(result)45 @abc.abstractmethod46 def _run_batch(self, model, batch):47 """ Evaluates the model on a given batch of data """48 @abc.abstractmethod49 def _aggregate_batch(self, agg, batch_result):50 """ Aggregates batch result into total """51 @abc.abstractmethod52 def _initialize_result(self):53 """ Initializes the aggregate result """54 @abc.abstractmethod55 def _finalize_result(self, result):56 """ Returns the final value given the aggregate result """57 def get_name(self):58 return self._name59 @staticmethod60 def run_all(evaluations, model):61 results = dict()62 for evaluation in evaluations:63 if evaluation.get_trials() == 1:64 results[evaluation.get_name()] = evaluation.run(model)65 else:66 mean_name = evaluation.get_name()67 std_name = evaluation.get_name() + " std (n=" + str(evaluation.get_trials()) + ")"68 mean, std = evaluation.run(model)69 results[mean_name] = mean70 results[std_name] = std71 return results72class Loss(Evaluation):73 def __init__(self, name, data, data_parameters, loss_criterion, norm_dim=False, trials=1):74 super(Loss, self).__init__(name, data, data_parameters, trials=trials)75 self._loss_criterion = loss_criterion76 self._norm_dim = norm_dim77 def _run_batch(self, model, batch):78 loss = model.loss(batch, self._data_parameters, self._loss_criterion)79 if self._norm_dim:80 return (loss[0].data[0], loss[1].data[0])81 else:82 return 
loss.data[0]83 def _aggregate_batch(self, agg, batch_result):84 if self._norm_dim:85 return (agg[0] + batch_result[0], agg[1] + batch_result[1])86 else:87 return agg + batch_result88 def _initialize_result(self):89 if self._norm_dim:90 return (0.0, 0.0)91 else:92 return 0.093 def _finalize_result(self, result):94 if self._norm_dim:95 return result[0] / result[1]96 else:97 return result / self._data.get_size()98class DistributionAccuracy(Evaluation):99 def __init__(self, name, data, data_parameters, model_fn=None, target_indexed = False, check_unique = False, trials=1):100 super(DistributionAccuracy, self).__init__(name, data, data_parameters, trials=trials)101 self._model_fn = model_fn102 self._target_indexed = target_indexed103 self._check_unique = check_unique104 def _run_batch(self, model, batch):105 dist = None106 if self._model_fn is None:107 dist = model.forward_batch(batch, self._data_parameters)108 else:109 dist = self._model_fn(batch, model, self._data_parameters)110 target = batch[self._data_parameters[DataParameter.TARGET]].squeeze()111 model_ps = dist.p().data.cpu()112 max_ps, max_index = torch.max(model_ps, 1, keepdim=True)113 # Indicators of whether maxima are unique114 if self._check_unique:115 max_unique = (torch.sum(max_ps.expand_as(model_ps) == model_ps, 1, keepdim=True) == 1).long()116 else:117 max_unique = torch.ones(target.size(0)).long()118 total_correct = None119 if self._target_indexed:120 total_correct = torch.sum(max_unique.squeeze()*((target == max_index.squeeze()).long()))121 else:122 target_index, has_missing, mask = dist.get_index(target)123 # Count of where model max is same as target124 total_correct = torch.sum(mask*max_unique.squeeze()*((target_index == max_index.squeeze()).long()))125 return float(total_correct)126 def _aggregate_batch(self, agg, batch_result):127 return agg + batch_result128 def _initialize_result(self):129 return 0.0130 def _finalize_result(self, result):131 return result / self._data.get_size()132class 
ModelStatistic(Evaluation):133 def __init__(self, name, data, data_parameters, stat_fn, trials=1):134 super(ModelStatistic, self).__init__(name, data, data_parameters, trials=trials)135 self._stat_fn = stat_fn136 def _run_once(self, model):137 model.eval()138 stat = self._stat_fn(model)139 model.train()140 return stat141 def _run_batch(self, model, batch):142 pass143 def _aggregate_batch(self, agg, batch_result):144 pass145 def _initialize_result(self):146 pass147 def _finalize_result(self, result):...
writer.py
Source:writer.py
...19 assert step, "Cannot find active step for %s" % event.location20 assert not step.end_time, "Cannot update step '%s', it has already been ended" % step.description21 step.add_log(log)22 @staticmethod23 def _initialize_result(start_time):24 result = Result()25 result.start_time = start_time26 return result27 @staticmethod28 def _initialize_test_result(test, start_time):29 result = TestResult(test.name, test.description)30 result.tags.extend(test.tags)31 result.properties.update(test.properties)32 result.links.extend(test.links)33 result.rank = test.rank34 result.start_time = start_time35 return result36 @staticmethod37 def _finalize_result(result, end_time):38 result.end_time = end_time39 result.status = "passed" if result.is_successful() else "failed"40 def _lookup_step(self, event):41 try:42 return self.active_steps[event.thread_id]43 except KeyError:44 return None45 def on_test_session_start(self, event):46 self.report.start_time = event.time47 def on_test_session_end(self, event):48 self.report.end_time = event.time49 def on_test_session_setup_start(self, event):50 self.report.test_session_setup = self._initialize_result(event.time)51 def on_test_session_setup_end(self, event):52 self._finalize_result(self.report.test_session_setup, event.time)53 def on_test_session_teardown_start(self, event):54 self.report.test_session_teardown = self._initialize_result(event.time)55 def on_test_session_teardown_end(self, event):56 self._finalize_result(self.report.test_session_teardown, event.time)57 def on_suite_start(self, event):58 suite = event.suite59 suite_result = SuiteResult(suite.name, suite.description)60 suite_result.start_time = event.time61 suite_result.tags.extend(suite.tags)62 suite_result.properties.update(suite.properties)63 suite_result.links.extend(suite.links)64 suite_result.rank = suite.rank65 if suite.parent_suite:66 parent_suite_result = self._get_suite_result(suite.parent_suite)67 parent_suite_result.add_suite(suite_result)68 else:69 
self.report.add_suite(suite_result)70 def on_suite_end(self, event):71 suite_result = self._get_suite_result(event.suite)72 suite_result.end_time = event.time73 def on_suite_setup_start(self, event):74 suite_result = self._get_suite_result(event.suite)75 suite_result.suite_setup = self._initialize_result(event.time)76 def on_suite_setup_end(self, event):77 suite_result = self._get_suite_result(event.suite)78 self._finalize_result(suite_result.suite_setup, event.time)79 def on_suite_teardown_start(self, event):80 suite_result = self._get_suite_result(event.suite)81 suite_result.suite_teardown = self._initialize_result(event.time)82 def on_suite_teardown_end(self, event):83 suite_result = self._get_suite_result(event.suite)84 self._finalize_result(suite_result.suite_teardown, event.time)85 def on_test_start(self, event):86 test_result = self._initialize_test_result(event.test, event.time)87 suite_result = self._get_suite_result(event.test.parent_suite)88 suite_result.add_test(test_result)89 def on_test_end(self, event):90 test_result = self._get_test_result(event.test)91 self._finalize_result(test_result, event.time)92 def _bypass_test(self, test, status, status_details, time):93 test_result = self._initialize_test_result(test, time)94 test_result.end_time = time95 test_result.status = status...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!