Best Python code snippet using lemoncheesecake
test_store_failure_lines.py
Source: test_store_failure_lines.py
import json
import pytest
import responses
from django.conf import settings
from requests.exceptions import HTTPError
from treeherder.log_parser.failureline import (
    store_failure_lines,
    write_failure_lines,
    get_group_results,
)
from treeherder.model.models import FailureLine, Group, JobLog, GroupStatus
from ..sampledata import SampleData


def test_store_error_summary(activate_responses, test_repository, test_job):
    log_path = SampleData().get_log_path("plain-chunked_errorsummary.log")
    log_url = 'http://my-log.mozilla.org'
    with open(log_path) as log_handler:
        responses.add(responses.GET, log_url, body=log_handler.read(), status=200)
    log_obj = JobLog.objects.create(job=test_job, name="errorsummary_json", url=log_url)
    store_failure_lines(log_obj)
    assert FailureLine.objects.count() == 1
    failure = FailureLine.objects.get(pk=1)
    assert failure.job_guid == test_job.guid
    assert failure.repository == test_repository


def test_store_error_summary_default_group(activate_responses, test_repository, test_job):
    log_path = SampleData().get_log_path("plain-chunked_errorsummary.log")
    log_url = 'http://my-log.mozilla.org'
    with open(log_path) as log_handler:
        resp_body = json.load(log_handler)
    resp_body["group"] = "default"
    responses.add(responses.GET, log_url, body=json.dumps(resp_body), status=200)
    log_obj = JobLog.objects.create(job=test_job, name="errorsummary_json", url=log_url)
    store_failure_lines(log_obj)
    assert FailureLine.objects.count() == 1


def test_store_error_summary_truncated(activate_responses, test_repository, test_job, monkeypatch):
    log_path = SampleData().get_log_path("plain-chunked_errorsummary_10_lines.log")
    log_url = 'http://my-log.mozilla.org'
    monkeypatch.setattr(settings, 'FAILURE_LINES_CUTOFF', 5)
    with open(log_path) as log_handler:
        responses.add(responses.GET, log_url, body=log_handler.read(), status=200)
    log_obj = JobLog.objects.create(job=test_job, name="errorsummary_json", url=log_url)
    store_failure_lines(log_obj)
    assert FailureLine.objects.count() == 5 + 1
    failure = FailureLine.objects.get(action='truncated')
    assert failure.job_guid == test_job.guid
    assert failure.repository == test_repository


def test_store_error_summary_astral(activate_responses, test_repository, test_job):
    log_path = SampleData().get_log_path("plain-chunked_errorsummary_astral.log")
    log_url = 'http://my-log.mozilla.org'
    with open(log_path, encoding='utf8') as log_handler:
        responses.add(
            responses.GET,
            log_url,
            content_type="text/plain;charset=utf-8",
            body=log_handler.read(),
            status=200,
        )
    log_obj = JobLog.objects.create(job=test_job, name="errorsummary_json", url=log_url)
    store_failure_lines(log_obj)
    assert FailureLine.objects.count() == 1
    failure = FailureLine.objects.get(pk=1)
    assert failure.job_guid == test_job.guid
    assert failure.repository == test_repository
    assert (
        failure.test
        == u"toolkit/content/tests/widgets/test_videocontrols_video_direction.html \U0001F346"
    )
    assert failure.subtest == u"Test timed out. \U00010081"
    assert failure.message == u"\U000F0151"
    assert failure.stack.endswith("\U000F0151")
    assert failure.stackwalk_stdout is None
    assert failure.stackwalk_stderr is None


def test_store_error_summary_404(activate_responses, test_repository, test_job):
    log_path = SampleData().get_log_path("plain-chunked_errorsummary.log")
    log_url = 'http://my-log.mozilla.org'
    with open(log_path) as log_handler:
        responses.add(responses.GET, log_url, body=log_handler.read(), status=404)
    log_obj = JobLog.objects.create(job=test_job, name="errorsummary_json", url=log_url)
    store_failure_lines(log_obj)
    log_obj.refresh_from_db()
    assert log_obj.status == JobLog.FAILED


def test_store_error_summary_500(activate_responses, test_repository, test_job):
    log_path = SampleData().get_log_path("plain-chunked_errorsummary.log")
    log_url = 'http://my-log.mozilla.org'
    with open(log_path) as log_handler:
        responses.add(responses.GET, log_url, body=log_handler.read(), status=500)
    log_obj = JobLog.objects.create(job=test_job, name="errorsummary_json", url=log_url)
    with pytest.raises(HTTPError):
        store_failure_lines(log_obj)
    log_obj.refresh_from_db()
    assert log_obj.status == JobLog.FAILED


def test_store_error_summary_duplicate(activate_responses, test_repository, test_job):
    log_url = 'http://my-log.mozilla.org'
    log_obj = JobLog.objects.create(job=test_job, name="errorsummary_json", url=log_url)
    write_failure_lines(
        log_obj, [{"action": "log", "level": "debug", "message": "test", "line": 1}]
    )
    write_failure_lines(
        log_obj,
        [
            {"action": "log", "level": "debug", "message": "test", "line": 1},
            {"action": "log", "level": "debug", "message": "test 1", "line": 2},
        ],
    )
    assert FailureLine.objects.count() == 2


def test_store_error_summary_group_status(activate_responses, test_repository, test_job):
    log_path = SampleData().get_log_path("mochitest-browser-chrome_errorsummary.log")
    log_url = 'http://my-log.mozilla.org'
    with open(log_path) as log_handler:
        responses.add(responses.GET, log_url, body=log_handler.read(), status=200)
    log_obj = JobLog.objects.create(job=test_job, name="errorsummary_json", url=log_url)
    store_failure_lines(log_obj)
    assert FailureLine.objects.count() == 5
    ok_groups = Group.objects.filter(group_result__status=GroupStatus.OK)
    error_groups = Group.objects.filter(group_result__status=GroupStatus.ERROR)
    assert ok_groups.count() == 28
    assert error_groups.count() == 1
    assert log_obj.groups.count() == 29
    assert log_obj.groups.all().first().name == "dom/base/test/browser.ini"
    assert ok_groups.first().name == "dom/base/test/browser.ini"
    assert error_groups.first().name == "toolkit/components/pictureinpicture/tests/browser.ini"


def test_get_group_results(activate_responses, test_repository, test_job):
    log_path = SampleData().get_log_path("mochitest-browser-chrome_errorsummary.log")
    log_url = 'http://my-log.mozilla.org'
    with open(log_path) as log_handler:
        responses.add(responses.GET, log_url, body=log_handler.read(), status=200)
    log_obj = JobLog.objects.create(job=test_job, name="errorsummary_json", url=log_url)
    store_failure_lines(log_obj)
    groups = get_group_results(test_job.push)
    task_groups = groups['V3SVuxO8TFy37En_6HcXLs']
    assert task_groups['dom/base/test/browser.ini']


def test_get_group_results_with_colon(activate_responses, test_repository, test_job):
    log_path = SampleData().get_log_path("xpcshell-errorsummary-with-colon.log")
    log_url = 'http://my-log.mozilla.org'
    with open(log_path) as log_handler:
        responses.add(responses.GET, log_url, body=log_handler.read(), status=200)
    log_obj = JobLog.objects.create(job=test_job, name="errorsummary_json", url=log_url)
    store_failure_lines(log_obj)
    groups = get_group_results(test_job.push)
    task_groups = groups['V3SVuxO8TFy37En_6HcXLs']
    assert task_groups[
        'toolkit/components/extensions/test/xpcshell/xpcshell-e10s.ini:toolkit/components/extensions/test/xpcshell/xpcshell-content.ini'
    ]
    assert task_groups['toolkit/components/places/tests/unit/xpcshell.ini']
    assert task_groups[
        'toolkit/components/extensions/test/xpcshell/xpcshell-e10s.ini:toolkit/components/extensions/test/xpcshell/xpcshell-common-e10s.ini'
...
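Each test above follows the same pattern: register a canned body for the log URL with the responses library, create a JobLog row pointing at that URL, then let store_failure_lines fetch it through requests. A minimal standalone sketch of just that mocking step, outside the Treeherder fixtures (the URL and body below are placeholders, and the activate_responses fixture presumably plays the same role as the decorator here):

import requests
import responses

@responses.activate
def test_fetch_log_is_mocked():
    log_url = "http://my-log.mozilla.org"
    # Any GET to log_url now returns this canned body instead of hitting the network.
    responses.add(responses.GET, log_url, body='{"action": "log"}', status=200)
    resp = requests.get(log_url)
    assert resp.status_code == 200
    assert resp.text == '{"action": "log"}'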
com.vmware.cis.tagging.category
Source: com.vmware.cis.tagging.category
#!/usr/bin/env python3
from pprint import pprint
import re
import requests
import yaml
import dateutil.parser
from datetime import datetime
from datetime import timezone

MAX_HOURS = 12

COMMON_PATTERNS = (
    (
        "MISSING_TAGGING_SERVICE",
        re.compile(r"Cannot find service 'com.vmware.cis.tagging.category'"),
    ),
    (
        "HOST_KAPUT",
        re.compile(
            r"'Unable to communicate with the remote host, since it is disconnected.'"
        ),
    ),
    (
        "HOST_KAPUT",
        re.compile(
            r'Cannot complete login due to an incorrect user name or password."'
        ),
    ),
)


def get_inventory(log_url):
    r = requests.get(log_url + "zuul-info/inventory.yaml")
    return yaml.safe_load(r.text)


def search_error_pattern(log_url):
    founds = {}
    failure_in_re = re.compile(
        r".*NOTICE: To resume at this test target, use the option: --start-at (\S+)"
    )
    r = requests.get(log_url + "job-output.txt")
    task_path = None
    for l in r.text.splitlines():
        if "task path:" in l:
            task_path = l.split(
                "/home/zuul/.ansible/collections/ansible_collections/community/vmware/"
            )[-1]
        for name, pattern in COMMON_PATTERNS:
            if pattern.search(l):
                if name not in founds:
                    founds[name] = 0
                founds["task_path"] = task_path
                founds[name] += 1
        m = failure_in_re.search(l)
        if m:
            founds["task_path"] = task_path
            founds["failure_in"] = m.group(1)
    return founds


def iter_builds():
    jobs = [
        "ansible-test-cloud-integration-vcenter_1esxi_without_nested-python36_1_of_2",
        "ansible-test-cloud-integration-vcenter_1esxi_without_nested-python36_2_of_2",
        "ansible-test-cloud-integration-vcenter_1esxi_with_nested-python36",
        "ansible-test-cloud-integration-vcenter_2esxi_without_nested-python36",
    ]
    for job in jobs:
        url_template = "https://dashboard.zuul.ansible.com/api/tenant/ansible/builds?job_name={job}&limit=30"
        r = requests.get(url_template.format(job=job))
        builds = r.json()
        for build in builds:
            yield build


results_by_host_id = {}
results_by_region = {}
results_by_age = {}
host_by_host_id = {}
current = {}

for build in iter_builds():
    delta = datetime.now(timezone.utc) - dateutil.parser.parse(build["end_time"] + 'Z')
    age = int(delta.total_seconds() / 3600)
    if age > MAX_HOURS:
        continue
    if build["result"] in ("RETRY_LIMIT", "SKIPPED", "POST_FAILURE"):
        continue
    matches = None
    if build["result"] == "FAILURE":
        matches = search_error_pattern(build["log_url"])
        if "MISSING_TAGGING_SERVICE" not in matches:
            # print("not matches for log_url: {log_url}".format(log_url=build["log_url"]))
            # print(
            #     (
            #         "The failure may not be a hypervisor stability problem. We "
            #         "ignore it. ({failure_in} at {task_path})"
            #     ).format(
            #         failure_in=matches.get("failure_in"),
            #         task_path=matches.get("task_path"),
            #     )
            # )
            continue
        print("** https://github.com/ansible-collections/vmware/pull/{change}".format(
            **build
        ))
        print("Match: {log_url}".format(log_url=build["log_url"]))
    if not build["log_url"]:
        continue
    inventory = get_inventory(build["log_url"])
    cloud = inventory["all"]["hosts"]["esxi1"]["nodepool"]["cloud"]
    region = inventory["all"]["hosts"]["esxi1"]["nodepool"]["az"]
    host_id = inventory["all"]["hosts"]["esxi1"]["nodepool"]["host_id"]
    if age not in results_by_age:
        results_by_age[age] = {"SUCCESS": [], "FAILURE": []}
    results_by_age[age][build["result"]].append(
        {"log_url": build["log_url"], "matches": matches, "region": region}
    )
    if host_id not in results_by_host_id:
        host_by_host_id[host_id] = "{cloud}-{region}-{host_id}".format(
            cloud=cloud, region=region, host_id=host_id
        )
        results_by_host_id[host_id] = {"SUCCESS": [], "FAILURE": []}
    results_by_host_id[host_id][build["result"]].append(
        {"log_url": build["log_url"], "matches": matches}
    )
    if region not in results_by_region:
        results_by_region[region] = {"SUCCESS": [], "FAILURE": []}
    results_by_region[region][build["result"]].append(
        {"log_url": build["log_url"], "matches": matches}
    )
    if build["change"] not in current:
        current[build["change"]] = {
            "build": build,
            "log_url": build["log_url"],
            "matches": matches,
        }

print("BAD HOSTS (for last {max_hours})".format(max_hours=MAX_HOURS))
for host_id in results_by_host_id.keys():
    if len(results_by_host_id[host_id]["SUCCESS"]) < 2:
        # Not enough metrics
        continue
    rate = len(results_by_host_id[host_id]["SUCCESS"]) / (
        len(results_by_host_id[host_id]["SUCCESS"])
        + len(results_by_host_id[host_id]["FAILURE"])
    )
    if rate < 0.9:
        print(
            "name={name} (rate={rate})".format(name=host_by_host_id[host_id], rate=rate)
        )

print("GOOD HOSTS (for last {max_hours})".format(max_hours=MAX_HOURS))
for host_id in results_by_host_id.keys():
    if len(results_by_host_id[host_id]["SUCCESS"]) < 2:
        # Not enough metrics
        continue
    rate = len(results_by_host_id[host_id]["SUCCESS"]) / (
        len(results_by_host_id[host_id]["SUCCESS"])
        + len(results_by_host_id[host_id]["FAILURE"])
    )
    if rate > 0.9:
        print(
            "name={name} (rate={rate})".format(name=host_by_host_id[host_id], rate=rate)
        )

print("BY ZONE (for last {max_hours})".format(max_hours=MAX_HOURS))
for region, values in results_by_region.items():
    rate = len(values["SUCCESS"]) / (len(values["SUCCESS"]) + len(values["FAILURE"]))
    print("region={region} (rate={rate})".format(region=region, rate=rate))

print("BY AGE")
for age in sorted(results_by_age):
    values = results_by_age[age]
    rate = len(values["SUCCESS"]) / (len(values["SUCCESS"]) + len(values["FAILURE"]))
    print("⏰ {age}h ago (rate={rate})".format(age=age, rate=rate))
    for failure in values["FAILURE"]:
        print(" - {failure}".format(failure=failure))

for job in current.values():
    if not job["matches"]:
        continue
    if "MISSING_TAGGING_SERVICE" in job["matches"]:
        print(
            "Restart: https://github.com/ansible-collections/vmware/pull/{change}".format(
                **job["build"]
            )
...
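The reporting section repeats the same success-rate computation three times, guarded by a minimum of 2 successes before a host is judged and a 0.9 threshold. Factored out as a small helper (a sketch built only from values taken from the script above):

def success_rate(results):
    """Return SUCCESS / (SUCCESS + FAILURE), or None when there are
    fewer than 2 successes (not enough metrics to judge a host)."""
    successes = len(results["SUCCESS"])
    failures = len(results["FAILURE"])
    if successes < 2:
        return None
    return successes / (successes + failures)

# Usage against the script's results_by_host_id mapping:
# rate = success_rate(results_by_host_id[host_id])
# if rate is not None and rate < 0.9:
#     print("bad host:", host_by_host_id[host_id])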
esxi_bad_compute_hosts
Source: esxi_bad_compute_hosts
#!/usr/bin/env python3
from pprint import pprint
import re
import requests
import yaml
import dateutil.parser
from datetime import datetime
from datetime import timezone

MAX_HOURS = 24

COMMON_PATTERNS = (
    (
        "HOST_KAPUT",
        re.compile(r"'An error occurred while communicating with the remote host.'"),
    ),
    (
        "HOST_KAPUT",
        re.compile(
            r"'Unable to communicate with the remote host, since it is disconnected.'"
        ),
    ),
    (
        "HOST_KAPUT",
        re.compile(
            r'Cannot complete login due to an incorrect user name or password."'
        ),
    ),
)


def get_inventory(log_url):
    r = requests.get(log_url + "zuul-info/inventory.yaml")
    return yaml.safe_load(r.text)


def search_error_pattern(log_url):
    founds = {}
    failure_in_re = re.compile(
        r".*NOTICE: To resume at this test target, use the option: --start-at (\S+)"
    )
    r = requests.get(log_url + "job-output.txt")
    task_path = None
    for l in r.text.splitlines():
        if "task path:" in l:
            task_path = l.split(
                "/home/zuul/.ansible/collections/ansible_collections/community/vmware/"
            )[-1]
        for name, pattern in COMMON_PATTERNS:
            if pattern.search(l):
                if name not in founds:
                    founds[name] = 0
                founds["task_path"] = task_path
                founds[name] += 1
        m = failure_in_re.search(l)
        if m:
            founds["task_path"] = task_path
            founds["failure_in"] = m.group(1)
    return founds


def iter_builds():
    jobs = [
        "ansible-test-cloud-integration-vcenter_1esxi_without_nested-python36_1_of_2",
        "ansible-test-cloud-integration-vcenter_1esxi_without_nested-python36_2_of_2",
        "ansible-test-cloud-integration-vcenter_1esxi_with_nested-python36",
        "ansible-test-cloud-integration-vcenter_2esxi_without_nested-python36",
    ]
    for job in jobs:
        url_template = "https://dashboard.zuul.ansible.com/api/tenant/ansible/builds?job_name={job}&limit=30"
        r = requests.get(url_template.format(job=job))
        builds = r.json()
        for build in builds:
            yield build


results_by_host_id = {}
results_by_region = {}
results_by_age = {}
host_by_host_id = {}
current = {}

for build in iter_builds():
    delta = datetime.now(timezone.utc) - dateutil.parser.parse(build["end_time"] + 'Z')
    age = int(delta.total_seconds() / 3600)
    if age > MAX_HOURS:
        continue
    if build["result"] in ("RETRY_LIMIT", "SKIPPED", "POST_FAILURE"):
        continue
    matches = None
    if build["result"] == "FAILURE":
        print("** https://github.com/ansible-collections/vmware/pull/{change}".format(
            **build
        ))
        matches = search_error_pattern(build["log_url"])
        if "HOST_KAPUT" not in matches:
            print("not matches for log_url: {log_url}".format(log_url=build["log_url"]))
            print(
                (
                    "The failure may not be a hypervisor stability problem. We "
                    "ignore it. ({failure_in} at {task_path})"
                ).format(
                    failure_in=matches.get("failure_in"),
                    task_path=matches.get("task_path"),
                )
            )
            continue
        print("Match: {log_url}".format(log_url=build["log_url"]))
    if not build["log_url"]:
        continue
    inventory = get_inventory(build["log_url"])
    cloud = inventory["all"]["hosts"]["esxi1"]["nodepool"]["cloud"]
    region = inventory["all"]["hosts"]["esxi1"]["nodepool"]["az"]
    host_id = inventory["all"]["hosts"]["esxi1"]["nodepool"]["host_id"]
    if age not in results_by_age:
        results_by_age[age] = {"SUCCESS": [], "FAILURE": []}
    results_by_age[age][build["result"]].append(
        {"log_url": build["log_url"], "matches": matches, "region": region}
    )
    if host_id not in results_by_host_id:
        host_by_host_id[host_id] = "{cloud}-{region}-{host_id}".format(
            cloud=cloud, region=region, host_id=host_id
        )
        results_by_host_id[host_id] = {"SUCCESS": [], "FAILURE": []}
    results_by_host_id[host_id][build["result"]].append(
        {"log_url": build["log_url"], "matches": matches}
    )
    if region not in results_by_region:
        results_by_region[region] = {"SUCCESS": [], "FAILURE": []}
    results_by_region[region][build["result"]].append(
        {"log_url": build["log_url"], "matches": matches}
    )
    if build["change"] not in current:
        current[build["change"]] = {
            "build": build,
            "log_url": build["log_url"],
            "matches": matches,
        }

print("BAD HOSTS (for last {max_hours})".format(max_hours=MAX_HOURS))
for host_id in results_by_host_id.keys():
    if len(results_by_host_id[host_id]["SUCCESS"]) < 2:
        # Not enough metrics
        continue
    rate = len(results_by_host_id[host_id]["SUCCESS"]) / (
        len(results_by_host_id[host_id]["SUCCESS"])
        + len(results_by_host_id[host_id]["FAILURE"])
    )
    if rate < 0.9:
        print(
            "name={name} (rate={rate})".format(name=host_by_host_id[host_id], rate=rate)
        )

print("GOOD HOSTS (for last {max_hours})".format(max_hours=MAX_HOURS))
for host_id in results_by_host_id.keys():
    if len(results_by_host_id[host_id]["SUCCESS"]) < 2:
        # Not enough metrics
        continue
    rate = len(results_by_host_id[host_id]["SUCCESS"]) / (
        len(results_by_host_id[host_id]["SUCCESS"])
        + len(results_by_host_id[host_id]["FAILURE"])
    )
    if rate > 0.9:
        print(
            "name={name} (rate={rate})".format(name=host_by_host_id[host_id], rate=rate)
        )

print("BY ZONE (for last {max_hours})".format(max_hours=MAX_HOURS))
for region, values in results_by_region.items():
    rate = len(values["SUCCESS"]) / (len(values["SUCCESS"]) + len(values["FAILURE"]))
    print("region={region} (rate={rate})".format(region=region, rate=rate))

print("BY AGE")
for age in sorted(results_by_age):
    values = results_by_age[age]
    rate = len(values["SUCCESS"]) / (len(values["SUCCESS"]) + len(values["FAILURE"]))
    print("⏰ {age}h ago (rate={rate})".format(age=age, rate=rate))
    for failure in values["FAILURE"]:
        print(" - {failure}".format(failure=failure))

for job in current.values():
    if not job["matches"]:
        continue
    if "HOST_KAPUT" in job["matches"]:
        print(
            "Restart: https://github.com/ansible-collections/vmware/pull/{change}".format(
                **job["build"]
            )
...
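This script is the HOST_KAPUT variant of the previous one: the same scanning loop with different patterns and a 24-hour window. The classification step both scripts share reduces to matching every log line against COMMON_PATTERNS and counting hits; a compressed, self-contained sketch run on an in-memory sample line instead of a downloaded job-output.txt:

import re

PATTERNS = (
    ("HOST_KAPUT", re.compile(r"'An error occurred while communicating with the remote host.'")),
)

def classify(lines):
    # Count how many lines match each named failure pattern.
    counts = {}
    for line in lines:
        for name, pattern in PATTERNS:
            if pattern.search(line):
                counts[name] = counts.get(name, 0) + 1
    return counts

sample = ["fatal: 'An error occurred while communicating with the remote host.'"]
assert classify(sample) == {"HOST_KAPUT": 1}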
slack_callback_functions_with_partial.py
Source: slack_callback_functions_with_partial.py
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook
from airflow.hooks.base import BaseHook
from airflow.operators.python import get_current_context
import traceback

"""
Follow Option #2 outlined here
https://medium.com/datareply/integrating-slack-alerts-in-airflow-c9dcd1551058
in order to set up Slack HTTP webhook
"""


def dag_triggered_callback(context, **kwargs):
    slack_conn_id = kwargs["http_conn_id"]
    slack_webhook_token = BaseHook.get_connection(slack_conn_id).password
    log_url = context.get("task_instance").log_url
    slack_msg = f"""
        :airflow-new: DAG has been triggered.
        *Task*: {context.get('task_instance').task_id}
        *DAG*: {context.get('task_instance').dag_id}
        *Execution Time*: {context.get('execution_date')}
        <{log_url}| *Log URL*>
    """
    slack_alert = SlackWebhookOperator(
        task_id="slack_test",
        http_conn_id=slack_conn_id,
        webhook_token=slack_webhook_token,
        message=slack_msg,
        username="airflow",
    )
    return slack_alert.execute(context=context)


def dag_success_callback(context, **kwargs):
    slack_conn_id = kwargs["http_conn_id"]
    slack_webhook_token = BaseHook.get_connection(slack_conn_id).password
    log_url = context.get("task_instance").log_url
    slack_msg = f"""
        :airflow-new: DAG has succeeded.
        *Task*: {context.get('task_instance').task_id}
        *DAG*: {context.get('task_instance').dag_id}
        *Execution Time*: {context.get('execution_date')}
        <{log_url}| *Log URL*>
    """
    slack_alert = SlackWebhookOperator(
        task_id="slack_test",
        http_conn_id=slack_conn_id,
        webhook_token=slack_webhook_token,
        message=slack_msg,
        username="airflow",
    )
    return slack_alert.execute(context=context)


def success_callback(context, **kwargs):
    slack_conn_id = kwargs["http_conn_id"]
    slack_webhook_token = BaseHook.get_connection(slack_conn_id).password
    log_url = context.get("task_instance").log_url
    slack_msg = f"""
        :white_check_mark: Task has succeeded.
        *Task*: {context.get('task_instance').task_id}
        *DAG*: {context.get('task_instance').dag_id}
        *Execution Time*: {context.get('execution_date')}
        <{log_url}| *Log URL*>
    """
    slack_alert = SlackWebhookOperator(
        task_id="slack_test",
        http_conn_id=slack_conn_id,
        webhook_token=slack_webhook_token,
        message=slack_msg,
        username="airflow",
    )
    return slack_alert.execute(context=context)


def failure_callback(context, **kwargs):
    slack_conn_id = kwargs["http_conn_id"]
    slack_webhook_token = BaseHook.get_connection(slack_conn_id).password
    log_url = context.get("task_instance").log_url
    exception = context.get('exception')
    formatted_exception = ''.join(
        traceback.format_exception(
            type(exception), exception, exception.__traceback__
        )
    ).strip()
    slack_msg = f"""
        :x: Task has failed.
        *Task*: {context.get('task_instance').task_id}
        *DAG*: {context.get('task_instance').dag_id}
        *Execution Time*: {context.get('execution_date')}
        *Exception*: {formatted_exception}
        <{log_url}| *Log URL*>
    """
    slack_alert = SlackWebhookOperator(
        task_id="slack_test",
        http_conn_id=slack_conn_id,
        webhook_token=slack_webhook_token,
        message=slack_msg,
        username="airflow",
    )
    return slack_alert.execute(context=context)


def retry_callback(context, **kwargs):
    slack_conn_id = kwargs["http_conn_id"]
    slack_webhook_token = BaseHook.get_connection(slack_conn_id).password
    log_url = context.get("task_instance").log_url
    exception = context.get('exception')
    formatted_exception = ''.join(
        traceback.format_exception(
            type(exception), exception, exception.__traceback__
        )
    ).strip()
    slack_msg = f"""
        :sos: Task is retrying.
        *Task*: {context.get('task_instance').task_id}
        *Try number:* {context.get('task_instance').try_number - 1} out of {context.get('task_instance').max_tries + 1}.
        *DAG*: {context.get('task_instance').dag_id}
        *Execution Time*: {context.get('execution_date')}
        *Exception*: {formatted_exception}
        <{log_url}| *Log URL*>
    """
    slack_alert = SlackWebhookOperator(
        task_id="slack_test",
        http_conn_id=slack_conn_id,
        webhook_token=slack_webhook_token,
        message=slack_msg,
        username="airflow",
    )
    return slack_alert.execute(context=context)


def slack_test(**kwargs):
    context = get_current_context()
    slack_conn_id = kwargs["http_conn_id"]
    slack_webhook_token = BaseHook.get_connection(slack_conn_id).password
    log_url = context.get("task_instance").log_url
    slack_msg = f"""
        :airflow-spin-new: This is a test for sending a slack message via a PythonOperator.
        *Task*: {context.get('task_instance').task_id}
        *DAG*: {context.get('task_instance').dag_id}
        *Execution Time*: {context.get('execution_date')}
        <{log_url}| *Log URL*>
    """
    slack_alert = SlackWebhookOperator(
        task_id="slack_test",
        http_conn_id=slack_conn_id,
        webhook_token=slack_webhook_token,
        message=slack_msg,
        username="airflow",
    )
    return slack_alert.execute(context=context)


def sla_miss_callback(dag, task_list, blocking_task_list, slas, blocking_tis, *args, **kwargs):
    dag_id = slas[0].dag_id
    task_id = slas[0].task_id
    execution_date = slas[0].execution_date.isoformat()
    http_conn_id = kwargs["http_conn_id"]
    hook = SlackWebhookHook(
        http_conn_id=http_conn_id,
        webhook_token=BaseHook.get_connection(http_conn_id).password,
        message=f"""
            :sos: *SLA has been missed*
            *Task:* {task_id}
            *DAG:* {dag_id}
            *Execution Date:* {execution_date}
        """,
    )
...
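The file name points at functools.partial, which is how these (context, **kwargs) callbacks get their http_conn_id pre-bound so Airflow can invoke them with the context argument alone. A sketch of that wiring for an Airflow 2.x DAG (the dag_id, schedule, and slack_conn connection id are placeholders, and it assumes the callback functions above are defined in or importable into the DAG file):

from datetime import datetime
from functools import partial

from airflow import DAG
from airflow.operators.python import PythonOperator

with DAG(
    dag_id="slack_callbacks_demo",  # placeholder dag_id
    start_date=datetime(2022, 1, 1),
    schedule_interval=None,
    # partial pre-binds http_conn_id; Airflow then calls callback(context).
    on_success_callback=partial(dag_success_callback, http_conn_id="slack_conn"),
) as dag:
    task = PythonOperator(
        task_id="slack_test",
        python_callable=slack_test,
        op_kwargs={"http_conn_id": "slack_conn"},
        on_failure_callback=partial(failure_callback, http_conn_id="slack_conn"),
        on_retry_callback=partial(retry_callback, http_conn_id="slack_conn"),
    )

Binding the connection id with partial keeps the callbacks reusable across DAGs: only the pre-filled kwargs change, not the callback bodies.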