Best Python code snippet using localstack_python
conftest.py
Source:conftest.py
...21 "Arista-7060CX-32S-C32",22 "Arista-7060CX-32S-Q32",23 "Arista-7060CX-32S-D48C8"24]25def _parse_timestamp(timestamp):26 try:27 time = datetime.strptime(timestamp, FMT)28 except ValueError:29 time = datetime.strptime(timestamp, FMT_SHORT)30 return time31@pytest.fixture(autouse=True, scope="module")32def skip_on_simx(duthosts, rand_one_dut_hostname):33 duthost = duthosts[rand_one_dut_hostname]34 platform = duthost.facts["platform"]35 if "simx" in platform:36 pytest.skip('skipped on this platform: {}'.format(platform))37@pytest.fixture(scope="module")38def xcvr_skip_list(duthosts):39 intf_skip_list = {}40 for dut in duthosts:41 platform = dut.facts['platform']42 hwsku = dut.facts['hwsku']43 f_path = os.path.join('/usr/share/sonic/device', platform, hwsku, 'hwsku.json')44 intf_skip_list[dut.hostname] = []45 dut.has_sku = True46 try:47 out = dut.command("cat {}".format(f_path))48 hwsku_info = json.loads(out["stdout"])49 for int_n in hwsku_info['interfaces']:50 if hwsku_info['interfaces'][int_n].get('port_type') == "RJ45":51 intf_skip_list[dut.hostname].append(int_n)52 except Exception:53 # hwsku.json does not exist will return empty skip list54 dut.has_sku = False55 logging.debug(56 "hwsku.json absent or port_type for interfaces not included for hwsku {}".format(hwsku))57 return intf_skip_list58@pytest.fixture()59def bring_up_dut_interfaces(request, duthosts, enum_rand_one_per_hwsku_frontend_hostname, tbinfo):60 """61 Bring up outer interfaces on the DUT.62 Args:63 request: pytest request object64 duthost: Fixture for interacting with the DUT.65 """66 duthost = duthosts[enum_rand_one_per_hwsku_frontend_hostname]67 yield68 if request.node.rep_call.failed:69 mg_facts = duthost.get_extended_minigraph_facts(tbinfo)70 ports = mg_facts['minigraph_ports'].keys()71 # Enable outer interfaces72 for port in ports:73 duthost.no_shutdown(ifname=port)74def get_state_times(timestamp, state, state_times, first_after_offset=None):75 time = timestamp.strftime(FMT)76 state_name = state.split("|")[0].strip()77 state_status = state.split("|")[1].strip()78 state_dict = state_times.get(state_name, {"timestamp": {}})79 timestamps = state_dict.get("timestamp")80 if state_status in timestamps:81 state_dict[state_status+" count"] = state_dict.get(state_status+" count") + 182 # capture last occcurence - useful in calculating events end time83 state_dict["last_occurence"] = time84 elif first_after_offset:85 state_dict[state_status+" count"] = 186 # capture the first occurence as the one after offset timestamp and ignore the ones before87 # this is useful to find time after a specific instance, for eg. - kexec time or FDB disable time.88 if _parse_timestamp(first_after_offset) < _parse_timestamp(time):89 timestamps[state_status] = time90 else:91 # only capture timestamp of first occurence of the entity. Otherwise, just increment the count above.92 # this is useful in capturing start point. Eg., first neighbor entry, LAG ready, etc.93 state_dict[state_status+" count"] = 194 timestamps[state_status] = time95 return {state_name: state_dict}96def get_report_summary(analyze_result, reboot_type):97 time_spans = analyze_result.get("time_span", {})98 time_spans_summary = OrderedDict()99 kexec_offsets = analyze_result.get("offset_from_kexec", {})100 reboot_start_time = analyze_result.get("reboot_time", {}).get("timestamp", {}).get("Start")101 kexec_offsets_summary = OrderedDict()102 for entity in OFFSET_ITEMS:103 time_taken = ""104 if entity in kexec_offsets:105 time_taken = kexec_offsets.get(entity).get("time_taken", "")106 elif entity in time_spans:107 timestamp = time_spans.get(entity).get("timestamp", {})108 marker_start_time = timestamp.get("Start") if "Start" in timestamp else timestamp.get("Started")109 if reboot_start_time and reboot_start_time != "N/A" and marker_start_time:110 time_taken = (_parse_timestamp(marker_start_time) -111 _parse_timestamp(reboot_start_time)).total_seconds()112 kexec_offsets_summary.update({entity.lower(): str(time_taken)})113 for entity in TIME_SPAN_ITEMS:114 time_taken = ""115 if entity in time_spans:116 time_taken = time_spans.get(entity,{}).get("time_span", "")117 elif entity in kexec_offsets:118 marker_first_time = kexec_offsets.get(entity).get("timestamp", {}).get("Start")119 marker_last_time = kexec_offsets.get(entity).get("last_occurence")120 if marker_first_time and marker_last_time:121 time_taken = (_parse_timestamp(marker_last_time) -122 _parse_timestamp(marker_first_time)).total_seconds()123 time_spans_summary.update({entity.lower(): str(time_taken)})124 lacp_sessions_dict = analyze_result.get("controlplane")125 lacp_sessions_waittime = lacp_sessions_dict.pop("lacp_sessions")\126 if lacp_sessions_dict and "lacp_sessions" in lacp_sessions_dict else None127 controlplane_summary = {"downtime": "", "arp_ping": "", "lacp_session_max_wait": ""}128 if lacp_sessions_waittime and len(lacp_sessions_waittime) > 0:129 max_lacp_session_wait = max(list(lacp_sessions_waittime.values()))130 analyze_result.get(\131 "controlplane", controlplane_summary).update(132 {"lacp_session_max_wait": max_lacp_session_wait})133 result_summary = {134 "reboot_type": reboot_type,135 "dataplane": analyze_result.get("dataplane", {"downtime": "", "lost_packets": ""}),136 "controlplane": analyze_result.get("controlplane", controlplane_summary),137 "time_span": time_spans_summary,138 "offset_from_kexec": kexec_offsets_summary139 }140 return result_summary141def get_kexec_time(duthost, messages, result):142 reboot_pattern = re.compile(r'.* NOTICE admin: Rebooting with /sbin/kexec -e to.*...')143 reboot_time = "N/A"144 logging.info("FINDING REBOOT PATTERN")145 for message in messages:146 # Get timestamp of reboot - Rebooting string147 if re.search(reboot_pattern, message):148 logging.info("FOUND REBOOT PATTERN for {}".format(duthost.hostname))149 delim = "{}|{}".format(duthost.hostname, "sonic")150 reboot_time = _parse_timestamp(re.split(delim, message)[0].strip()).strftime(FMT)151 continue152 result["reboot_time"] = {153 "timestamp": {"Start": reboot_time},154 }155def analyze_log_file(duthost, messages, result, offset_from_kexec):156 service_restart_times = dict()157 derived_patterns = OTHER_PATTERNS.get("COMMON")158 service_patterns = dict()159 # get platform specific regexes160 if is_broadcom_device(duthost):161 derived_patterns.update(OTHER_PATTERNS.get("BRCM"))162 elif is_mellanox_device(duthost):163 derived_patterns.update(OTHER_PATTERNS.get("MLNX"))164 # get image specific regexes165 if "20191130" in duthost.os_version:166 derived_patterns.update(OTHER_PATTERNS.get("201911"))167 service_patterns.update(SERVICE_PATTERNS.get("201911"))168 else:169 derived_patterns.update(OTHER_PATTERNS.get("LATEST"))170 service_patterns.update(SERVICE_PATTERNS.get("LATEST"))171 if not messages:172 logging.error("Expected messages not found in syslog")173 return None174 def service_time_check(message, status):175 delim = "{}|{}".format(duthost.hostname, "sonic")176 time = _parse_timestamp(re.split(delim, message)[0].strip())177 time = time.strftime(FMT)178 service_name = message.split(status + " ")[1].split()[0]179 service_name = service_name.upper()180 if service_name == "ROUTER":181 service_name = "RADV"182 service_dict = service_restart_times.get(service_name, {"timestamp": {}})183 timestamps = service_dict.get("timestamp")184 if status in timestamps:185 service_dict[status+" count"] = service_dict.get(status+" count") + 1186 else:187 service_dict[status+" count"] = 1188 timestamps[status] = time189 service_restart_times.update({service_name: service_dict})190 reboot_time = "N/A"191 for message in messages:192 # Get stopping to started timestamps for services (swss, bgp, etc)193 for status, pattern in service_patterns.items():194 if re.search(pattern, message):195 service_time_check(message, status)196 break197 # Get timestamps of all other entities198 for state, pattern in derived_patterns.items():199 if re.search(pattern, message):200 delim = "{}|{}".format(duthost.hostname, "sonic")201 timestamp = _parse_timestamp(re.split(delim, message)[0].strip())202 state_name = state.split("|")[0].strip()203 if state_name + "|End" not in derived_patterns.keys():204 if "FDB_EVENT_OTHER_MAC_EXPIRY" in state_name or "FDB_EVENT_SCAPY_MAC_EXPIRY" in state_name:205 fdb_aging_disable_start = service_restart_times.get("FDB_AGING_DISABLE", {})\206 .get("timestamp", {}).get("Start")207 if not fdb_aging_disable_start:208 break209 first_after_offset = fdb_aging_disable_start210 else:211 first_after_offset = result.get("reboot_time", {}).get("timestamp", {}).get("Start")212 state_times = get_state_times(timestamp, state, offset_from_kexec, first_after_offset=first_after_offset)213 offset_from_kexec.update(state_times)214 else:215 state_times = get_state_times(timestamp, state, service_restart_times)216 service_restart_times.update(state_times)217 break218 # Calculate time that services took to stop/start219 for _, timings in service_restart_times.items():220 timestamps = timings["timestamp"]221 timings["stop_time"] = (_parse_timestamp(timestamps["Stopped"]) - \222 _parse_timestamp(timestamps["Stopping"])).total_seconds() \223 if "Stopped" in timestamps and "Stopping" in timestamps else None224 timings["start_time"] = (_parse_timestamp(timestamps["Started"]) -\225 _parse_timestamp(timestamps["Starting"])).total_seconds() \226 if "Started" in timestamps and "Starting" in timestamps else None227 if "Started" in timestamps and "Stopped" in timestamps:228 timings["time_span"] = (_parse_timestamp(timestamps["Started"]) -\229 _parse_timestamp(timestamps["Stopped"])).total_seconds()230 elif "Start" in timestamps and "End" in timestamps:231 if "last_occurence" in timings:232 timings["time_span"] = (_parse_timestamp(timings["last_occurence"]) -\233 _parse_timestamp(timestamps["Start"])).total_seconds()234 else:235 timings["time_span"] = (_parse_timestamp(timestamps["End"]) -\236 _parse_timestamp(timestamps["Start"])).total_seconds()237 result["time_span"].update(service_restart_times)238 result["offset_from_kexec"] = offset_from_kexec239 return result240def analyze_sairedis_rec(messages, result, offset_from_kexec):241 sai_redis_state_times = dict()242 for message in messages:243 for state, pattern in SAIREDIS_PATTERNS.items():244 if re.search(pattern, message):245 timestamp = datetime.strptime(message.split("|")[0].strip(), "%Y-%m-%d.%H:%M:%S.%f")246 state_name = state.split("|")[0].strip()247 reboot_time = result.get("reboot_time", {}).get("timestamp", {}).get("Start")248 if state_name + "|End" not in SAIREDIS_PATTERNS.keys():249 if "FDB_EVENT_OTHER_MAC_EXPIRY" in state_name or "FDB_EVENT_SCAPY_MAC_EXPIRY" in state_name:250 fdb_aging_disable_start = result.get("time_span", {}).get("FDB_AGING_DISABLE", {})\251 .get("timestamp", {}).get("Start")252 if not fdb_aging_disable_start:253 break254 first_after_offset = fdb_aging_disable_start255 else:256 first_after_offset = result.get("reboot_time", {}).get("timestamp", {}).get("Start")257 state_times = get_state_times(timestamp, state, offset_from_kexec, first_after_offset=first_after_offset)258 offset_from_kexec.update(state_times)259 else:260 state_times = get_state_times(timestamp, state, sai_redis_state_times, first_after_offset=reboot_time)261 sai_redis_state_times.update(state_times)262 for _, timings in sai_redis_state_times.items():263 timestamps = timings["timestamp"]264 if "Start" in timestamps and "End" in timestamps:265 timings["time_span"] = (_parse_timestamp(timestamps["End"]) -\266 _parse_timestamp(timestamps["Start"])).total_seconds()267 result["time_span"].update(sai_redis_state_times)268 result["offset_from_kexec"] = offset_from_kexec269def get_data_plane_report(analyze_result, reboot_type, log_dir, reboot_oper):270 report = {"controlplane": {"arp_ping": "", "downtime": ""}, "dataplane": {"lost_packets": "", "downtime": ""}}271 reboot_report_path = re.sub('([\[\]])','[\\1]',log_dir) # escaping, as glob utility does not work well with "[","]"272 if reboot_oper:273 reboot_report_file_name = "{}-reboot-{}-report.json".format(reboot_type, reboot_oper)274 else:275 reboot_report_file_name = "{}-reboot-report.json".format(reboot_type)276 reboot_report_file = "{}/{}".format(reboot_report_path, reboot_report_file_name)277 files = glob.glob(reboot_report_file)278 if files:279 filepath = files[0]280 with open(filepath) as json_file:281 report = json.load(json_file)282 analyze_result.update(report)283def verify_mac_jumping(test_name, timing_data, verification_errors):284 mac_jumping_other_addr = timing_data.get("offset_from_kexec", {})\285 .get("FDB_EVENT_OTHER_MAC_EXPIRY",{}).get("Start count", 0)286 mac_jumping_scapy_addr = timing_data.get("offset_from_kexec", {})\287 .get("FDB_EVENT_SCAPY_MAC_EXPIRY",{}).get("Start count", 0)288 mac_expiry_start = timing_data.get("offset_from_kexec", {}).get("FDB_EVENT_OTHER_MAC_EXPIRY",{})\289 .get("timestamp", {}).get("Start")290 fdb_aging_disable_start = timing_data.get("time_span", {}).get("FDB_AGING_DISABLE",{})\291 .get("timestamp", {}).get("Start")292 fdb_aging_disable_end = timing_data.get("time_span", {}).get("FDB_AGING_DISABLE",{})\293 .get("timestamp", {}).get("End")294 if "mac_jump" in test_name:295 # MAC jumping allowed - allow Scapy default MAC to jump296 logging.info("MAC jumping is allowed. Jump count for expected mac: {}, unexpected MAC: {}"\297 .format(mac_jumping_scapy_addr, mac_jumping_other_addr))298 if not mac_jumping_scapy_addr:299 verification_errors.append("MAC jumping not detected when expected for address: 00-06-07-08-09-0A")300 else:301 # MAC jumping not allowed - do not allow the SCAPY default MAC to jump302 if mac_jumping_scapy_addr:303 verification_errors.append("MAC jumping is not allowed. Jump count for scapy mac: {}, other MAC: {}"\304 .format(mac_jumping_scapy_addr, mac_jumping_other_addr))305 if mac_jumping_other_addr:306 # In both mac jump allowed and denied cases unexpected MAC addresses should NOT jump between307 # the window that starts when SAI is instructed to disable MAC learning (warmboot shutdown path)308 # and ends when SAI is instructed to enable MAC learning (warmboot recovery path)309 logging.info("Mac expiry for unexpected addresses started at {}".format(mac_expiry_start) +\310 " and FDB learning enabled at {}".format(fdb_aging_disable_end))311 if _parse_timestamp(mac_expiry_start) > _parse_timestamp(fdb_aging_disable_start) and\312 _parse_timestamp(mac_expiry_start) < _parse_timestamp(fdb_aging_disable_end):313 verification_errors.append("Mac expiry detected during the window when FDB ageing was disabled")314def verify_required_events(duthost, event_counters, timing_data, verification_errors):315 for key in ["time_span", "offset_from_kexec"]:316 for pattern in REQUIRED_PATTERNS.get(key):317 observed_start_count = timing_data.get(key).get(pattern).get("Start count")318 observed_end_count = timing_data.get(key).get(pattern).get("End count")319 expected_count = event_counters.get(pattern)320 if observed_start_count != expected_count:321 verification_errors.append("FAIL: Event {} was found {} times, when expected exactly {} times".\322 format(pattern, observed_start_count, expected_count))323 if key == "time_span" and observed_start_count != observed_end_count:324 verification_errors.append("FAIL: Event {} counters did not match. ".format(pattern) +\325 "Started {} times, and ended {} times".format(observed_start_count, observed_end_count))326def overwrite_script_to_backup_logs(duthost, reboot_type, bgpd_log):327 # find the fast/warm-reboot script path328 reboot_script_path = duthost.shell('which {}'.format("{}-reboot".format(reboot_type)))['stdout']329 # backup original script330 duthost.shell("cp {} {}".format(reboot_script_path, reboot_script_path + ".orig"))331 # find the anchor string inside fast/warm-reboot script332 rebooting_log_line = "debug.*Rebooting with.*to.*"333 # Create a backup log command to be inserted right after the anchor string defined above334 backup_log_cmds ="cp /var/log/syslog /host/syslog.99;" +\335 "cp /var/log/swss/sairedis.rec /host/sairedis.rec.99;" +\336 "cp /var/log/swss/swss.rec /host/swss.rec.99;" +\337 "cp {} /host/bgpd.log.99".format(bgpd_log)338 # Do find-and-replace on fast/warm-reboot script to insert the backup_log_cmds string339 insert_backup_command = "sed -i '/{}/a {}' {}".format(rebooting_log_line, backup_log_cmds, reboot_script_path)340 duthost.shell(insert_backup_command)341def get_current_sonic_version(duthost):342 return duthost.shell('sonic_installer list | grep Current | cut -f2 -d " "')['stdout']343@pytest.fixture()344def advanceboot_loganalyzer(duthosts, rand_one_dut_hostname, request):345 """346 Advance reboot log analysis.347 This fixture starts log analysis at the beginning of the test. At the end,348 the collected expect messages are verified and timing of start/stop is calculated.349 Args:350 duthosts : List of DUT hosts351 rand_one_dut_hostname: hostname of a randomly selected DUT352 """353 duthost = duthosts[rand_one_dut_hostname]354 test_name = request.node.name355 if "warm" in test_name:356 reboot_type = "warm"357 elif "fast" in test_name:358 reboot_type = "fast"359 else:360 reboot_type = "unknown"361 # Currently, advanced reboot test would skip for kvm platform if the test has no device_type marker for vs.362 # Doing the same skip logic in this fixture to avoid running loganalyzer without the test executed363 if duthost.facts['platform'] == 'x86_64-kvm_x86_64-r0':364 device_marks = [arg for mark in request.node.iter_markers(name='device_type') for arg in mark.args]365 if 'vs' not in device_marks:366 pytest.skip('Testcase not supported for kvm')367 hwsku = duthost.facts["hwsku"]368 log_filesystem = duthost.shell("df -h | grep '/var/log'")['stdout']369 logs_in_tmpfs = True if log_filesystem and "tmpfs" in log_filesystem else False370 loganalyzer = LogAnalyzer(ansible_host=duthost, marker_prefix="test_advanced_reboot_{}".format(test_name))371 def bgpd_log_handler(preboot=False):372 # check current OS version post-reboot. This can be different than preboot OS version in case of upgrade373 current_os_version = get_current_sonic_version(duthost)374 if preboot:375 if 'SONiC-OS-201811' in current_os_version:376 bgpd_log = "/var/log/quagga/bgpd.log"377 else:378 bgpd_log = "/var/log/frr/bgpd.log"379 additional_files={'/var/log/swss/sairedis.rec': '', bgpd_log: ''}380 loganalyzer.additional_files = list(additional_files.keys())381 loganalyzer.additional_start_str = list(additional_files.values())382 return bgpd_log383 else:384 # log_analyzer may start with quagga and end with frr, and frr.log might still have old logs.385 # To avoid missing preboot log, or analyzing old logs, combine quagga and frr log into new file386 duthost.shell("cat {} {} | sort -n > {}".format(387 "/var/log/quagga/bgpd.log", "/var/log/frr/bgpd.log", "/var/log/bgpd.log"), module_ignore_errors=True)388 loganalyzer.additional_files = ['/var/log/swss/sairedis.rec', '/var/log/bgpd.log']389 def pre_reboot_analysis():390 bgpd_log = bgpd_log_handler(preboot=True)391 if hwsku in SMALL_DISK_SKUS or logs_in_tmpfs:392 # For small disk devices, /var/log in mounted in tmpfs.393 # Hence, after reboot the preboot logs are lost.394 # For log_analyzer to work, it needs logs from the shutdown path395 # Below method inserts a step in reboot script to back up logs to /host/396 overwrite_script_to_backup_logs(duthost, reboot_type, bgpd_log)397 marker = loganalyzer.init()398 loganalyzer.load_common_config()399 ignore_file = os.path.join(TEMPLATES_DIR, "ignore_boot_messages")400 expect_file = os.path.join(TEMPLATES_DIR, "expect_boot_messages")401 ignore_reg_exp = loganalyzer.parse_regexp_file(src=ignore_file)402 expect_reg_exp = loganalyzer.parse_regexp_file(src=expect_file)403 loganalyzer.ignore_regex.extend(ignore_reg_exp)404 loganalyzer.expect_regex = []405 loganalyzer.expect_regex.extend(expect_reg_exp)406 loganalyzer.match_regex = []407 return marker408 def post_reboot_analysis(marker, event_counters=None, reboot_oper=None, log_dir=None):409 bgpd_log_handler()410 if hwsku in SMALL_DISK_SKUS or logs_in_tmpfs:411 restore_backup = "mv /host/syslog.99 /var/log/; " +\412 "mv /host/sairedis.rec.99 /var/log/swss/; " +\413 "mv /host/swss.rec.99 /var/log/swss/; " +\414 "mv /host/bgpd.log.99 /var/log/"415 duthost.shell(restore_backup, module_ignore_errors=True)416 # find the fast/warm-reboot script path417 reboot_script_path = duthost.shell('which {}'.format("{}-reboot".format(reboot_type)))['stdout']418 # restore original script. If the ".orig" file does not exist (upgrade path case), ignore the error.419 duthost.shell("mv {} {}".format(reboot_script_path + ".orig", reboot_script_path), module_ignore_errors=True)420 result = loganalyzer.analyze(marker, fail=False)421 analyze_result = {"time_span": dict(), "offset_from_kexec": dict()}422 offset_from_kexec = dict()423 for key, messages in result["expect_messages"].items():424 if "syslog" in key:425 get_kexec_time(duthost, messages, analyze_result)426 reboot_start_time = analyze_result.get("reboot_time", {}).get("timestamp", {}).get("Start")427 if not reboot_start_time or reboot_start_time == "N/A":428 logging.error("kexec regex \"Rebooting with /sbin/kexec\" not found in syslog. " +\429 "Skipping log_analyzer checks..")430 return431 analyze_log_file(duthost, messages, analyze_result, offset_from_kexec)432 elif "bgpd.log" in key:433 analyze_log_file(duthost, messages, analyze_result, offset_from_kexec)434 elif "sairedis.rec" in key:435 analyze_sairedis_rec(messages, analyze_result, offset_from_kexec)436 for marker, time_data in analyze_result["offset_from_kexec"].items():437 marker_start_time = time_data.get("timestamp", {}).get("Start")438 reboot_start_time = analyze_result.get("reboot_time", {}).get("timestamp", {}).get("Start")439 if reboot_start_time and reboot_start_time != "N/A" and marker_start_time:440 time_data["time_taken"] = (_parse_timestamp(marker_start_time) -\441 _parse_timestamp(reboot_start_time)).total_seconds()442 else:443 time_data["time_taken"] = "N/A"444 get_data_plane_report(analyze_result, reboot_type, log_dir, reboot_oper)445 result_summary = get_report_summary(analyze_result, reboot_type)446 logging.info(json.dumps(analyze_result, indent=4))447 logging.info(json.dumps(result_summary, indent=4))448 if reboot_oper and not isinstance(reboot_oper, str):449 reboot_oper = type(reboot_oper).__name__450 if reboot_oper:451 report_file_name = request.node.name + "_" + reboot_oper + "_report.json"452 summary_file_name = request.node.name + "_" + reboot_oper + "_summary.json"453 else:454 report_file_name = request.node.name + "_report.json"455 summary_file_name = request.node.name + "_summary.json"...
test_core.py
Source:test_core.py
...28 'session': {'application': {'applicationId': 1}}}))29 def test_alexa_request_parsing(self):30 ask = Ask()31 ask._alexa_request()32 def test_parse_timestamp(self):33 utc = build_utcoffset('UTC', timedelta(hours=0))34 result = Ask._parse_timestamp('2017-07-08T07:38:00Z')35 self.assertEqual(datetime(2017, 7, 8, 7, 38, 0, 0, utc), result)36 result = Ask._parse_timestamp(1234567890)37 self.assertEqual(datetime(2009, 2, 13, 23, 31, 30), result)38 with self.assertRaises(ValueError):39 Ask._parse_timestamp(None)40 def test_tries_parsing_on_valueerror(self):41 max_timestamp = 25340230080042 # should cause a ValueError normally43 with self.assertRaises(ValueError):44 datetime.utcfromtimestamp(max_timestamp)45 # should safely parse, assuming scale change needed46 # note: this assert looks odd, but Py2 handles the parsing47 # differently, resulting in a differing timestamp48 # due to more granularity of microseconds49 result = Ask._parse_timestamp(max_timestamp)50 self.assertEqual(datetime(1978, 1, 11, 21, 31, 40).timetuple()[0:6],51 result.timetuple()[0:6])52 with self.assertRaises(ValueError):53 # still raise an error if too large54 Ask._parse_timestamp(max_timestamp * 1000)55 def tearDown(self):56 self.patch_current_app.stop()57 self.patch_load_cert.stop()...
test_refresh_config.py
Source:test_refresh_config.py
...19from pendulum.parsing import ParserError20from airflow.kubernetes.refresh_config import _parse_timestamp21class TestRefreshKubeConfigLoader(TestCase):22 def test_parse_timestamp_should_convert_z_timezone_to_unix_timestamp(self):23 ts = _parse_timestamp("2020-01-13T13:42:20Z")24 assert 1578922940 == ts25 def test_parse_timestamp_should_convert_regular_timezone_to_unix_timestamp(self):26 ts = _parse_timestamp("2020-01-13T13:42:20+0600")27 assert 1578922940 == ts28 def test_parse_timestamp_should_throw_exception(self):29 with pytest.raises(ParserError):...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!