Best Python code snippet using lemoncheesecake
tasks.py
Source:tasks.py
...125 terminate_status = event_job_status.job_status(job_id, stop_job=True)126 if terminate_status['status']:127 update_workflow_status(spec, status=False, error='', workflow=saga.job.CANCELED)128 msg = f"Workflow Canceled for handover {spec['handover_token']}"129 log_and_publish(make_report('INFO', msg, spec))130 # stop all running beekeeper jobs131 event_job_status.stop_hive_jobs(spec['hive_db_uri'])132 return terminate_status133 return event_status134 except Exception as e:135 raise Exception(str(e))136def restart_workflow(restart_type: str, spec: dict) -> bool:137 """[Restart Stopped/Failed Workflows]138 Args:139 restart_type (str): [Option to restart beekeeper/init/entire workflow ]140 spec (dict): [Workflow payload]141 Raises:142 Exception: [When failed to restart the workflow]143 Returns:144 [bool]: [Restart status]145 """ 146 try:147 update_workflow_status(spec, status=True, error='', workflow=states.STARTED)148 if restart_type == 'BEEKEEPER':149 current_job = spec['current_job']150 # set param init_pipeline to False to run beekeeper alone151 log_and_publish(make_report('INFO', f"RESTART {current_job['PipelineName']} Pipeline", spec))152 workflow_run_pipeline.delay(current_job, spec, init_pipeline=False)153 elif restart_type == 'INIT_PIPELINE':154 current_job = spec['current_job']155 log_and_publish(make_report('INFO', f"RESTART {current_job['PipelineName']} Pipeline", spec))156 workflow_run_pipeline.delay(current_job, spec)157 elif restart_type == 'SKIP_CURRENT_PIPELINE':158 current_job = spec['current_job']159 # set current job status to skipped for reference160 current_job['pipeline_status'] = 'SKIPPED'161 spec['completed_jobs'].append(current_job)162 log_and_publish(make_report('INFO', f"SKIPPED {current_job['PipelineName']} Pipeline", spec))163 monitor_process_pipeline.delay(spec)164 elif restart_type == 'WORKFLOW':165 current_job = [spec['current_job']]166 spec['flow'] = spec['completed_jobs'] + current_job + spec['flow']167 # reset the currentjob and completed job to null168 spec['current_job'] = {}169 spec['completed_jobs'] = []170 log_and_publish(make_report('INFO', 'Restart Entire Workflow', spec))171 workflow_run_pipeline.delay(current_job, spec)172 return True173 except Exception as e:174 raise Exception(str(e))175@app.task(bind=True, queue="event_job_status", default_retry_delay=120, max_retries=None)176def event_job_status(self, spec:dict, job_id, host:str) -> bool:177 """[Get Pipeline Status and update in ES]178 Args:179 spec ([dict]): [Workflow payload]180 job_id ([type]): [radical saga job id]181 host ([type]): [Host name where the job is running]182 Raises:183 self.retry: [When Job is running]184 Returns:185 [bool]: [Job status]186 """ 187 try:188 host_con_details = pycfg.__dict__[host]189 event_job_status = RemoteCmd(190 REMOTE_HOST=host_con_details.get('REMOTE_HOST', None),191 ADDRESS=host_con_details.get('ADDRESS', None),192 USER=host_con_details.get('USER', None),193 PASSWORD=host_con_details.get('PASSWORD', None),194 WORKING_DIR=host_con_details.get('WORKING_DIR', None),195 )196 event_status = event_job_status.job_status(job_id)197 if event_status['status']:198 if event_status['job_status'] in [saga.job.DONE]:199 # check if beekeeper is completed successfully200 beekeeper_status = event_job_status.beekeper_status(spec['hive_db_uri'])201 if beekeeper_status['status'] and beekeeper_status['value'] == 'NO_WORK':202 msg = f"Pipeline {spec['current_job']['PipelineName']} {saga.job.DONE} "203 spec['status'] = True204 spec['completed_jobs'].append(spec['current_job'])205 spec['current_job'] = {}206 log_and_publish(make_report('INFO', msg, spec))207 # start another pipeline208 monitor_process_pipeline.delay(spec)209 else:210 msg = f"Pipeline {spec['current_job']['PipelineName']} beekeeper failed {beekeeper_status['error']}"211 update_workflow_status(spec, status=False, error=beekeeper_status['value'],212 workflow=saga.job.FAILED)213 log_and_publish(make_report('ERROR', msg, spec))214 if event_status['job_status'] in [saga.job.SUSPENDED, saga.job.CANCELED, saga.job.FAILED, saga.job.UNKNOWN]:215 msg = f"Pipeline {spec['current_job']['PipelineName']} {event_status['job_status']}"216 update_workflow_status(spec, status=False, error=event_status['error'],217 workflow=event_status['job_status'])218 log_and_publish(make_report('ERROR', msg, spec))219 return False220 else:221 msg = f"Failed to fetch Pipeline {spec['current_job']['PipelineName']} status "222 update_workflow_status(spec, status=False, error=event_status['error'], workflow=saga.job.FAILED)223 log_and_publish(make_report('ERROR', msg, spec))224 return False225 except Exception as e:226 update_workflow_status(spec, status=False, error=str(e), workflow=saga.job.FAILED)227 log_and_publish(make_report('ERROR', str(e), spec))228 return False229 if event_status['status']:230 if event_status['job_status'] in [saga.job.PENDING, saga.job.RUNNING]:231 msg = f"Pipeline {spec['current_job']['PipelineName']} {saga.job.RUNNING}"232 spec_debug = {key: value for (key, value) in spec.items() if key not in ['flow', 'completed_jobs']}233 log_and_publish(make_report('DEBUG', msg, spec_debug))234 raise self.retry()235 return msg236@app.task(bind=True, queue="workflow", task_track_started=True,237 result_persistent=True)238def workflow_run_pipeline(self, run_job: dict, global_spec: dict, init_pipeline: Optional[bool]=True):239 """[Celery worker to initiate the pipeline and its beekeeper]240 Args:241 run_job (dict): [Current pipeline job to start]242 global_spec (dict): [Workflow information]243 init_pipeline (Optional[bool], optional): [to initiate the hive init_pipeline]. Defaults to True.244 Raises:245 ValueError: [When failed to initialize pipeline]246 Returns:247 [bool]: [pipeline initiation status]248 """ 249 try:250 temp = construct_pipeline(run_job, global_spec)251 # execute remote command over ssh252 # get fram login details based on host253 host = temp['HOST'] if temp['HOST'] else pycfg.DEFAULT_HOST254 host_con_details = pycfg.__dict__[host]255 exece = RemoteCmd(256 REMOTE_HOST=host_con_details.get('REMOTE_HOST', None),257 ADDRESS=host_con_details.get('ADDRESS', None),258 USER=host_con_details.get('USER', None),259 PASSWORD=host_con_details.get('PASSWORD', None),260 WORKING_DIR=host_con_details.get('WORKING_DIR', None),261 mysql_url=temp['mysql_url']262 )263 global_spec['task_id'] = self.request.id264 global_spec['hive_db_uri'] = temp['mysql_url']265 msg = f"Pipeline {run_job['PipelineName']} Intiated"266 log_and_publish(make_report('DEBUG', msg, global_spec))267 job = {'status': True}268 if init_pipeline:269 job = exece.run_job(command=' '.join(temp['init']['command']), args=temp['init']['args'],270 stdout=temp['init']['stdout'], stderr=temp['init']['stderr'], synchronus=True)271 if job['status']:272 job = exece.run_job(command=' '.join(temp['beekeeper']['command']),273 args=temp['beekeeper']['args'], stdout=temp['beekeeper']['stdout'],274 stderr=temp['beekeeper']['stderr'])275 if job['status']:276 global_spec['current_job']['job_id'] = job['job_id']277 global_spec['current_job']['HOST'] = host278 msg = f"Pipeline {run_job['PipelineName']} {job['state']}"279 log_and_publish(make_report('INFO', msg, global_spec))280 event_job_status.delay(global_spec, job['job_id'], host)281 else:282 raise ValueError(f"Pipeline {run_job['PipelineName']} failed : {job['error']}")283 else:284 raise ValueError(f"Pipeline {run_job['PipelineName']} failed: {job['error']}")285 return True286 except Exception as e:287 update_workflow_status(global_spec, status=False, error=str(e), workflow=saga.job.FAILED)288 log_and_publish(make_report('ERROR', str(e), global_spec))289 return f"{run_job['PipelineName']} : Exception error: {str(e)}"290@app.task(bind=True, queue="monitor")291def monitor_process_pipeline(self, spec: dict):292 """[Checks Workflows Status and pending pipelines to run]293 Args:294 spec (dict): [Workflow payload details]295 Returns:296 [bool]: [Workflow status]297 """ 298 try:299 if spec.get('status', False):300 if len(spec.get('flow', [])) > 0:301 job = spec['flow'].pop(0)302 spec['current_job'] = job303 spec['status'] = True304 spec['workflow'] = states.STARTED305 msg = f"Pipeline {job['PipelineName']} Started!"306 log_and_publish(make_report('INFO', msg, spec))307 # run pipeline job308 workflow_run_pipeline.delay(job, spec)309 elif len(spec.get('flow', [])) == 0:310 spec['status'] = True311 spec['current_job'] = {}312 spec['workflow'] = saga.job.DONE313 msg = f"Workflow completed for handover {spec['handover_token']}"314 log_and_publish(make_report('INFO', msg, spec))315 else:316 spec['status'] = False317 spec['workflow'] = saga.job.FAILED318 msg = f"Workflow failed to complete for handover {spec['handover_token']}"319 log_and_publish(make_report('ERROR', msg, spec))320 except Exception as e:321 update_workflow_status(spec, status=False, error=str(e), workflow=saga.job.FAILED)322 msg = f"Workflow failed to complete for handover {spec['handover_token']}: {str(e)}"323 log_and_publish(make_report('ERROR', msg, spec))324 return f"Error: {str(e)}"325 return True326def initiate_pipeline(spec: dict, event: Optional[dict]={}, rerun: Optional[bool]=False) -> dict:327 """[Prepare Workflow payload from handover payload and initialize it]328 Args:329 spec (dict): [Handover Payload specification] 330 event (Optional[dict], optional): [event received]. Defaults to {}.331 rerun (Optional[bool], optional): [rerun type]. Defaults to False.332 Raises:333 Exception: [When failed to prepare workflow payload]334 Returns:335 dict: [Workflow initialize status with payload information]336 """ 337 try:338 # prepare the payload with production pipelines based on dbtype and division339 spec.update(prepare_payload(spec))340 # set username to run the pipeline341 if not spec.get('user', None):342 spec['user'] = pycfg.FARM_USER343 # set hive url to run the pipelines344 if not spec.get('hive_url', None):345 spec['hive_url'] = pycfg.HIVE_URL346 if 'flow' not in spec or len(spec['flow']) == 0:347 raise Exception('Unable to construct workflow to run production pipeline.')348 # remove .....349 # spec['flow'] = [spec['flow'][0]]350 msg = f"Workflow Started for handover token {spec['handover_token']}"351 log_and_publish(make_report('INFO', msg, spec))352 # submit workflow to monitor queue353 monitor_process_pipeline.delay(spec)354 return {'status': True, 'error': '', 'spec': spec}355 except Exception as e:356 update_workflow_status(spec, status=False, error=str(e), workflow=saga.job.FAILED)357 msg = f"Workflow failed for handover token {spec['handover_token']}"358 log_and_publish(make_report('INFO', msg, spec))...
test_models.py
Source:test_models.py
...23 self.bench = Benchmark.objects.get(name='TestBench')24 def test_average_change_bad(self):25 self.make_result(12)26 s2 = self.make_result(15)27 rep = self.make_report(s2)28 self.assertEqual(rep.colorcode, 'red')29 def test_average_change_good(self):30 self.make_result(15)31 s2 = self.make_result(12)32 rep = self.make_report(s2)33 self.assertEqual(rep.colorcode, 'green')34 def test_within_threshold_none(self):35 self.make_result(15)36 s2 = self.make_result(15.2)37 rep = self.make_report(s2)38 self.assertEqual(rep.colorcode, 'none')39 def test_initial_revision_none(self):40 s2 = self.make_result(15)41 rep = self.make_report(s2)42 self.assertEqual(rep.colorcode, 'none')43 def test_bench_change_good(self):44 b1 = self.make_bench('b1')45 s1 = self.make_result(15)46 self.make_result(15, rev=s1, benchmark=b1)47 s2 = self.make_result(14.54)48 self.make_result(15, rev=s2, benchmark=b1)49 rep = self.make_report(s2)50 self.assertEqual(rep.colorcode, 'green')51 self.assertIn(self.bench.name, rep.summary)52 def test_bench_change_bad(self):53 b1 = self.make_bench('b1')54 s1 = self.make_result(15)55 self.make_result(15, rev=s1, benchmark=b1)56 s2 = self.make_result(15.46)57 self.make_result(15, rev=s2, benchmark=b1)58 rep = self.make_report(s2)59 self.assertEqual(rep.colorcode, 'red')60 self.assertIn(self.bench.name, rep.summary)61 # NOTE: Don't need to test with multiple projects since the calculation of62 # urgency doesn't take projects into account63 def test_average_change_beats_bench_change(self):64 b1 = self.make_bench('b1')65 s1 = self.make_result(15)66 self.make_result(15, rev=s1, benchmark=b1)67 s2 = self.make_result(14)68 self.make_result(15, rev=s2, benchmark=b1)69 rep = self.make_report(s2)70 self.assertIn('Average', rep.summary)71 def test_good_benchmark_change_beats_bad_average_trend(self):72 changes = self.make_bad_trend()73 b1 = self.make_bench('b1')74 for x in changes:75 s1 = self.make_result(x)76 if x != changes[-1]:77 self.make_result(x, rev=s1, benchmark=b1)78 self.make_result(changes[-2] * .97, rev=s1, benchmark=b1)79 rep = self.make_report(s1)80 self.assertEquals('green', rep.colorcode)81 self.assertIn('b1', rep.summary)82 def test_good_average_change_beats_bad_average_trend(self):83 changes = self.make_bad_trend()84 b1 = self.make_bench('b1')85 for x in changes:86 s1 = self.make_result(x)87 if x != changes[-1]:88 self.make_result(x, rev=s1, benchmark=b1)89 self.make_result(changes[-2] * .92, rev=s1, benchmark=b1)90 rep = self.make_report(s1)91 self.assertEquals('green', rep.colorcode)92 self.assertIn('Average', rep.summary)93 def test_good_change_beats_good_trend(self):94 changes = self.make_good_trend()95 b1 = self.make_bench('b1')96 for x in changes:97 s1 = self.make_result(x)98 if x != changes[-1]:99 self.make_result(x, rev=s1, benchmark=b1)100 self.make_result(changes[-2] * .95, rev=s1, benchmark=b1)101 rep = self.make_report(s1)102 self.assertIn('b1', rep.summary)103 self.assertNotIn('trend', rep.summary)104 def test_bad_trend_beats_good_trend(self):105 good_changes = self.make_good_trend()106 bad_changes = self.make_bad_trend()107 b1 = self.make_bench('b1')108 for i in range(len(good_changes)):109 s1 = self.make_result(good_changes[i])110 self.make_result(bad_changes[i], rev=s1, benchmark=b1)111 rep = self.make_report(s1)112 self.assertIn('trend', rep.summary)113 self.assertIn('b1', rep.summary)114 self.assertIn('yellow', rep.colorcode)115 def test_bad_change_beats_good_trend(self):116 changes = self.make_good_trend()117 b1 = self.make_bench('b1')118 for x in changes:119 s1 = self.make_result(x)120 if x != changes[-1]:121 self.make_result(x, rev=s1, benchmark=b1)122 self.make_result(changes[-2] * 1.05, rev=s1, benchmark=b1)123 rep = self.make_report(s1)124 self.assertIn('b1', rep.summary)125 self.assertNotIn('trend', rep.summary)126 self.assertEquals('red', rep.colorcode)127 def test_bad_beats_good_change(self):128 b1 = self.make_bench('b1')129 s1 = self.make_result(12)130 self.make_result(12, rev=s1, benchmark=b1)131 s2 = self.make_result(15)132 self.make_result(9, rev=s2, benchmark=b1)133 rep = self.make_report(s2)134 self.assertEqual(rep.colorcode, 'red')135 def test_bigger_bad_beats_smaller_bad(self):136 b1 = self.make_bench('b1')137 b2 = self.make_bench('b2')138 s1 = self.make_result(1.0)139 self.make_result(1.0, rev=s1, benchmark=b1)140 self.make_result(1.0, rev=s1, benchmark=b2)141 s2 = self.make_result(1.0)142 self.make_result(1.04, rev=s2, benchmark=b1)143 self.make_result(1.03, rev=s2, benchmark=b2)144 rep = self.make_report(s2)145 self.assertIn('b1', rep.summary)146 self.assertEquals('red', rep.colorcode)147 def test_multiple_quantities(self):148 b1 = self.make_bench('b1', quantity='Space', units='bytes')149 s1 = self.make_result(1.0)150 self.make_result(1.0, rev=s1, benchmark=b1)151 s2 = self.make_result(1.4)152 self.make_result(1.5, rev=s2, benchmark=b1)153 rep = self.make_report(s2)154 self.assertRegexpMatches(rep.summary, '[sS]pace')155 self.assertEquals('red', rep.colorcode)156 def make_result(self, value, rev=None, benchmark=None):157 from uuid import uuid4158 if not benchmark:159 benchmark = self.bench160 if not rev:161 commitdate = self.starttime + timedelta(days=self.days)162 cid = str(uuid4())163 Revision(commitid=cid, date=commitdate, branch=self.b,164 project=self.pro).save()165 rev = Revision.objects.get(commitid=cid)166 Result(value=value, revision=rev, executable=self.exe,167 environment=self.env, benchmark=benchmark).save()168 self.days += 1169 return rev170 def make_report(self, revision):171 Report(revision=revision, environment=self.env,172 executable=self.exe).save()173 return Report.objects.get(revision=revision)174 def make_bench(self, name, quantity='Time', units='seconds'):175 Benchmark(name=name, units_title=quantity, units=units).save()176 return Benchmark.objects.get(name=name)177 def make_bad_trend(self):178 return self.make_trend(1)179 def make_good_trend(self):180 return self.make_trend(-1)181 def make_trend(self, direction):182 return [1 + direction * x * 1.25 *183 settings.TREND_THRESHOLD / 100 / settings.TREND184 for x in range(settings.TREND)]...
test_record_attribute.py
Source:test_record_attribute.py
1import logging2import unittest3from logging import Formatter4from urllib.parse import quote5from flask import Flask6from logging_utilities.filters import ConstAttribute7from logging_utilities.filters.flask_attribute import FlaskRequestAttribute8app = Flask(__name__)9class RecordAttributesTest(unittest.TestCase):10 def setUp(self):11 super().setUp()12 self.maxDiff = None13 @classmethod14 def _configure_const_attribute(cls, logger):15 logger.setLevel(logging.DEBUG)16 for handler in logger.handlers:17 const_attribute = ConstAttribute(application="test_application")18 handler.addFilter(const_attribute)19 handler.setFormatter(Formatter("%(levelname)s:%(application)s:%(message)s"))20 @classmethod21 def _configure_flask_attribute(cls, logger):22 logger.setLevel(logging.DEBUG)23 for handler in logger.handlers:24 const_attribute = FlaskRequestAttribute(25 attributes=['url', 'method', 'headers', 'json', 'query_string']26 )27 handler.addFilter(const_attribute)28 handler.setFormatter(29 Formatter(30 "%(levelname)s:%(message)s:%(flask_request_url)s:%(flask_request_json)s:%(flask_request_query_string)s"31 )32 )33 def test_const_attribute(self):34 with self.assertLogs('test_formatter', level=logging.DEBUG) as ctx:35 logger = logging.getLogger('test_formatter')36 self._configure_const_attribute(logger)37 logger.info('Simple message')38 logger.info('Composed message: %s', 'this is a composed message')39 logger.info('Composed message %s', 'with extra', extra={'extra1': 23})40 self.assertEqual(41 ctx.output,42 [43 'INFO:test_application:Simple message',44 'INFO:test_application:Composed message: this is a composed message',45 'INFO:test_application:Composed message with extra'46 ]47 )48 def test_empty_flask_attribute(self):49 with self.assertLogs('test_formatter', level=logging.DEBUG) as ctx:50 logger = logging.getLogger('test_formatter')51 self._configure_flask_attribute(logger)52 logger.info('Simple message')53 logger.info('Composed message: %s', 'this is a composed message')54 logger.info('Composed message %s', 'with extra', extra={'extra1': 23})55 self.assertEqual(56 ctx.output,57 [58 'INFO:Simple message:::',59 'INFO:Composed message: this is a composed message:::',60 'INFO:Composed message with extra:::'61 ]62 )63 def test_flask_attribute_json(self):64 with self.assertLogs('test_formatter', level=logging.DEBUG) as ctx:65 logger = logging.getLogger('test_formatter')66 self._configure_flask_attribute(logger)67 with app.test_request_context('/make_report/2017', data={'format': 'short'}):68 logger.info('Simple message')69 logger.info('Composed message: %s', 'this is a composed message')70 logger.info('Composed message %s', 'with extra', extra={'extra1': 23})71 with app.test_request_context('/make_report/2017', data=''):72 logger.info('Simple message')73 with app.test_request_context(74 '/make_report/2017', data='non json data', content_type='application/json'75 ):76 logger.info('Simple message')77 with app.test_request_context(78 '/make_report/2017', data='{}', content_type='application/json'79 ):80 logger.info('Simple message')81 with app.test_request_context(82 '/make_report/2017',83 data='{"jsonData": "this is a json data"}',84 content_type='application/json'85 ):86 logger.info('Simple message')87 self.assertEqual(88 ctx.output,89 [90 # pylint: disable=line-too-long91 'INFO:Simple message:http://localhost/make_report/2017:None:',92 'INFO:Composed message: this is a composed message:http://localhost/make_report/2017:None:',93 'INFO:Composed message with extra:http://localhost/make_report/2017:None:',94 'INFO:Simple message:http://localhost/make_report/2017:None:',95 "INFO:Simple message:http://localhost/make_report/2017:b'non json data':",96 'INFO:Simple message:http://localhost/make_report/2017:{}:',97 "INFO:Simple message:http://localhost/make_report/2017:{'jsonData': 'this is a json data'}:",98 ]99 )100 def test_flask_attribute_query_string(self):101 with self.assertLogs('test_formatter', level=logging.DEBUG) as ctx:102 logger = logging.getLogger('test_formatter')103 self._configure_flask_attribute(logger)104 with app.test_request_context('/make_report/2017?param1=value1'):105 logger.info('Simple message')106 logger.info('Composed message: %s', 'this is a composed message')107 logger.info('Composed message %s', 'with extra', extra={'extra1': 23})108 with app.test_request_context('/make_report/2017?param1=value1¶m2=value2'):109 logger.info('Simple message')110 with app.test_request_context(f'/make_report/2017?param1={quote("This a string ?")}'):111 logger.info('Simple message')112 self.assertEqual(113 ctx.output,114 [115 # pylint: disable=line-too-long116 'INFO:Simple message:http://localhost/make_report/2017?param1=value1:None:param1=value1',117 'INFO:Composed message: this is a composed message:http://localhost/make_report/2017?param1=value1:None:param1=value1',118 'INFO:Composed message with extra:http://localhost/make_report/2017?param1=value1:None:param1=value1',119 'INFO:Simple message:http://localhost/make_report/2017?param1=value1¶m2=value2:None:param1=value1¶m2=value2',120 'INFO:Simple message:http://localhost/make_report/2017?param1=This%20a%20string%20%3F:None:param1=This%20a%20string%20%3F',121 ]...
run_application.py
Source:run_application.py
...23 scenario = killer_bots.KillerBotsScenario()24 core_logic.process_scenario(config, database, scenario) 25 else:26 print('%s is not a valid scenario' %(scenario_chosen))27def make_report(scenario_chosen):28 if scenario_chosen == 'chasing_target':29 config = config_application.ConfigChasingTarget()30 database = config_application.DatabaseConfig(config)31 report_writer.make_report(database, config.success_string)32 elif scenario_chosen == 'destroy_target':33 config = config_application.ConfigDestroyTarget()34 database = config_application.DatabaseConfig(config)35 report_writer.make_report(database, config.success_string)36 elif scenario_chosen == 'killer_bots':37 config = config_application.ConfigKillerBots()38 database = config_application.DatabaseConfig(config)39 report_writer.make_report(database, config.success_string)40 else:41 print('%s is not a valid scenario' %(scenario_chosen))42#def make_video(scenario_chosen):43# print("from the application folder and animation_control module")44def process_inputs(inputs):45 action_chosen = inputs[1]46 scenario_chosen = inputs[2]47 if action_chosen == 'do_scenario':48 do_scenario(scenario_chosen)49 #elif action_chosen == 'make_video':50 # make_video(scenario_chosen)51 elif action_chosen == 'make_report':52 make_report(scenario_chosen)53 else:54 print('%s is not a valid action' %(action_chosen))55def receive_inputs():56 inputs = sys.argv57 input_number_required = 358 if len(inputs) == input_number_required:59 process_inputs(inputs)60 else:61 print('Enter your scenario to process. Eg -'62 ' ```python3 run_application.py do_scenario chasing_target``` '63 'is a valid one')64receive_inputs()...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!