Best Python code snippet using freezegun
xtime.py
Source:xtime.py
1#!/usr/bin/python32# Copyright (C) 2022 Vino Fernando Crescini <vfcrescini@gmail.com>3# SPDX-License-Identifier: GPL-3.0-or-later4# 2021-11-19 xtime.py5# convenience library to provide a unified time API on both micropython and cpython6# micropython localtime() uses custom tzdata file7# tzdata file line format: <seconds since embedded epoch> <utc offset in seconds>8import sys as _sys9import time as _time10import re as _re11_DEFAULT_TZ_FILE_PATH = "tzdata.txt"12_LINUX = _sys.platform.lower().startswith("linux")13_EPOCH_OFFSET = 94668480014class XTime(object):15 def __new__(cls, *args, **kwargs):16 xt = None17 if cls == XTime:18 # we're instantiating XTime, so return an instance of one of the subclasses instead19 if hasattr(_sys, "implementation") and _sys.implementation.name == "micropython":20 xt = _XTimeMicroPython(*args, **kwargs)21 else:22 xt = _XTimeCPython(*args, **kwargs)23 else:24 # we're instantiating one of the subclasses, so just do what __new__() normally does25 xt = object.__new__(cls)26 return xt27 def __init__(self, *args, **kwargs):28 pass29 def sleep_ns(self, nsecs):30 raise NotImplementedError31 def sleep_us(self, usecs):32 raise NotImplementedError33 def sleep_ms(self, msecs):34 raise NotImplementedError35 def sleep_ss(self, ssecs):36 raise NotImplementedError37 def time_ns(self):38 raise NotImplementedError39 def time_us(self):40 raise NotImplementedError41 def time_ms(self):42 raise NotImplementedError43 def time_ss(self):44 raise NotImplementedError45 def localtime(self, ssecs=None):46 raise NotImplementedError47 def mktime(self, ttuple):48 raise NotImplementedError49 def gmtime(self, ssecs=None):50 raise NotImplementedError51class _XTimeCPython(object):52 def __init__(self):53 pass54 def sleep_ns(self, nsecs):55 raise NotImplementedError56 def sleep_us(self, usecs):57 self.sleep_ms(usecs / 1000.0)58 def sleep_ms(self, msecs):59 self.sleep_ss(msecs / 1000.0)60 def sleep_ss(self, ssecs):61 _time.sleep(ssecs)62 def time_ns(self):63 return int((_time.time() - _EPOCH_OFFSET) * 1000000000)64 def time_us(self):65 return self.time_ns() // 100066 def time_ms(self):67 return self.time_us() // 100068 def time_ss(self):69 return int(_time.time() - _EPOCH_OFFSET)70 def localtime(self, ssecs=None):71 if ssecs == None:72 ssecs = self.time_ss()73 t = _time.localtime(ssecs + _EPOCH_OFFSET)74 return t.tm_year, t.tm_mon, t.tm_mday, t.tm_hour, t.tm_min, t.tm_sec, t.tm_wday, t.tm_yday75 def mktime(self, ttuple):76 return int(_time.mktime(ttuple + (-1,))) - _EPOCH_OFFSET77 def gmtime(self, ssecs=None):78 if ssecs == None:79 ssecs = self.time_ss()80 t = _time.gmtime(ssecs + _EPOCH_OFFSET)81 return t.tm_year, t.tm_mon, t.tm_mday, t.tm_hour, t.tm_min, t.tm_sec, t.tm_wday, t.tm_yday82class _XTimeMicroPython(object):83 _tz_line_pattern = _re.compile("^\s*([0-9]+)\s+([+-]?[0-9]+)\s*$")84 def __init__(self, tz_path=_DEFAULT_TZ_FILE_PATH):85 self._tz_path = tz_path86 self._tz_offset = 087 self._tz_expiry = 088 def _update(self, ssecs):89 # if no tzdata file was set, do nothing90 if self._tz_path == None:91 return92 # open the tzdata file93 # let it raise an exception on error94 95 f = open(self._tz_path, "r")96 97 # read each line, ignore everything that is not something space something98 # get only what is relevant, namely, the last line whose timestamp is less than now, and the line after that99 100 prev = None101 curr = None102 103 for line in f:104 105 mo = _XTimeMicroPython._tz_line_pattern.match(line)106 107 if mo == None:108 continue109 110 ttstamp = 0111 toffset = 0112 113 try:114 ttstamp = int(mo.group(1))115 toffset = int(mo.group(2))116 except:117 continue118 119 prev = curr120 curr = ttstamp, toffset121 122 if ssecs < ttstamp:123 break124 125 # we don't need the file anymore126 127 f.close()128 129 # now update the globals130 131 self._tz_offset = 0132 self._tz_expiry = 0133 134 # the expiry of an offset is the in-effect timestamp of the next offset135 136 if prev != None:137 self._tz_offset = prev[1]138 139 if curr != None:140 141 # last one may not actually be expired yet142 143 if ssecs >= curr[0]:144 self._tz_offset = curr[1]145 else:146 self._tz_expiry = curr[0]147 def sleep_ns(self, nsecs):148 raise NotImplementedError149 def sleep_us(self, usecs):150 _time.sleep_us(usecs)151 def sleep_ms(self, msecs):152 _time.sleep_ms(msecs)153 def sleep_ss(self, ssecs):154 _time.sleep(ssecs)155 def time_ns(self):156 t = _time.time_ns()157 if _LINUX:158 t = t - (_EPOCH_OFFSET * 1000000000)159 return t160 def time_us(self):161 return self.time_ns() // 1000162 def time_ms(self):163 return self.time_us() // 1000164 def time_ss(self):165 t = int(_time.time())166 if _LINUX:167 t = t - _EPOCH_OFFSET168 return t169 def localtime(self, ssecs=None):170 if ssecs == None:171 ssecs = self.time_ss()172 # if the current tzdata offset is expired, try to update it173 if self._tz_expiry <= ssecs:174 self._update(ssecs)175 if _LINUX:176 ssecs = ssecs + _EPOCH_OFFSET177 t = _time.gmtime(ssecs + self._tz_offset)178 return t[:8]179 def mktime(self, ttuple):180 t = 0181 if _LINUX:182 t = _time.mktime(ttuple + (-1,)) - _EPOCH_OFFSET183 else:184 t = _time.mktime(ttuple)185 return t186 def gmtime(self, ssecs=None):187 if ssecs == None:188 ssecs = self.time_ss()189 if _LINUX:190 ssecs = ssecs + _EPOCH_OFFSET191 t = _time.gmtime(ssecs)...
logger.py
Source:logger.py
1# encoding=utf-82import datetime3import logging4import os5import re6import socket7import sys8import time9from timerotaingfilehandler import MultiProcessSafeHandler10here = os.path.abspath(os.path.dirname(__file__))11def tz_fix():12 # calculate TZ offset for isotime13 tz = re.compile(r'([+-]\d{2})(\d{2})$').match(time.strftime('%z'))14 if time.timezone and tz:15 opsym = '+' if time.timezone > 0 else '-'16 offset_hrs, offset_min = tz.groups()17 _tz_offset = "{0}{1}:{2}".format(opsym, offset_hrs, offset_min)18 else:19 # time.timezone == 0 => we're in UTC20 _tz_offset = "Z"21 return _tz_offset22class LogFormatter(logging.Formatter):23 """24 log formatter to add isotime, hostname25 """26 def __init__(self, service_name, *args, **kwargs):27 self._service_name = service_name28 self._tz_offset = tz_fix()29 # get hostname and save for later30 try:31 # force short-name32 self._hostname = socket.gethostname().split(".")[0]33 except Exception:34 self._hostname = "-"35 super(LogFormatter, self).__init__(*args, **kwargs)36 def format(self, record):37 """38 Add special content to record dict39 """40 record.hostname = self._hostname41 record.isotime = datetime.datetime.fromtimestamp(42 record.created).isoformat() + self._tz_offset43 record.service_name = self._service_name44 return logging.Formatter.format(self, record)45class RFC5424LogFormatter(LogFormatter):46 """47 formatter for rfc5424 messaging48 """49 RFC5424_LOG_FORMAT = (50 "%(isotime)s %(hostname)s %(process)s %(service_name)s\n"51 "%(levelname)s %(name)s:%(filename)s:%(lineno)d\n"52 "Message: %(message)s\n"53 )54 RFC5424_TIME_FORMAT = None55 def __init__(self, service_name):56 LogFormatter.__init__(57 self, service_name,58 fmt=self.RFC5424_LOG_FORMAT,59 datefmt=self.RFC5424_TIME_FORMAT)60def log_init(name, debug=False, log_path=None):61 """62 setup root logger63 debug: log message with level DEBUG or higher,64 add stdout handler to print log to screen65 """66 rfc5424_formatter = RFC5424LogFormatter(name)67 agent_logger = logging.getLogger()68 agent_logger.handlers = []69 if debug:70 stdout_handler = logging.StreamHandler(sys.stdout)71 stdout_handler.setFormatter(rfc5424_formatter)72 agent_logger.addHandler(stdout_handler)73 agent_logger.setLevel(logging.DEBUG)74 else:75 agent_logger.setLevel(logging.INFO)76 log_file = (log_path if log_path else here) + "/info.log"77 file_log_handler = MultiProcessSafeHandler(log_file, when="midnight")78 file_log_handler.setFormatter(rfc5424_formatter)79 agent_logger.addHandler(file_log_handler)80 log_file = (log_path if log_path else here) + "/error.log"81 file_log_handler_err = MultiProcessSafeHandler(log_file, when="midnight")82 file_log_handler_err.setFormatter(rfc5424_formatter)83 file_log_handler_err.setLevel(logging.ERROR)...
base_collector.py
Source:base_collector.py
1import abc2from datetime import datetime, timedelta3import pandas as pd4from pytz.reference import LocalTimezone5class BaseCollector(abc.ABC):6 _verbose: bool7 _tz_offset: timedelta8 def __init__(self, verbose=False) -> None:9 self._verbose = verbose10 self._tz_offset = timedelta(seconds=LocalTimezone().utcoffset(datetime.now()).seconds)11 @abc.abstractmethod12 def _collect(self, data: pd.DataFrame) -> None: # pragma: no cover13 pass14 def collect(self, data: pd.DataFrame) -> None:15 dttm = datetime.now()16 self._collect(data)17 if self._verbose: # pragma: no cover...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!