Best Python code snippet using Kiwi_python
timeline.py
Source:timeline.py
1import json2from datetime import datetime, date3from typing import Union4import pandas as pd5from src.compute.changelogs import work_activity_on_interval6from src.compute.utils import Interval7from src.db.utils import SnowflakeWrapper8def build_issue_timelines(sw: SnowflakeWrapper, interval: Interval, keys: Union[None, list] = None) -> pd.DataFrame:9 def filter_log_items(items: dict, prop: str) -> Union[dict, None]:10 filtered = [x for x in items["changelogItems"] if x["field"] == prop]11 if len(filtered) > 1:12 raise Exception(f"More than 1 filtered item of type {prop}: {filtered}")13 elif len(filtered) == 1:14 return filtered[0]15 else:16 return None17 changelogs = work_activity_on_interval(sw, interval, keys)18 timelines = []19 for _, row in changelogs.iterrows():20 timeline = []21 date_issue_created = row["DATECREATED"]22 status_from, status_to, changed_assignee = "BACKLOG", "BACKLOG", False # default status at the beginning (top of workflow)23 assign_from, assign_to, changed_status = None, None, False24 last_change = date_issue_created25 for logs in row["CHANGELOGITEMS"]:26 date_created = logs["dateCreated"]27 change_assignee = filter_log_items(logs, "assignee")28 if change_assignee is not None:29 assign_from = None if "from" not in change_assignee else change_assignee["from"]30 assign_to = None if "to" not in change_assignee else change_assignee["to"] # the latest assignee31 # print(f"[{author}]: [{status_to}] Reassign from `{assign_from}`, to `{assign_to}`")32 changed_assignee = True33 change_status = filter_log_items(logs, "status")34 if change_status is not None:35 status_from = None if "fromString" not in change_status else change_status["fromString"]36 status_to = None if "toString" not in change_status else change_status["toString"] # the latest status37 # print(f"[{author}]: [{assign_to}] Transition from `{status_from}`, to `{status_to}`")38 changed_status = True39 if last_change > date_created:40 raise Exception("WAIT A SEC")41 if changed_status and changed_assignee: # doesn't work if there are 2+ changes in a small delta time42 timeline.append({43 "status": status_from,44 "assignee": assign_from,45 "date_from": last_change,46 "date_to": date_created,47 "tdelta": date_created - last_change48 })49 changed_status = False50 changed_assignee = False51 if changed_status:52 timeline.append({53 "status": status_from,54 "assignee": assign_to,55 "date_from": last_change,56 "date_to": date_created,57 "tdelta": date_created - last_change58 })59 changed_status = False60 if changed_assignee:61 timeline.append({62 "status": status_to,63 "assignee": assign_from,64 "date_from": last_change,65 "date_to": date_created,66 "tdelta": date_created - last_change67 })68 changed_assignee = False69 last_change = date_created70 timeline.append({71 "status": status_to, # last known status72 "assignee": assign_to, # last known assignee73 "date_to": None, # it's still ongoing74 "date_from": last_change,75 "tdelta": Interval.to_datetime(interval.toDate(raw=True)) - last_change76 })77 timelines.append(json.dumps(timeline, default=Interval.isDate))78 copy = changelogs.copy()79 copy = copy.drop(["CHANGELOGITEMS"], axis=1)80 # print(timelines)81 copy["timelines"] = timelines82 return copy83def persist_issue_timelines(sw: SnowflakeWrapper, interval: Interval, keys: Union[None, list] = None) -> None:84 try:85 timelines = build_issue_timelines(sw, interval, keys)86 SnowflakeWrapper.execute_df_query(timelines, "timelines_temp", ifexists='replace')87 sw.execute(88 f"CREATE OR REPLACE TABLE TIMELINES ( "89 f" KEY VARCHAR(32), "90 f" STATUS VARCHAR(256), "91 f" ASSIGNEE VARCHAR(128), "92 f" DATEFROM TIMESTAMP_NTZ(9), "93 f" DATETO TIMESTAMP_NTZ(9), "94 f" TIMEDELTA NUMBER(38,0) "95 f"); "96 )97 sw.execute(98 f"INSERT ALL INTO TIMELINES "99 f" SELECT "100 f" c.KEY key, "101 f" t.VALUE:status::string status, "102 f" t.VALUE:assignee::string assignee, "103 f" TO_TIMESTAMP_NTZ(t.VALUE:date_from) datefrom, "104 f" TO_TIMESTAMP_NTZ(t.VALUE:date_to) dateto, "105 f" t.VALUE:tdelta::NUMBER timedelta "106 f" FROM "107 f" timelines_temp c, "108 f" LATERAL FLATTEN(PARSE_JSON(TIMELINES)) t; "109 )110 except Exception as e:111 print(f"Failed persisting timlines to Snowflake: {e}")112 raise e113def get_avg_timeline(sw: SnowflakeWrapper, interval: Interval) -> pd.DataFrame:114 query = (115 f'SELECT '116 f' STATUS "Status", '117 f' COUNT(DISTINCT KEY) "UniqueIssues", '118 f' COUNT(*) "Issues", '119 f' "Issues" - "UniqueIssues" "Reassignments", '120 f' AVG(TIMEDELTA) / (60 * 60 * 24) "AvgDays", '121 f' MAX(TIMEDELTA) / (60 * 60 * 24) "MaxDays", '122 f' MIN(TIMEDELTA) / (60 * 60 * 24) "MinDays" '123 f'FROM TIMELINES t '124 f'WHERE '125 f' t.DATEFROM >= {interval.fromDate()} '126 f' AND t.DATETO < {interval.toDate()} '127 f'GROUP BY 1 '128 f'ORDER BY 1, 4 DESC; '129 )130 print(query)131 return sw.fetch_df(query)132if __name__ == '__main__':133 with SnowflakeWrapper.create_snowflake_connection() as connection:134 sw = SnowflakeWrapper(connection)135 # timelines = build_issue_timelines(sw, Interval(date(2019, 7, 1), date(2020, 1, 1)))136 # print(timelines)137 # SnowflakeWrapper.execute_df_query(timelines, "timelines", ifexists='replace')...
task.py
Source:task.py
...74 raise Exception("failed to fetch tasks!")75 notes = response.json()['Data']['Notes']76 tasks.extend(notes)77 return tasks78def change_assignee(tasks, config, manager_id, driver):79 """80 docstring81 """82 print(f"Now @ change_assignee() using Id: {manager_id}")83 headers = {84 "Referer": "https://baaa.certification.systems/",85 "X-Requested-With": "XMLHttpRequest",86 "Accept": "application/json, text/javascript, */*; q=0.01",87 "Accept-Encoding": "gzip, deflate, br",88 "Accept-Language": "en-US,en;q=0.9",89 "Content-Type": "application/json; charset=UTF-8",90 "Host": "baaa.certification.systems",91 "Origin": "https://baaa.certification.systems",92 "Sec-Fetch-Dest": "empty",93 "Sec-Fetch-Mode": "cors",94 "Sec-Fetch-Site": "same-origin"95 }96 data = {"userid": manager_id}97 for task in tasks:98 url = f"https://baaa.certification.systems/Reminder/Note/{task['Id']}/Assignee/"99 print(f"Url: {url}")100 response = driver.request(101 'POST', url, data=json.dumps(data), headers=headers)102 print(f"Response status code: {response.status_code}")103 if not response.status_code == 200:104 print(f"Error updating assginee for task {task['Id']}")105 print(response.text)106 continue107 print(f"Successfully updated assginee for task {task['Id']}")108def read_managers():109 """110 docstring111 """112 with open("managers.json", 'rt') as fp:113 managers = json.loads(fp.read())114 print(f"Loaded ({len(managers)}) managers")115 return managers116def main():117 """118 docstring119 """120 parser = argparse.ArgumentParser()121 parser.add_argument("--input")122 parser.add_argument("--manager", default="Chris Franklin")123 options = parser.parse_args()124 config = read_config()125 managers = read_managers()126 if not config['manager'] in managers:127 print(f"Error! Can't lookup ID of Manager <{config['manager']}>")128 return129 manager_id = managers[config['manager']]130 driver = Chrome()131 driver.get(config['site'])132 login(config, driver)133 tasks = download_tasks(config, driver)134 if tasks and len(tasks) > 0:135 change_assignee(tasks, config, manager_id, driver)136if __name__ == "__main__":...
parenting.py
Source:parenting.py
1from collections import defaultdict2t = int(input())3def change_assignee(current):4 return "C" if current == "J" else "J"5def solve():6 num_tasks = int(input())7 intervals = []8 assignee = "C"9 for i in range(num_tasks):10 interval_str = input()11 interval = interval_str.split(" ")12 interval.append(i) # append order of the task13 intervals.append(interval)14 intervals.sort(key=lambda i: int(i[0]))15 lookup = defaultdict(list)16 ans = [""] * len(intervals)17 for j in intervals:18 if not lookup:19 lookup[assignee] = j20 ans[j[-1]] = assignee21 else:22 start = int(j[0])23 if start >= int(lookup[assignee][1]):24 lookup[assignee] = j25 ans[j[-1]] = assignee26 else:27 assignee = change_assignee(assignee)28 # check if collision with other person29 if not lookup[assignee] or start >= int(lookup[assignee][1]):30 lookup[assignee] = j31 ans[j[-1]] = assignee32 else:33 return "IMPOSSIBLE"34 return "".join(ans)35for c in range(t):...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!