Best Python code snippet using pandera_python
AIOConnection.py
Source:AIOConnection.py
1"""2AIO Connection Object3"""4import trio5from uuid import UUID6from .util import ServiceMessageEvent7from google.protobuf.message import Message8from CommonLib.proto.AIOMessage_pb2 import AIOMessage9class AIOConnection:10 def __init__(self, uuid: UUID, connection_stream: trio.SocketStream,11 server_event_handler: trio.abc.SendChannel.send):12 """13 :param uuid:14 :param connection_stream:15 :param server_event_handler:16 """17 # General information18 self._is_alive: bool = True19 self._uuid: UUID = uuid20 self._username: str = ''21 # Connection stream and server event handler22 self._connection_stream: trio.SocketStream = connection_stream23 self._server_event_handler: trio.abc.SendChannel.send = server_event_handler24 def __str__(self):25 return f'[AIOConnection:{self._uuid}]'26 @property27 def is_alive(self):28 return self._is_alive29 async def receiver(self):30 """31 AIOConnection _receiver32 - Should only receive (potentially encrypted) AIOMessages33 """34 try:35 async for serialized_aio_message in self._connection_stream:36 aio_message = AIOMessage()37 aio_message.ParseFromString(serialized_aio_message)38 print(f'AIOConnection:{self._uuid.hex[:8]} got data: {aio_message}')39 await self._server_event_handler(40 self._aio_msg_to_service_request(aio_message))41 except trio.BrokenResourceError as e:42 print(f'{e}')43 def _aio_msg_to_service_request(self, aio_message: AIOMessage) -> ServiceMessageEvent:44 """45 Convert a received and decrypted AIOMessage into a ServiceRequestEvent46 :param aio_message: Decrypted AIOMessage received from a client47 """48 return ServiceMessageEvent(49 requester_uuid=self._uuid.int,50 service_name=aio_message.message_name,51 message=aio_message.message,52 response_callback=self.send)53 async def send(self, message: Message):54 """ AIOConnection _sender """55 aio_wrapper = AIOMessage()56 aio_wrapper.message_name = message.DESCRIPTOR.name57 aio_wrapper.message = message.SerializeToString()58 print(f'{self} sending message: 
{aio_wrapper}')59 await self._connection_stream.send_all(...
s3.py
Source:s3.py
...5import botocore6# Define async behaviour.7executor = concurrent.futures.ThreadPoolExecutor()8def aio(func):9 async def aio_wrapper(**kwargs):10 f_bound = functools.partial(func, **kwargs)11 loop = asyncio.get_running_loop()12 return await loop.run_in_executor(executor, f_bound)13 return aio_wrapper14class S3Client:15 def __init__(self, client, bucket: str, url_path: str) -> None:16 self.client = client17 self.bucket = bucket18 self.url_path = url_path19 # You can comment this once bucket is created.20 self._check_if_bucket_exists()21 # Create async methods.22 self._get_object = aio(self.client.get_object)23 self._put_object = aio(self.client.put_object)...
plug.py
Source:plug.py
...4import logging5from functools import wraps6from modules.settings import load_settings7settings = load_settings()8def aio_wrapper(f):9 """wraps the async functions"""10 @wraps(f)11 def decor(*args,**kwargs):12 return asyncio.run(f(*args,**kwargs))13 return decor14@aio_wrapper15async def turn_on_plug():16 """turn on plug"""17 try:18 p = SmartPlug(settings["smart_plug_ip"])19 await p.update()20 logging.debug('Plug Turned On')21 await p.turn_on()22 except:...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!