Best Python code snippet using localstack_python
workflows.py
Source:workflows.py
...229 elif adhoc_only is False and period != Period.none:230 if req.schedule.target is None:231 raise BadRequestException(232 f'unexcepted params: period={period} but target=None.')233 cron_schedule = convert_schedule_to_cron(period, req.schedule.target)234 suspend = False235 # validation error236 else:237 raise BadRequestException(238 f'unexpected params: period={period}, adhoc={adhoc_only}')239 ## Jobs Settings240 spec_jobs = []241 workflow_configs = []242 annotation_api_keys = []243 history_limit = None244 ## Generate workflow jobs245 for index, job in enumerate(req.jobs):246 ## crawlerId247 crawler_id = job.crawlerId248 logger.info(249 f"Generate workflow jobs: workflow_id={workflow_id}, crawler_id={crawler_id}"250 )251 ## config template252 template = job.configTemplate253 template_name = template["name"]254 template_version = template["version"]255 if index == 0:256 history_limit = template[257 "historyLimit"] if "historyLimit" in template else None258 ## filled data259 data = job.data260 ## Generate data for api_keys annotation261 api_keys_data = {}262 api_key = []263 target_secret = []264 ui_properties = template["config"]["schema"]["properties"]265 for prop_key in ui_properties.keys():266 if "$ref" in ui_properties[prop_key]:267 if "#/definitions/secrets" == ui_properties[prop_key]["$ref"]:268 api_key.append(prop_key)269 # Get secret_id from data270 secret_id = data[prop_key]271 target_secret.append(secret_id)272 if len(api_key) > 0:273 api_keys_data = {274 "api_key": api_key,275 "target_secret": target_secret276 }277 annotation_api_keys.append(api_keys_data)278 ## Generate spec:config:input,output279 flattened_data = flatten_keys(data)280 default_config = template["default"]281 input_dict = template["input"]282 output_dict = template["output"]283 ### process input284 input_dict = template["input"]285 input_dict = replace_template_variables(flattened_data, input_dict)286 input_dict = replace_template_variables(default_config, input_dict)287 ### process output288 output_dict = template["output"]289 output_dict = replace_template_variables(flattened_data, output_dict)290 output_dict = replace_template_variables(default_config, output_dict)291 ## Generate spec:jobs:config292 config = crawlers.CrawlersConfigs(name=template_name,293 version=template_version,294 input=input_dict,295 output=output_dict,296 env=env_config,297 schedule=schedule)298 ## Generate element of spec:jobs299 spec_job = crawler.CrawlerSpecJobs(300 id=crawler_id,301 config=json.dumps(config.dict(), ensure_ascii=False),302 image="{url}/{gcr}/{name}:{version}".format(303 url=settings.collectro_host,304 gcr=settings.collectro_gcr,305 name=template_name,306 version=template_version),307 imagePullSecret=k8s_settings.image_pull_secret,308 serviceAccountName=k8s_settings.service_account_name)309 spec_jobs.append(spec_job)310 ## for ui_form311 form_cnf = model.WorkflowsAnnotationUiFormConfigElement(312 crawlerId=crawler_id, data=data)313 workflow_configs.append(form_cnf.dict())314 logger.info(f"Generate crawler: workflow_id={workflow_id}")315 ## Generate spec316 spec = crawler.CrawlerSpec(name=req.name,317 menuId=req.menuId,318 schedule=cron_schedule,319 period=period,320 jobs=spec_jobs,321 suspend=suspend,322 historyLimit=history_limit)323 ## Generate crawler324 ## metadata:annotations325 ui_form = model.WorkflowsAnnotationUiForm(schedule=req.schedule,326 jobs=workflow_configs)327 annotations = {328 settings.Kubernetes.ui_form_annotation:329 json.dumps(ui_form.dict(), ensure_ascii=False),330 settings.Kubernetes.api_keys_annotation:331 json.dumps(annotation_api_keys, ensure_ascii=False)332 }333 metadata = {334 "name": workflow_id,335 "annotations": annotations,336 }337 ## custom resource definition338 custom = crawler.Crawler(apiVersion=k8s_settings.api_version,339 kind=k8s_settings.crawler_kind,340 metadata=metadata,341 spec=spec)342 return custom343# adhoc executor344def adhoc_execute(workflow_id: str, req: Dict) -> JSONResponse:345 try:346 logger.info(347 f"Called adhoc_execute: workflow_id={workflow_id}, param={req}")348 get_crawler = kubectl.get_namespaced_crawler(workflow_id)349 crawler_custom = crawler.Crawler(**get_crawler)350 ui_form = json.loads(crawler_custom.metadata["annotations"][351 k8s_settings.ui_form_annotation])352 ui_form = model.WorkflowsAnnotationUiForm(**ui_form)353 jobs = []354 for _, job in enumerate(crawler_custom.spec.jobs):355 # Get config356 config = json.loads(job.config)357 model_config = crawlers.CrawlersConfigs(**config)358 # template_name:version359 template_name = model_config.name360 template_version = model_config.version361 # get config template from server362 template = configtemplate.get_config_template(363 template_name, template_version)364 if "adhoc" not in template:365 raise UnsupportedAdhocException(366 f'workflow_id: {workflow_id} cannot execute adhoc.')367 # prepare config for adhoc368 input_dict = template["adhoc"]["input"]369 input_dict.update(model_config.input)370 output_dict = template["adhoc"]["output"]371 output_dict.update(model_config.output)372 # Replace parameters for adhoc373 input_dict = replace_template_variables(req, input_dict)374 output_dict = replace_template_variables(req, output_dict)375 # Set adhoc param in schedule376 schedule = crawlers.CrawlersConfigSchedules(377 period=model_config.schedule.period,378 relativeDiffTime=model_config.schedule.relativeDiffTime,379 adhoc=True)380 ## Generate spec:jobs:config381 config = crawlers.CrawlersConfigs(name=template_name,382 version=template_version,383 input=input_dict,384 output=output_dict,385 env=model_config.env,386 schedule=schedule)387 # spec388 adhoc_spec_job = crawler.CrawlerSpecJobs(389 id=job.id,390 config=json.dumps(config.dict(), ensure_ascii=False),391 image=job.image,392 command=job.command,393 imagePullSecret=job.imagePullSecret,394 serviceAccountName=job.serviceAccountName)395 jobs.append(adhoc_spec_job)396 # Generate name397 epoch_time = int(time.time())398 name = f"{workflow_id}-{epoch_time}-0"399 # Generate spec400 spec = crawler.CrawlerRunSpec(401 jobs=jobs,402 menuId=crawler_custom.spec.menuId,403 ownerName=crawler_custom.spec.name,404 period=Period.none,405 scheduleTime="{0:%Y-%m-%dT%H:%M:%SZ}".format(datetime.utcnow()))406 # Generate annotations407 annotations = crawler_custom.metadata["annotations"]408 annotations.update({409 settings.Kubernetes.adhoc_ui_form_annotation:410 json.dumps(req, ensure_ascii=False)411 })412 # Generate metadata413 metadata = {414 "name":415 name,416 "namespace":417 crawler_custom.metadata["namespace"],418 "annotations":419 annotations,420 "ownerReferences": [{421 "apiVersion": crawler_custom.apiVersion,422 "blockOwnerDeletion": True,423 "controller": True,424 "kind": crawler_custom.kind,425 "name": workflow_id,426 "uid": crawler_custom.metadata["uid"]427 }]428 }429 custom = crawler.CrawlerRun(apiVersion=k8s_settings.api_version,430 kind=k8s_settings.crawlerrun_kind,431 metadata=metadata,432 spec=spec)433 logger.info(f'Create adhoc workflow: name={name}')434 _ = kubectl.create_namespaced_crawlerrun(custom.dict())435 return handlers.ok(content=None)436 except UnsupportedAdhocException as e:437 logger.error(e)438 return handlers.error(model.UnsupportedAdhoc())439 except ApiException as e:440 logger.error(441 f"ApiException: Failed to get workflow: workflow_id={workflow_id}: {e}"442 )443 if e.status == 404:444 return handlers.error(model.WorkflowsNotFound())445 else:446 return handlers.common_internal_server_error()447 except Exception as e:448 logger.error(449 f"Exception: Failed to execute adhoc workflow: workflow_id={workflow_id}: {e}"450 )451 return handlers.common_internal_server_error()452# Suspend workflow453def suspend_workflow(workflow_id: str,454 req: model.WorkflowsSuspendRequest) -> JSONResponse:455 try:456 logger.info(457 f'Called suspend_workflow: workflow_id={workflow_id}, req={req}')458 custom_object = kubectl.get_namespaced_crawler(workflow_id)459 custom_object = crawler.Crawler(**custom_object)460 if custom_object.spec.period == Period.none:461 raise UnsupportedSuspendException(462 f"workflow_id: {workflow_id} cannot change the parameter to suspend."463 )464 custom = {"spec": {"suspend": req.suspend}}465 _ = kubectl.patch_namespaced_crawler(workflow_id, custom)466 return handlers.ok(content=None)467 except UnsupportedSuspendException as e:468 logger.error(e)469 return handlers.error(model.UnsupportedSuspend())470 except ApiException as e:471 logger.error(472 f"ApiException: Failed to suspend workflow: workflow_id={workflow_id}: {e}"473 )474 if e.status == 404:475 return handlers.error(model.WorkflowsNotFound())476 else:477 return handlers.common_internal_server_error()478 except Exception as e:479 logger.error(480 f"Exception: Failed to suspend workflow: workflow_id={workflow_id}: {e}"481 )482 return handlers.common_internal_server_error()483## Get update_time from annotaions484def get_update_time(custom_object: crawler.Crawler) -> str:485 annotations = custom_object.metadata["annotations"]486 if k8s_settings.updated_time_annotation in annotations:487 update_time = annotations[k8s_settings.updated_time_annotation]488 else:489 update_time = custom_object.metadata["creationTimestamp"]490 return update_time491# Get ui_form with the updated suspend state492def get_ui_form_with_updated_suspend(custom_object: crawler.Crawler) -> dict:493 annotations = custom_object.metadata["annotations"]494 ui_form = annotations[k8s_settings.ui_form_annotation]495 ui_form = json.loads(ui_form)496 ui_form["schedule"].update({"suspend": custom_object.spec.suspend})497 return ui_form498# Convert schedule to cron499def convert_schedule_to_cron(500 period: str, schedule: model.WorkflowsCommonScheduleTarget) -> str:501 if period == 'hourly':502 res = "{minute} * * * *".format(minute=format(schedule.minute, '02d'))503 elif schedule.hour is None or schedule.minute is None:504 raise Exception(505 f"convert_schedule_to_cron: period is {period} but hour or minute not defined: schedule={schedule}"506 )507 else:508 ## Convert timestamp to utc509 dt_utc = convert_jst_to_utc(schedule)510 if period == 'daily':511 res = dt_utc.strftime('%M %H * * *')512 elif period == 'weekly':513 if schedule.week is None:...
events_listener.py
Source:events_listener.py
...73 LOG.info(74 f"Unable to send event notification {truncate(event)} to target {target}: {e}"75 )76 return func77def convert_schedule_to_cron(schedule):78 """Convert Events schedule like "cron(0 20 * * ? *)" or "rate(5 minutes)" """79 cron_regex = r"\s*cron\s*\(([^\)]*)\)\s*"80 if re.match(cron_regex, schedule):81 cron = re.sub(cron_regex, r"\1", schedule)82 return cron83 rate_regex = r"\s*rate\s*\(([^\)]*)\)\s*"84 if re.match(rate_regex, schedule):85 rate = re.sub(rate_regex, r"\1", schedule)86 value, unit = re.split(r"\s+", rate.strip())87 if "minute" in unit:88 return "*/%s * * * *" % value89 if "hour" in unit:90 return "* */%s * * *" % value91 if "day" in unit:92 return "* * */%s * *" % value93 raise Exception("Unable to parse events schedule expression: %s" % schedule)94 return schedule95def handle_put_rule(data):96 schedule = data.get("ScheduleExpression")97 enabled = data.get("State") != "DISABLED"98 if schedule:99 job_func = get_scheduled_rule_func(data)100 cron = convert_schedule_to_cron(schedule)101 LOG.debug("Adding new scheduled Events rule with cron schedule %s" % cron)102 job_id = JobScheduler.instance().add_job(job_func, cron, enabled)103 rule_scheduled_jobs = EventsBackend.get().rule_scheduled_jobs104 rule_scheduled_jobs[data["Name"]] = job_id105 return True106def handle_delete_rule(rule_name):107 rule_scheduled_jobs = EventsBackend.get().rule_scheduled_jobs108 job_id = rule_scheduled_jobs.get(rule_name)109 if job_id:110 LOG.debug("Removing scheduled Events: {} | job_id: {}".format(rule_name, job_id))111 JobScheduler.instance().cancel_job(job_id=job_id)112def handle_disable_rule(rule_name):113 rule_scheduled_jobs = EventsBackend.get().rule_scheduled_jobs114 job_id = rule_scheduled_jobs.get(rule_name)...
test_crawlers.py
Source:test_crawlers.py
...42def test_convert_schedule_to_cron_1():43 period = 'daily'44 schedule = '00:07' ## JST45 expect_result = '07 15 * * *' ## UTC46 execute = crawlers.convert_schedule_to_cron(period, schedule)47 assert execute == expect_result48def test_convert_schedule_to_cron_2():49 period = 'hourly'50 schedule = '15'51 expect_result = '15 * * * *'52 execute = crawlers.convert_schedule_to_cron(period, schedule)53 assert execute == expect_result54def test_build_schema_to_config_1():55 target = [56 {57 'period': 'daily',58 'relative': 2,59 'target_daily': '09:07' ## JST60 }61 ]62 expect_period = 'daily'63 expect_relative = '2 days ago'64 expect_schedule = '07 00 * * *' ## UTC65 period, relative, schedule = crawlers.build_schema_to_config(target)66 assert (...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!