Best Python code snippet using toolium_python
dataset.py
Source:dataset.py
...213 :return: tuple with replaced value and boolean to know if replacement has been done214 """215 def _date_matcher():216 return re.match(r'\[(NOW(?:\((?:.*)\)|)|TODAY)(?:\s*([\+|-]\s*\d+)\s*(\w+)\s*)?\]', param)217 def _offset_datetime(amount, units):218 now = datetime.datetime.utcnow()219 if not amount or not units:220 return now221 the_amount = int(amount.replace(' ',''))222 the_units = units.lower()223 return now + datetime.timedelta(**dict([(the_units, the_amount)]))224 def _is_only_date(base):225 return 'TODAY' in base226 def _default_format(base):227 date_format = '%d/%m/%Y' if language == 'es' else '%Y/%m/%d'228 if _is_only_date(base):229 return date_format230 return f'{date_format} %H:%M:%S'231 def _get_format(base):232 format_matcher = re.match(r'.*\((.*)\).*', base)233 if format_matcher and len(format_matcher.groups()) == 1:234 return format_matcher.group(1)235 return _default_format(base)236 matcher = _date_matcher()237 if not matcher:238 return param, False239 base, amount, units = list(matcher.groups())240 format_str = _get_format(base)241 date = _offset_datetime(amount, units)242 return date.strftime(format_str), True243def _replace_param_fixed_length(param):244 """245 Generate a fixed length data element if param matches the expression [<type>_WITH_LENGTH_<length>]246 where <type> can be: STRING, INTEGER, STRING_ARRAY, INTEGER_ARRAY, JSON.247 E.g. [STRING_WITH_LENGTH_15]248 :param param: parameter value249 :return: tuple with replaced value and boolean to know if replacement has been done250 """251 new_param = param252 param_replaced = False253 if param.startswith('[') and param.endswith(']'):254 if any(x in param for x in ['STRING_ARRAY_WITH_LENGTH_', 'INTEGER_ARRAY_WITH_LENGTH_']):255 seeds = {'STRING': 'a', 'INTEGER': 1}...
time_sync.py
Source:time_sync.py
1import argparse2import concurrent.futures3import datetime4import json5import time6import requests7import urllib38urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)9requests.packages.urllib3.disable_warnings()10class timesync:11 @property12 def time_delta(self):13 if self.task_future.running():14 pass15 # print("task is running")16 elif (datetime.datetime.utcnow() - self.last_sync).total_seconds() > self.refresh:17 self.task_future = self.pool.submit(self.sync_time)18 return self.offset19 def fetch_sample(self):20 REQUEST_URL = "%s://%s:%s/%s/_search" % (21 self.proto,22 self.elastichost,23 self.elasticport,24 self.index,25 )26 HEADERS = {"Accept": "application/json"}27 try:28 resp = requests.get(29 REQUEST_URL, headers=HEADERS, data=json.dumps(self.cpu_query), timeout=3.030 )31 resp.close()32 response = json.loads(resp.text)33 except Exception as e:34 return e35 return response36 def sync_time(self):37 time_db = []38 t_now = datetime.datetime.utcnow()39 def time_elapse():40 diff = datetime.datetime.utcnow() - t_now41 if diff.total_seconds() > self.elapse_time:42 return True43 return None44 # iterate while the db is less than samples and a new document is found in elapse_time.45 # exit if db has enough samples or time to find new document expires46 while len(time_db) < (self.sample_size + 2) and not time_elapse():47 if self.die:48 return None49 time.sleep(1)50 _data = self.fetch_sample()51 if isinstance(_data, dict):52 for hit in _data["hits"]["hits"]:53 _time_set = {54 "doc_id": hit["_id"],55 "doc_time": hit["_source"]["@timestamp"],56 "time_stamp": datetime.datetime.utcnow().isoformat()[:-3] + "Z",57 }58 update_db = None59 # update the db if this is the first record60 if len(time_db) == 0:61 update_db = True62 # if this is a new document found in the query not in the db63 elif time_db[-1]["doc_id"] != hit["_id"]:64 update_db = True65 if update_db:66 _d_time = datetime.datetime.strptime(67 _time_set["doc_time"], "%Y-%m-%dT%H:%M:%S.%fZ"68 )69 _c_time = datetime.datetime.strptime(70 _time_set["time_stamp"], "%Y-%m-%dT%H:%M:%S.%fZ"71 )72 delta = {"delta": _d_time - _c_time}73 _time_set.update(delta)74 time_db.append(_time_set)75 # reset the wait time76 t_now = datetime.datetime.utcnow()77 if self.verbose:78 print(_time_set)79 # process average and confidence if there's real samples than the invalid ones80 if len(time_db) > 2:81 # function is used to normalize the timedelta string with a simple negative string value82 # for consistent strptime parsing (ex -00:10:00)83 def timedelta_to_string(timedelta):84 total_seconds = timedelta.total_seconds()85 if total_seconds < 0:86 timedelta_str = "-" + str(datetime.timedelta() - timedelta)87 else:88 timedelta_str = str(timedelta)89 return timedelta_str90 sample_db = []91 # trim the first 2 inaccurate samples92 while len(time_db) > 2:93 sample_db.append(time_db.pop())94 # add all samples then divide by number of samples for avg95 self.offset = sum([doc["delta"] for doc in sample_db], datetime.timedelta()) / len(96 sample_db97 )98 # timedelta -> datetime - > datetime (minutes and seconds)99 _offset_datetime = datetime.datetime.strptime(100 timedelta_to_string(self.offset).strip("-"), "%H:%M:%S.%f"101 )102 _offset_ms = datetime.datetime.strptime(103 "%s.%s" % (_offset_datetime.minute, _offset_datetime.second), "%M.%S"104 )105 _percent = []106 for doc in sample_db:107 # timedelta -> datetime - > datetime (minutes and seconds)108 _delta_datetime = datetime.datetime.strptime(109 timedelta_to_string(doc["delta"]).strip("-"), "%H:%M:%S.%f"110 )111 _delta_ms = datetime.datetime.strptime(112 "%s.%s" % (_delta_datetime.minute, _delta_datetime.second), "%M.%S"113 )114 # (avg offset - sample delta) -> positive -> seconds -> percent of a minute -> percentage %115 _percent.append(int((abs(_offset_ms - _delta_ms).total_seconds() / 60) * 100))116 _confidence_compare = 100 - int(sum(_percent) / len(sample_db))117 _confidence_samples = 100 - int((len(sample_db) / self.sample_size) * 100)118 # sameples is not 0 then take 10th of number as percentage loss to the compare percentage119 self.confidence = (120 _confidence_compare121 if _confidence_samples == 0122 else _confidence_compare - (int(_confidence_samples / 10))123 )124 if self.verbose:125 print(126 "percents",127 _percent,128 "confidence_compare",129 _confidence_compare,130 "confidence_samples",131 _confidence_samples,132 )133 self.last_sync = datetime.datetime.utcnow()134 if self.verbose:135 for doc in sample_db:136 print(doc["delta"])137 print("confidence", self.confidence)138 if self.verbose:139 print("average", self.offset)140 print(self.last_sync)141 def __init__(self, **kwargs):142 self.cpu_query = {143 "size": 1,144 "sort": {"@timestamp": "desc"},145 "query": {146 "bool": {147 "must": [148 {"match_phrase": {"metricset.name": {"query": "cpu"}}},149 {"match_phrase": {"beat.name": {"query": None}}},150 ]151 }152 },153 }154 self.index = "log-systeminfo-*"155 self.elastichost = None156 self.elasticport = "9200"157 self.proto = "http"158 self.offset = datetime.timedelta(seconds=0)159 self.confidence = None160 self.sample_size = 4161 self.elapse_time = 60162 self.die = None163 self.refresh = 200 # 86400164 self.last_sync = datetime.datetime.utcnow() - datetime.timedelta(seconds=self.refresh + 1)165 self.verbose = None166 self.pool = concurrent.futures.ThreadPoolExecutor()167 for key, value in kwargs.items():168 if ("beat" in key) and (value):169 self.cpu_query["query"]["bool"]["must"][-1]["match_phrase"]["beat.name"][170 "query"171 ] = value172 if ("elastichost" in key) and (value):173 self.elastichost = value174 if ("elasticport" in key) and (value):175 self.elasticport = value176 if ("verbose" in key) and (value):177 self.verbose = True178 if ("sample_size" in key) and (value):179 self.sample_size = value180 if ("elapse_time" in key) and (value):181 self.elapse_time = value182 if ("refresh" in key) and (value):183 self.refresh = value184 self.task_future = self.pool.submit(self.sync_time)185 def __del__(self):186 self.die = True187 self.pool.shutdown(wait=True)188 def time_close(self):189 self.die = True190 self.pool.shutdown(wait=True)191def main():192 parser = argparse.ArgumentParser(description="Time sync offset generator for inSITE probes")193 parser.add_argument(194 "-H", "--host", metavar="", required=False, help="Elasticsearch host to query against"195 )196 parser.add_argument("-P", "--port", metavar="", required=False, help="Elasticsearch port")197 parser.add_argument("-S", "--sample", metavar="", required=False, help="Sample size")198 parser.add_argument(199 "-T", "--elapse_time", metavar="", required=False, help="Time to timeout betwen samples"200 )201 parser.add_argument(202 "-B", "--beat", metavar="", required=True, help="Beatname to test data against"203 )204 args = parser.parse_args()205 sync = timesync(206 beat=args.beat,207 elastichost=args.host,208 elasticport=args.port,209 sample_size=args.sample,210 elapse_time=args.elapse_time,211 verbose=True,212 )213 inputQuit = False214 while inputQuit is not "q":215 print(sync.time_delta)216 inputQuit = input("\nType q to quit or just hit enter: ")217 sync.time_close()218if __name__ == "__main__":...
dates.py
Source:dates.py
...28 )29 for index in date_time_fields:30 if r[index]:31 r[index] = salesforce_from_datetime(32 _offset_datetime(33 target_anchor,34 current_anchor,35 datetime_from_salesforce(r[index]),36 )37 )38 return r39# The Salesforce API returns datetimes with millisecond resolution, but milliseconds40# are always zero (that is, .000). Python does parse this with strptime.41# Python renders datetimes into ISO8601 with microsecond resolution (.123456),42# which Salesforce won't accept - we need exactly three digits, although they are43# currently ignored. Python also right-truncates to `.0`, which Salesforce won't take.44# Hence this clumsy workaround.45def datetime_from_salesforce(d):46 """Create a Python datetime from a Salesforce-style ISO8601 string"""47 return datetime.strptime(d, "%Y-%m-%dT%H:%M:%S.%f%z")48def salesforce_from_datetime(d):49 """Create a Salesforce-style ISO8601 string from a Python datetime"""50 return d.strftime("%Y-%m-%dT%H:%M:%S.{}+0000").format(51 str(d.microsecond)[:3].ljust(3, "0")52 )53# Python 3.6 doesn't support the fromisoformat() method.54# These functions are explicit and work on all supported versions.55def date_to_iso(d):56 """Convert date object to ISO8601 string"""57 return d.strftime("%Y-%m-%d")58def iso_to_date(s):59 """Convert ISO8601 string to date object"""60 if isinstance(s, date):61 return s62 return datetime.strptime(s, "%Y-%m-%d").date()63def _offset_date(target_anchor, current_anchor, this_date):64 """Adjust this_date to be relative to target_anchor instead of current_anchor"""65 return target_anchor + (this_date - current_anchor)66def _offset_datetime(target_anchor, current_anchor, this_datetime):67 """Adjust this_datetime to be relative to target_anchor instead of current_anchor"""68 return datetime.combine(69 _offset_date(target_anchor, current_anchor, this_datetime.date()),70 this_datetime.time(),...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!