Best Python code snippet using playwright-python
netman.py
Source:netman.py
...133 (new_device_path, new_prop_iface) = nettools.get_obj_path(self.bus, interface)134 ntmtools.dbg_msg("NetMan.setInterface - device_path:{0}".format(new_device_path, new_prop_iface))135 if (self.mode == 0):136 if (new_device_path == ""):137 self.set_offline()138 else:139 self.bus.remove_signal_receiver(self.nm_h_state_changed, dbus_interface="org.freedesktop.NetworkManager.Device", signal_name="StateChanged", path = self.device_path)140 self.device_path = new_device_path141 try:142 self.bus.add_signal_receiver(self.nm_h_state_changed, dbus_interface="org.freedesktop.NetworkManager.Device", signal_name="StateChanged", path = self.device_path)143 modems = self.get_modems()144 self.device_info = Device(self.bus, self.device_path, modems)145 except:146 ntmtools.dbg_msg("Unexpected error: " + str(sys.exc_info()))147 148 if (self.get_state()):149 self.set_online()150 else:151 self.set_offline()152 else: # ping153 None154 self.device_path = new_device_path155 self.interface = interface 156 ntmtools.dbg_msg("END - NetMan.setInterface")157 ## - ##158 ## + ##159 # return true if active (online)160 def get_state(self):161 ntmtools.dbg_msg("NetMan.get_state")162 retVal = None163 if self.mode == 0:164 if (self.device_path != ""):165 dev_proxy = self.bus.get_object("org.freedesktop.NetworkManager", self.device_path)166 prop_iface = dbus.Interface(dev_proxy, "org.freedesktop.DBus.Properties")167 state = prop_iface.Get("org.freedesktop.NetworkManager.Device", "State")168 retVal = (state in self.NTM_MAP_NM_DEVICE_STATE_ACTIVATED)169 else:170 retVal = False171 '''172 proxy = self.bus.get_object('org.freedesktop.NetworkManager', '/org/freedesktop/NetworkManager')173 iface = dbus.Interface(proxy, dbus_interface='org.freedesktop.DBus.Properties')174 active_connections = iface.Get('org.freedesktop.NetworkManager', 'ActiveConnections')175 try:176 state = self.prop_iface.Get("org.freedesktop.NetworkManager.Device", "State")177 return (state == 8)178 except:179 
ntmtools.dbgMsg(_("Unexpected error: ") + str(sys.exc_info()))180 return False181 '''182 elif self.mode == 1:183 status = self.get_ping_test()184 retVal = status185 else:186 ntmtools.dbg_msg("Wrong value for online check mode.\n")187 retVal = False188 ntmtools.dbg_msg("END - NetMan.get_state -> {0}".format(retVal))189 return retVal190 ## - ##191 ## + ##192 # handler()193 def add_online_handler(self, handler):194 self.online_event += handler195 ## - ##196 ## + ##197 # handler()198 def add_offline_handler(self, handler):199 self.offline_event += handler200 ## - ##201 ## + ##202 # handler(object_path)203 def add_device_added_handler(self, handler):204 self.deviceAdded_event += handler205 ## - ##206 ## + ##207 # handler(object_path)208 def add_device_removed_handler(self, handler):209 self.deviceRemoved_event += handler210 ## - ##211 ## + ##212 def set_online(self):213 ntmtools.dbg_msg("NetMan.set_online")214 if not self.online:215 self.online = True216 self.online_event()217 ntmtools.dbg_msg("END - NetMan.set_online")218 ## - ##219 ## + ##220 def set_offline(self):221 ntmtools.dbg_msg("NetMan.set_offline")222 if self.online:223 self.online = False224 self.offline_event()225 ntmtools.dbg_msg("END - NetMan.set_offline")226 ## - ##227 ## + ##228 def nm_h_state_changed(self, new_state, old_state, reason):229 ntmtools.dbg_msg("NetMan.nm_h_state_changed")230 if self.mode == 0: 231 if new_state in self.NTM_MAP_NM_DEVICE_STATE_ACTIVATED:232 self.set_online()233 else:234 self.set_offline()235 ntmtools.dbg_msg("END - NetMan.nm_h_state_changed")236 ## - ##237 ## + ##238 # for ping mode239 def update_timer_handler(self):240 ntmtools.dbg_msg("NetMan.update_timer_handler")241 if (self.mode == 1):242 if (self.get_ping_test()):243 self.set_online()244 else:245 self.set_offline()246 gobject.timeout_add(self.update_timer_interval * 1000, self.update_timer_handler)247 ntmtools.dbg_msg("END - NetMan.update_timer_handler")248 ## - ##249 ## + ##250 def get_ping_test(self):251 
ntmtools.dbg_msg("NetMan.get_ping_test")252 ret = subprocess.call("ping -q -w2 -c1 " + self.ping_test_url,253 shell=True,254 stdout=open('/dev/null', 'w'),255 stderr=subprocess.STDOUT)256 retVal = None257 258 if ret == 0:259 retVal = True # Online260 else:261 retVal = False # Offline262 ntmtools.dbg_msg("END - NetMan.get_ping_test -> {0}".format(retVal))263 return retVal264 ## - ##265 ## + ##266 def ping_test(self):267 if self.mode != 1: return268 status = self.get_ping_test() 269 if status:270 self.set_online()271 else: 272 self.set_offline()273 274 gobject.timeout_add(2000, self.ping_test)275 ## - ##276 ## + ##277 def nm_h_device_added(self, device_path):278 ntmtools.dbg_msg("NetMan.nm_h_device_added : {0}".format(device_path))279 if (self.state == -1):280 dev_proxy = self.bus.get_object("org.freedesktop.NetworkManager", device_path)281 prop_iface = dbus.Interface(dev_proxy, "org.freedesktop.DBus.Properties")282 interface = prop_iface.Get("org.freedesktop.NetworkManager.Device", "Interface")283 if (interface == self.interface):284 self.device_path = device_path285 self.state = prop_iface.Get("org.freedesktop.NetworkManager.Device", "State")286 self.bus.add_signal_receiver(self.nm_h_state_changed, dbus_interface="org.freedesktop.NetworkManager.Device", signal_name="StateChanged", path = self.device_path)287 self.online = (self.state in self.NTM_MAP_NM_DEVICE_STATE_ACTIVATED)288 if self.online:289 self.set_online()290 ntmtools.dbg_msg("END - NetMan.nm_h_device_added")291 ## - ##292 ## + ##293 def nm_h_device_removed(self, object_path):294 ntmtools.dbg_msg("NetMan.nm_h_device_removed : {0}".format(object_path))295 if (self.state != -1):296 if (self.device_path == object_path):297 self.state = -1298 self.bus.remove_signal_receiver(self.nm_h_state_changed, dbus_interface="org.freedesktop.NetworkManager.Device", signal_name="StateChanged", path = self.device_path)299 self.set_offline()300 ntmtools.dbg_msg("END - NetMan.nm_h_device_removed")301 ## - ##302 ## + 
disconnect the device ##303 def disconnect_device(self):304 ntmtools.dbg_msg("NetMan.disconnect_device")305 if ( (self.mode == 0) and (self.device_path != "") ):306 try:307 proxy = self.bus.get_object('org.freedesktop.NetworkManager', self.device_path)308 ifaceNMDev = dbus.Interface(proxy, dbus_interface='org.freedesktop.NetworkManager.Device')309 ifaceNMDev.Disconnect()310 except:311 ntmtools.dbg_msg("NetMan.disconnect_device : Error")312 ntmtools.dbg_msg("END - NetMan.disconnect_device")313 ## - ##...
ADSBSensor.py
Source:ADSBSensor.py
...29 Methods30 -------31 set_healthy(health)32 Sets sensor health status to true/false33 set_offline(sys_state)34 Sets sensor offline status to true/false35 set_flux_cap(flux_cap_value)36 Sets the sensor's flux capacitor value37 set_random_error()38 Assign random error to a sensor's outage category/message39 set_latency(new_latency)40 Set the amount of latency that will be used before responding41 to any call to get_status()42 set_status(err_str)43 Set the status message to be returned. This is the 'log' output44 that will be generated when calling get_status()45 get_status()46 Return an NDJSON line representing the sensor's full status47 """48 from outages import outages49 offline = False50 healthy = True51 flux_capacitor = 1.2152 status = 'Operating nominally'53 latency = 054 outage_msg = ''55 outage_cat = ''56 def __init__(self, sid, lat, lon, city=None, state=None, tier=None, off=False, health=True, latency=0, flux_cap=1.21):57 """58 Parameters59 ----------60 sid : str61 The ADS-B sensor ID62 lat : str63 The sensor's latitude64 lon : str65 The sensor's longitude66 city : str, optional67 The city where the sensor is located68 state : str, optional69 The state where the sensor is located70 tier : str, optional71 Honestly have no idea what this is72 off : bool, optional73 Whether or not the sensor is offline (default false)74 health : bool, optional75 Whether or not the sensor is healthy (default true)76 latency : int, optional77 Latency to add to get_status() calls (default 0)78 flux_cap : float, optional79 The initial flux capacitor value (default 1.21 IYKYK)80 """81 self.id = sid82 self.city = city83 self.state = state84 self.latitude = lat85 self.longitude = lon86 self.tier = tier87 self.set_offline(off)88 self.set_healthy(health)89 self.set_latency(latency)90 self.set_flux_cap(flux_cap)91 def get_status(self):92 """93 Returns the full status message of the sensor's state as an94 NDJSON string95 """96 # Artificially add latency97 sleep(self.latency)98 
utc_time = time.time()99 timestamp = datetime.datetime.utcfromtimestamp(utc_time).strftime("%Y-%m-%dT%H:%M:%SZ")100 msg = '{{"@timestamp": "{time}", "id": "{id}", "city": "{city}", "state": "{state}", "location": {{"lat": "{lat}", "lon": "{lon}"}}, "tier": "{tier}", "healthy": "{health}", "offline": "{offline}", "flux_capacitor": "{flux}", "status": "{status}", "outage.category": "{err_cat}", "outage.message": "{err_msg}"}}'.format(time=timestamp, \101 id=self.id, city=self.city, state=self.state, lat=self.latitude, \102 lon=self.longitude, tier=self.tier, health=str(self.healthy).lower(), offline=str(self.offline).lower(), \103 flux=self.flux_capacitor, status=self.status, err_cat=self.outage_cat, err_msg=self.outage_msg)104 return msg105 def set_flux_cap(self, flux_cap_value):106 """107 Sets the value of the flux capacitor. If the value is above108 1.46, set the sensor to 'unhealthy' and 'offline'. If the value109 is below 1.17, set the sensor to 'healthy' and 'online'.110 Parameters111 ----------112 flux_cap_value : float113 The value of the flux capacitor, expressed in Gigawatts114 """115 if flux_cap_value != '':116 self.flux_capacitor = float(flux_cap_value)117 if self.flux_capacitor > 1.46:118 self.set_healthy(False)119 self.set_offline(True)120 # Error state encountered, so set outage category and message121 self.outage_cat = "Power"122 self.outage_msg = "Great Scott! Flux capacitor surge detected (" + str(flux_cap_value) + " GigaWatt). Shutting down to avoid going back to year 2051."123 self.set_status("{} Error".format(self.outage_cat).title())124 # This just sets an explicit status message, since we're not125 # in a normal state but also not in an error state. 
(In case126 # you haven't guessed by now this is all completely made up!)127 elif self.flux_capacitor < 1.17:128 self.set_healthy(True)129 self.set_offline(False)130 self.set_status("Flux Capacitor Underpowered")131 def set_healthy(self, health):132 """133 Sets the sensor state to either 'online' or 'offline'.134 Parameters135 ----------136 health : bool137 Whether or not the sensor is healthy (True/False)138 """139 self.healthy = health140 if self.healthy:141 self.status = 'Operating nominally'142 def set_latency(self, new_latency):143 """144 Sets the amount of latency to wait before responding to calls145 to get_status()146 Parameters147 ----------148 new_latency : float149 The amount of time to wait before returning a status150 """151 self.latency = new_latency152 def set_offline(self, sys_state):153 """154 Sets the state of the sensor as either 'online' or 'offline'.155 Parameters156 ----------157 sys_state : bool158 Offline or online159 """160 self.offline = sys_state161 def set_random_error(self):162 """163 Grab a random error message from the 'outages' dict and 164 apply it to this object's outage/message 165 """166 err_obj = random.choice(self.outages)167 self.outage_cat = err_obj['outage.category']168 self.outage_msg = err_obj['outage.message']169 # Sensor is unhealthy, so also set health and offline 170 # appropriately, as well as the status message171 self.set_healthy(False)172 self.set_offline(True)173 self.set_status("{} Error".format(self.outage_cat).title())174 175 def set_status(self, err_str):176 """177 Sets the status of the sensor based on the string based in178 Parameters179 ----------180 err_str : str181 The status message that gets returned/written as the 182 sensor's ultimate state183 """184 self.status = err_str 185 186 def __str__(self):...
core.py
Source:core.py
import asyncio
import json
import time
import sys
import ast
import io
from typing import Optional, Any
import requests
from pyrogram import filters
from util.permission import is_superuser
from util.command import filterCommand
from util.message import ProgressChatAction, edit_or_reply
from util.decorators import report_error, set_offline, cancel_chat_action
from bot import alemiBot
import logging
logger = logging.getLogger(__name__)
from plugins.lootbot.common import CONFIG
from plugins.lootbot.loop import LOOP, StateDict

# Set by ``ltask -stop`` to make a running task view end early.
TASK_INTERRUPT = False

# NOTE(review): the arrow and tree glyphs in the user-facing strings below
# were reconstructed from mis-encoded text ("→", "├", "└", "│" and an
# ideographic space) — confirm them against the upstream file.


# Macro for night mode
@alemiBot.on_message(is_superuser & filterCommand(["lnight", "lnt"], list(alemiBot.prefixes)))
@report_error(logger)
@set_offline
async def toggle_night(client, message):
    """Flip the boolean ``night`` setting, persist it and report the new value.

    A non-boolean stored value is treated as False before toggling.
    """
    if not isinstance(CONFIG()["night"], bool):
        CONFIG()["night"] = False
    CONFIG()["night"] = not CONFIG()["night"]
    await CONFIG.serialize()
    await edit_or_reply(message, f"` → ` Night mode [`{'ON' if CONFIG()['night'] else 'OFF'}`]")


@alemiBot.on_message(is_superuser & filterCommand(["lfriend", "lfriends"], list(alemiBot.prefixes)))
@report_error(logger)
@set_offline
async def sync_friends_command(client, message):
    """Download the friends list, cache it on disk and load it into LOOP.state.

    When an argument is given it is stored as the new ``sync.friends.url``
    setting before the download.
    """
    out = ""
    if len(message.command.arg) > 0:
        CONFIG()["sync"]["friends"]["url"] = message.command.arg[0]
        await CONFIG.serialize()
        out += f"` → lb.sync.friends.url` : --{message.command.arg[0]}--\n"
    url = CONFIG()["sync"]["friends"]["url"]
    # requests is blocking: run it in the default executor so the event loop
    # keeps serving other handlers, and bound the wait with a timeout.
    response = await asyncio.get_event_loop().run_in_executor(
        None, lambda: requests.get(url, timeout=30)
    )
    # Fail loudly on HTTP errors instead of caching an error page.
    response.raise_for_status()
    data = response.json()
    with open("plugins/lootbot/data/friends.json", "w") as f:
        json.dump(data, f)
    LOOP.state["friends"] = data
    out += f"` → ` Synched {len(data)} friends\n"
    await edit_or_reply(message, out)


def format_recursive(layer, level, base=""):
    """Render a nested dict/StateDict as an indented text tree.

    Leaves are rendered as ``str(value)`` followed by a newline. ``level``
    is the current depth (passed along, not otherwise used) and ``base`` is
    the prefix accumulated from the parent levels.
    """
    if not isinstance(layer, (dict, StateDict)):
        return str(layer) + "\n"
    out = "\n"
    view = list(layer.keys())
    for key in view:
        last = key == view[-1]
        branch = '└' if last else '├'
        # Below the last entry the column stays blank; otherwise the
        # vertical rule continues.
        child_base = base + ("\u3000\t" if last else "│\t")
        out += base + branch + f" `{key}` : " + \
            format_recursive(layer[key], level+1, child_base)
    return out


def _extract(data:dict, query:str) -> Optional[Any]:
    """Follow the dot-separated path ``query`` into ``data``.

    Returns the value found, or None when a key is missing or the path
    tries to descend into something that is not a container.
    """
    for k in query.split("."):
        try:
            if k not in data:
                return None
            data = data[k]
        except TypeError:
            # Reached a scalar (or a str/list that cannot be indexed by
            # this key): the path does not exist.
            return None
    return data


@alemiBot.on_message(is_superuser & filterCommand(["lvar", "lvars"], list(alemiBot.prefixes)))
@report_error(logger)
@set_offline
@cancel_chat_action
async def get_loopstate(client, message):
    """Show one entry of the loop state, or send the whole state as a file."""
    if len(message.command) > 0:
        state = _extract(LOOP.state, message.command[0])
        text = f"lstate.{message.command[0]}"
        out = f"`{text} → `" + format_recursive(state, 0)
        await edit_or_reply(message, out)
    else:
        await edit_or_reply(message, "` → ` Sending loop state")
        prog = ProgressChatAction(client, message.chat.id)
        out = io.BytesIO((str(LOOP.state)).encode('utf-8'))
        out.name = "loop-state.json"
        await client.send_document(message.chat.id, out, progress=prog.tick)


def _parse_val(val:str) -> Any:
    """Convert a command argument to a Python value.

    "true"/"t" and "false"/"f" (any case) become booleans, anything
    ``ast.literal_eval`` accepts becomes that literal, and everything else
    is returned unchanged as a string.
    """
    lowered = val.lower()
    if lowered in ["true", "t"]:
        return True
    if lowered in ["false", "f"]:
        return False
    try:
        return ast.literal_eval(val)
    except Exception:
        return val


@alemiBot.on_message(is_superuser & filterCommand(["lcfg", "lconfig", "lset"], list(alemiBot.prefixes), flags=["-f", "-state"]))
@report_error(logger)
@set_offline
async def get_config(client, message):
    """Show or edit a setting (or, with ``-state``, a loop-state entry).

    With two arguments the dotted key is set to the parsed value. Unless
    ``-f`` is given the edit is refused when it would replace a category,
    change the value's type or change a list's length. With fewer
    arguments the selected subtree is displayed.
    """
    edit_state = bool(message.command["-state"])
    data = LOOP.state if edit_state else CONFIG()
    if len(message.command) > 1:
        force = bool(message.command["-f"])
        keys = list(message.command[0].split("."))
        value = _parse_val(message.command[1])
        last = keys.pop()
        if len(keys) > 0:
            data = _extract(data, '.'.join(keys))
        # Some lame safety checks for users
        if data is None:
            # Only a missing parent is an error; an existing but empty
            # category is a valid target.
            raise KeyError(f"No setting matching '{message.command[0]}'")
        if not force:
            current = data[last]
            if type(current) is dict:
                raise KeyError(f"Trying to replace category '{last}'")
            if type(current) is not type(value):
                raise ValueError(f"Wrong type: expected {type(current).__name__} but got {type(value).__name__}")
            if type(current) is list and len(current) != len(value):
                raise ValueError(f"Wrong length: expected {len(current)} values, got {len(value)}")
        data[last] = value
        if not edit_state:
            await CONFIG.serialize()
        await edit_or_reply(message, f"` → ` {'**[F]**' if force else ''} `lcfg.{message.command[0]}` : --{value}--")
    else:
        text = "lcfg"
        if len(message.command) > 0:
            data = _extract(data, message.command[0])
            text = f"lcfg.{message.command[0]}"
        out = f"`{text} → `" + format_recursive(data, 0)
        await edit_or_reply(message, out)


@alemiBot.on_message(is_superuser & filterCommand(["ltask", "task", "tasks"], list(alemiBot.prefixes), options={
    "updates" : ["-u", "-upd"],
    "interval" : ["-i", "-int"]
}, flags=["-stop"]))
@report_error(logger)
@set_offline
async def get_tasks(client, message):
    """Show the current and queued loop tasks, refreshing a few times.

    ``-u`` sets the number of refreshes (default 3) and ``-i`` the seconds
    between them (default 5). ``-stop`` asks a running view to end after
    its current wait.
    """
    global TASK_INTERRUPT
    if message.command["-stop"]:
        TASK_INTERRUPT = True
        return
    # Clear a stop request left over from an earlier run; otherwise every
    # later view would end after its first refresh.
    TASK_INTERRUPT = False
    times = int(message.command["updates"] or 3)
    interval = float(message.command["interval"] or 5)
    last = ""
    for _ in range(times):
        out = f"`→ ` --{LOOP.current.ctx.name if LOOP.current is not None else 'N/A'}--\n"
        for task in LOOP.tasks:
            out += f"` → ` {task.ctx.name}\n"
        if out != last:
            # Only edit the message when its text actually changed.
            await edit_or_reply(message, out)
            last = out
        await asyncio.sleep(interval)
        if TASK_INTERRUPT:
            break
    await edit_or_reply(message, f"`→ ` {'Stopped' if TASK_INTERRUPT else 'Done'}")
...
main.py
Source:main.py
...18 return19 self.status = True20 await self.dispatch('node_online', self.url)21 22 async def set_offline(self):23 if not self.status:24 return25 self.status = False26 await self.dispatch('node_offline', self.url)27 async def check_alive(self) -> bool:28 try:29 async with self.session.get(f'{self.url}/eth/v1/node/health') as resp:30 if resp.status == 200:31 await self.set_online()32 return True33 except:34 await self.set_offline()35 return False36 37 async def do_request(self, method: str, path: str, data: Dict[str, Any]=None) -> Tuple[Optional[Dict[str, Any]], int]:38 try:39 async with self.session.request(method, f'{self.url}{path}', json=data) as resp:40 try:41 return ((await resp.text()), resp.status)42 except:43 return (dumps({'error': 'Server returned unexpected value'}), 500)44 except (aiohttp.ServerTimeoutError, aiohttp.ServerConnectionError):45 await self.set_offline()46 return ServerOffline()47 48 async def do_stream_request(self, path: str, response: HTTPResponse, data: Dict[str, Any]=None):49 try:50 async with self.session.get(f'{self.url}{path}', headers={'accept': 'text/event-stream'}) as resp:51 async for data in resp.content:52 await response.send(data)53 except (aiohttp.ServerTimeoutError, aiohttp.ServerConnectionError):54 await self.set_offline()55 return ServerOffline()56 async def stop(self):57 await self.session.close()58class OutOfAliveNodes:59 pass60class NodeRouter:61 def __init__(self, urls: List[str]):62 if not urls:63 raise ValueError('No nodes provided')64 self.urls = urls65 self.dispatch = logger.dispatch66 self.listener = logger.listener67 68 async def recheck(self) -> None:...
LambdaTest’s Playwright tutorial gives you a broader view of the Playwright automation framework, its unique features, and its use cases, with examples to deepen your understanding of Playwright testing. The tutorial offers A-to-Z guidance, from installing the Playwright framework to best practices and advanced concepts.
Get 100 minutes of automation testing FREE!!