Best Python code snippet using localstack_python
CombineData.py
Source:CombineData.py
# This script is the core part of the data extraction and structuring step.
from AccessData import read_json_to_ram, write_list_to_csv, write_ram_to_json, read_transport_node_bus_csv
import time


# Day-type label found in the passenger-volume table -> output file name.
_HUMAN_VOLUME_FILENAMES = {
    'WEEKDAY': 'human_volume_weekday',
    'WEEKENDS/HOLIDAY': 'human_volume_holiday',
}


def calculate_routes_per_stop():
    """Group the service numbers of the 'routes' table by bus stop code.

    Side effect: writes the mapping to 'stops_ServicesNo' in 'raw'.

    Returns:
        dict mapping each stop code (as a string) to the list of
        ``ServiceNo`` values of the route records at that stop, in table
        order (duplicates are kept).
    """
    routes = read_json_to_ram('routes')
    stop_services = {}
    for route in routes:
        # Look up and store under the same string key so a non-string stop
        # code cannot miss the membership test and overwrite its own list.
        stop_services.setdefault(f"{route['BusStopCode']}", []).append(route['ServiceNo'])
    write_ram_to_json(stop_services, 'raw', 'stops_ServicesNo')
    return stop_services


def assign_stop_capacity(stop_services):
    """Attach a 'Capacity' field (number of route entries) to every stop.

    Args:
        stop_services: mapping of stop code -> list of service numbers, as
            returned by ``calculate_routes_per_stop``.

    Side effect: writes 'stops_capacity' as JSON ('display_dynamic') and CSV.
    """
    stops = read_json_to_ram('stops')
    for stop in stops:
        # A stop that no route calls at gets capacity 0 instead of a KeyError.
        stop['Capacity'] = len(stop_services.get(f"{stop['BusStopCode']}", []))
    write_ram_to_json(stops, 'display_dynamic', 'stops_capacity')
    write_list_to_csv(stops, 'stops_capacity')


def _current_frequency_field(now=None):
    """Return the service-table frequency field for the local time of day."""
    now = time.localtime() if now is None else now
    minute = now.tm_hour * 60 + now.tm_min
    if 390 <= minute <= 510:  # 06:30 - 08:30
        return 'AM_Peak_Freq'
    if 510 < minute < 1020:  # 08:31 - 16:59
        return 'AM_Offpeak_Freq'
    if 1020 <= minute <= 1140:  # 17:00 - 19:00
        return 'PM_Peak_Freq'
    return 'PM_Offpeak_Freq'  # everything else, including before 06:30


def _frequency_to_volume(freq_str):
    """Convert a headway string ('10', '08-12', '-') into buses per hour."""
    if not freq_str:
        # Field missing or empty: treat as "no buses dispatched".
        return 0
    # At most two bounds are used; '-' and similar yield no bounds at all.
    bounds = [float(part) for part in str(freq_str).split('-') if part.strip()][:2]
    if not bounds:
        return 0
    headway = sum(bounds) / len(bounds)  # average dispatch interval, minutes
    if headway <= 0:
        # Covers '00-00', '0-0' and any other all-zero spelling.
        return 0
    # NOTE(review): a single-value headway used to be returned as-is
    # (minutes); it is now converted to buses per hour like a range is.
    return round(60 / headway, 2)


def get_single_service_volume(service_code, services=None):
    """Return the buses per hour dispatched for one service at the current time.

    Args:
        service_code: the ``ServiceNo`` to look up.
        services: optional pre-loaded 'services' table; when omitted it is
            read with ``read_json_to_ram('services')``.

    Returns:
        Buses per hour derived from the first matching record, or 0 when the
        service is unknown or has no frequency for the current period.

    Raises:
        ValueError: if the frequency field holds non-numeric text.
    """
    if services is None:
        services = read_json_to_ram('services')
    freq_field = _current_frequency_field()
    for service in services:
        if service.get('ServiceNo') == service_code:
            return _frequency_to_volume(service.get(freq_field))
    return 0


def get_service_volume():
    """Compute the current buses-per-hour volume of every known service.

    Reads 'stops_ServicesNo', so ``calculate_routes_per_stop`` must have been
    run beforehand. Side effect: writes 'all_service_volume' in 'raw'.

    Returns:
        dict mapping service code (as a string) to its volume.
    """
    stop_services = read_json_to_ram('stops_ServicesNo')
    # Load the services table once instead of once per service code.
    services = read_json_to_ram('services')
    # dict.fromkeys de-duplicates while keeping first-seen order.
    unique_codes = dict.fromkeys(
        code for codes in stop_services.values() for code in codes
    )
    entire_service_volume = {
        f'{code}': get_single_service_volume(code, services)
        for code in unique_codes
    }
    write_ram_to_json(entire_service_volume, 'raw', 'all_service_volume')
    return entire_service_volume


def calculate_bus_volume_per_hour_per_stop():
    """Attach 'BusVolume' (sum of its services' volumes) to every stop.

    Side effect: writes 'stops_bus_volume' in 'display_dynamic' (plus the
    files written by the two helpers it calls).

    Returns:
        The list of stop records, each with a 'BusVolume' field.
    """
    # Order matters: get_service_volume() reads the file written here.
    stop_services = calculate_routes_per_stop()
    stops = read_json_to_ram('stops')
    service_volume = get_service_volume()
    for stop in stops:
        # Stops without any route get an empty list rather than None.
        codes = stop_services.get(f"{stop['BusStopCode']}", [])
        stop['BusVolume'] = round(sum(service_volume[f'{code}'] for code in codes), 2)

    write_ram_to_json(stops, 'display_dynamic', 'stops_bus_volume')
    return stops


def calculate_human_volume_alltime():
    """Restructure the passenger-volume tables into per-stop, per-hour dicts.

    For each of the two tables returned by ``read_transport_node_bus_csv``
    writes ``{stop_code: {hour: volume}}`` (all strings) to
    'human_volume_weekday' or 'human_volume_holiday' in 'raw'.

    Raises:
        ValueError: if a table's day-type label is not a known one.
    """
    weekday, holiday = read_transport_node_bus_csv('transport_node_bus_202108')
    for table in (weekday, holiday):
        if len(table) == 0:
            # Nothing to restructure and no day-type label to read.
            continue
        table_type = table[0][1]
        filename = _HUMAN_VOLUME_FILENAMES.get(table_type)
        if filename is None:
            # Previously this was an unbound (or stale) ``filename``.
            raise ValueError(f'Unknown day type in passenger-volume table: {table_type!r}')

        human_volume_by_stop = {}
        for row in table:
            hour = row[2]
            stop_code = row[4]
            # Presumably tap-in + tap-out counts -- TODO confirm column meaning.
            volume = int(row[5]) + int(row[6])
            human_volume_by_stop.setdefault(f'{stop_code}', {})[f'{hour}'] = f'{volume}'
        write_ram_to_json(human_volume_by_stop, 'raw', filename)


def get_current_human_volume():
    """Attach the current hour's 'HumanVolume' to every stop.

    Regenerates the all-day volume files first, then picks the holiday table
    on Saturday/Sunday and the weekday table otherwise. Stops without data
    for the current hour get 0. Side effect: writes 'stops_human_volume' in
    'display_dynamic'.
    """
    calculate_human_volume_alltime()
    assign_coords_to_human_volume_alltime()

    now = time.localtime()
    # tm_wday counts Monday as 0, so 5 and 6 are Saturday and Sunday; unlike
    # strftime('%a') this does not depend on the process locale.
    if now.tm_wday >= 5:
        volume_alltime = read_json_to_ram('human_volume_holiday', 'raw')
    else:
        volume_alltime = read_json_to_ram('human_volume_weekday', 'raw')

    # Try the zero-padded hour first (original behaviour), then the unpadded
    # form -- the hour keys in the table are presumably unpadded; verify.
    hour_keys = (f'{now.tm_hour:02d}', f'{now.tm_hour}')
    stops = read_json_to_ram('stops')
    for stop in stops:
        by_hour = volume_alltime.get(f"{stop['BusStopCode']}", {})
        stop['HumanVolume'] = 0
        for key in hour_keys:
            if key in by_hour:
                stop['HumanVolume'] = int(by_hour[key])
                break

    write_ram_to_json(stops, 'display_dynamic', 'stops_human_volume')


def calculate_bus_human_balance():
    """Attach 'Balance' = HumanVolume / BusVolume to every stop.

    The two volume lists are matched to the stops by position. A missing
    entry or a zero bus volume yields a balance of 0. Side effect: writes
    'stops_balance' in 'display_dynamic'.
    """
    bus_volume = read_json_to_ram('stops_bus_volume', 'display_dynamic')
    human_volume = read_json_to_ram('stops_human_volume', 'display_dynamic')
    stops = read_json_to_ram('stops')
    for index, stop in enumerate(stops):
        try:
            balance = human_volume[index]['HumanVolume'] / bus_volume[index]['BusVolume']
        except (ZeroDivisionError, IndexError, KeyError, TypeError):
            balance = 0
        stop['Balance'] = round(balance, 2)
    write_ram_to_json(stops, 'display_dynamic', 'stops_balance')


def assign_coords_to_human_volume_alltime():
    """Attach each stop's all-day volume table to its stop record.

    For both day types, every record of the 'stops' table gets a
    'VolumeAlltime' field and the result is written to
    'human_volume_<type>_coords' in 'display_static'. Stops with no volume
    data get a 24-hour table of zeros so every record has the same shape.
    """
    empty_stop = {f'{hour}': 0 for hour in range(24)}

    stops = read_json_to_ram('stops')
    for day_type in ('holiday', 'weekday'):
        volume = read_json_to_ram(f'human_volume_{day_type}')
        for stop in stops:
            stop['VolumeAlltime'] = volume.get(f"{stop['BusStopCode']}", empty_stop)
        write_ram_to_json(stops, 'display_static', f'human_volume_{day_type}_coords')


if __name__ == "__main__":
    assign_coords_to_human_volume_alltime()
...
run_test_service_helper.py
Source:run_test_service_helper.py
...9 if monkeypatch:10 monkeypatch.setattr(logging.root, "handlers", [])11 loop = asyncio.get_event_loop()12 service: Optional[ServiceContainer] = None13 def stop_services(loop: Any = None) -> None:14 if not loop:15 loop = asyncio.get_event_loop()16 asyncio.ensure_future(_stop_services())17 def force_stop_services(loop: Any = None) -> None:18 if not loop:19 loop = asyncio.get_event_loop()20 asyncio.ensure_future(_force_stop_services())21 async def _stop_services() -> None:22 if service:23 service.stop_service()24 async def _force_stop_services() -> None:25 if service and not service.started_waiter.done():26 service.started_waiter.set_result([])27 for signame in ("SIGINT", "SIGTERM"):28 loop.add_signal_handler(getattr(signal, signame), functools.partial(stop_services, loop))29 try:30 service = ServiceContainer(ServiceImporter.import_service_file(filename))31 assert service is not None32 async def _async() -> None:33 loop = asyncio.get_event_loop()34 try:35 await service.run_until_complete()36 except Exception:37 loop = asyncio.get_event_loop()38 stop_services(loop)39 force_stop_services(loop)40 raise41 future = asyncio.ensure_future(_async())42 if wait:43 loop.run_until_complete(asyncio.wait([service.started_waiter]))44 except Exception:45 for signame in ("SIGINT", "SIGTERM"):46 loop.remove_signal_handler(getattr(signal, signame))47 loop = asyncio.get_event_loop()48 for signame in ("SIGINT", "SIGTERM"):49 loop.remove_signal_handler(getattr(signal, signame))50 if service:51 stop_services(loop)52 raise53 services = {}54 if wait:55 for service_name, instance, log_level in service.started_waiter.result():56 services[service_name] = instance57 else:58 def get_services() -> Dict:59 loop.run_until_complete(asyncio.wait([service.started_waiter]))60 return {service_name: instance for service_name, instance, log_level in service.started_waiter.result()}61 return get_services, future...
csk-stop-services
Source:csk-stop-services
#!/usr/bin/env python
#########################################################################
# Copyright 2011 Cloud Sidekick
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#########################################################################
# This command does not have an API counterpart.
# It calls the scripts to stop services.
# it's done in python so it can handle --dumpdoc for the
# automatic documentation generator.
import os
import glob
import sys


def main(argv=None):
    """Stop all Cloud Sidekick services by running each stop_services.sh.

    Args:
        argv: argument vector; defaults to ``sys.argv``.

    Returns:
        Process exit status: 0 normally, 1 when CSK_HOME is not set.
    """
    argv = sys.argv if argv is None else argv
    if len(argv) > 1:
        # Any argument short-circuits; only --dumpdoc / -H print the help text.
        if argv[1] == "--dumpdoc" or argv[1] == "-H":
            # Parenthesised form is valid on both Python 2 and Python 3.
            print("""Stop ALL Cloud Sidekick services. Accepts no arguments.""")
        return 0

    csk_home = os.environ.get("CSK_HOME")
    if not csk_home:
        # Without CSK_HOME the shell would expand the paths below to
        # "/velocity/..." etc.; fail early with a clear message instead.
        sys.stderr.write("CSK_HOME is not set; cannot locate the stop_services.sh scripts.\n")
        return 1

    os.system("$CSK_HOME/velocity/services/stop_services.sh")
    os.system("$CSK_HOME/automate/services/stop_services.sh")
    if os.path.exists(os.path.join(csk_home, "deploy")):
        os.system("$CSK_HOME/deploy/services/stop_services.sh")
    # NOTE(review): indentation was lost in the source this was recovered
    # from; the canvas call is assumed to be unconditional -- confirm.
    os.system("$CSK_HOME/canvas/services/stop_services.sh")
    return 0


if __name__ == '__main__':
    sys.exit(main())
Learn to execute automation testing from scratch with LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!