Best Python code snippet using localstack_python
test_twitch_chatbot.py
Source:test_twitch_chatbot.py
"""Tests for the Twitch chat connection, driven through an in-memory socket mock."""
from threading import Thread
from time import sleep
from typing import Tuple, List
from dataclasses import dataclass, field

from teamfightchaticts.settings import TwitchSettings
from teamfightchaticts.tft_command import TFTCommand
from teamfightchaticts.twitch_connection import TwitchConnection, LineBuffer


@dataclass
class IrcSocketMock:
    """Fake socket that replays a canned text and records everything sent to it.

    ``recv`` serves ``text_to_return`` in consecutive ``bufsize``-sized chunks;
    ``send`` appends the decoded payload to ``text_received``.
    """
    text_to_return: str
    text_received: str = ''
    closed: bool = True
    buffer_id: int = 0  # index of the next chunk handed out by recv()
    encoding: str = 'utf-8'

    def connect(self, _: Tuple[str, int]):
        """Mark the socket as open; the address is ignored."""
        self.closed = False

    def close(self):
        """Mark the socket as closed."""
        self.closed = True

    def recv(self, bufsize: int) -> bytes:
        """Return the next ``bufsize`` bytes of the canned text, or ``b''`` once exhausted."""
        text_enc = self.text_to_return.encode(self.encoding)
        start = self.buffer_id * bufsize
        self.buffer_id += 1
        sleep(0.01)
        # Slicing clamps to the end of the buffer and yields b'' past it,
        # so no explicit bounds check is needed.
        return text_enc[start:start + bufsize]

    def send(self, data: bytes) -> int:
        """Record ``data`` and return the number of bytes "sent", like ``socket.send``."""
        self.text_received += data.decode(self.encoding)
        sleep(0.01)
        return len(data)


@dataclass
class RemoteControlMock:
    """Fake remote control that only records the commands it is asked to execute."""
    received_commands: List[TFTCommand] = field(init=False, default_factory=list)

    def execute_cmd(self, tft_cmd: TFTCommand):
        """Append ``tft_cmd`` to ``received_commands``."""
        self.received_commands.append(tft_cmd)


def msg_padding(sequence: str, repetitions: int, separator: str = ''):
    """Return ``sequence`` repeated ``repetitions`` times, joined by ``separator``."""
    return separator.join([sequence] * repetitions)


def test_should_connect_to_chat():
    conn_settings = TwitchSettings('foobar.com', 6667, 'twitch_test', 'my_chatbot', 'somepwd')
    text_to_return = "End of /NAMES list\r\n"
    connection = TwitchConnection(conn_settings, lambda: IrcSocketMock(text_to_return))
    connection.connect_to_server()
    exp_text_buffer = 'PASS somepwd\nNICK my_chatbot\nJOIN #twitch_test\n'
    socket: IrcSocketMock = connection.websocket
    # Two separate asserts so a failure shows which condition broke.
    assert not socket.closed
    assert socket.text_received == exp_text_buffer


# def test_should_fail_to_connect_to_chat_after_timeout():
#     # TODO: use asyncio for this feature. threads are very clunky to achieve this.
#     conn_settings = TwitchSettings('twitch.tv', 6667, 'twitch_test', 'my_chatbot', 'somepwd')
#     connection = TwitchConnection(conn_settings, lambda: IrcSocketMock(''), timeout_seconds=0.5)
#     connection.connect_to_server()
#     sleep(0.6)
#     print(connection)
#     assert connection.irc is None
#     # TODO: make this test work


def test_should_disconnect_from_chat_gracefully():
    conn_settings = TwitchSettings('twitch.tv', 6667, 'twitch_test', 'my_chatbot', 'somepwd')
    text_to_return = "End of /NAMES list\r\n" + msg_padding(' ', 1024) \
        + "\r\n::w3w4\r\n::lock\r\n::some text\r\n::lvl\r\n"
    connection = TwitchConnection(conn_settings, lambda: IrcSocketMock(text_to_return))
    # The assertion below compares against plain strings, so the listener
    # is expected to receive message text.
    msgs_received: List[str] = []
    shutdown_requested = False

    def observe_twitch_chat():
        connection.connect_to_server()
        connection.register_message_listener(msgs_received.append)
        # The lambda reads the enclosing variable at call time, so the
        # reassignment below is visible to the receiving thread.
        connection.receive_messages_as_daemon(lambda: shutdown_requested)

    conn_thread = Thread(target=observe_twitch_chat)
    conn_thread.start()
    sleep(1)
    shutdown_requested = True
    conn_thread.join(timeout=0.1)
    # NOTE(review): nothing here checks that the thread actually stopped;
    # consider asserting `not conn_thread.is_alive()` once the timing is confirmed.
    assert {'w3w4', 'lock', 'lvl'} <= set(msgs_received)


def test_line_buffer():
    def process_read_buffer(text_buffer, remainder):
        buf = LineBuffer(remainder)
        return buf.process(text_buffer), buf.remainder

    assert process_read_buffer('', '') == ([], '')
    assert process_read_buffer('', 'abc') == ([], 'abc')
    assert process_read_buffer('abc\r\n', '') == (['abc'], '')
    assert process_read_buffer('abc\r\ndef', '') == (['abc'], 'def')
    assert process_read_buffer('abc\r\ndef\r\nghi', '') == (['abc', 'def'], 'ghi')
    assert process_read_buffer('\ndef', 'abc\r') == (['abc'], 'def')
    assert process_read_buffer('\r\ndef', 'abc') == (['abc'], 'def')
    assert process_read_buffer('abc\r\ndef\r\n', '') == (['abc', 'def'], '')


def test_should_send_chat_pong():
    conn_settings = TwitchSettings('twitch.tv', 6667, 'twitch_test', 'my_chatbot', 'somepwd')
    text_to_return = "End of /NAMES list\r\n" + msg_padding(' ', 1024) \
        + "\r\n::w3w4\r\n::lock\r\n::some text\r\nPING :tmi.twitch.tv\r\n::lvl\r\n"
    connection = TwitchConnection(conn_settings, lambda: IrcSocketMock(text_to_return))
    shutdown_requested = False

    def observe_twitch_chat():
        connection.connect_to_server()
        connection.receive_messages_as_daemon(lambda: shutdown_requested)

    conn_thread = Thread(target=observe_twitch_chat)
    conn_thread.start()
    sleep(1)
    shutdown_requested = True
    conn_thread.join(timeout=0.1)
    socket: IrcSocketMock = connection.websocket
    assert 'PONG :tmi.twitch.tv' in socket.text_received

# ... (snippet truncated in the source listing)
subscription.py
Source:subscription.py
#!/usr/bin/env python3
"""Pull jobs from a Pub/Sub subscription and run each one in a worker process."""
# The star import stays first, as in the original, so the explicit imports
# below still take precedence over any same-named objects in config.
from config import *
import multiprocessing
import threading
import logging
import time
import signal

from google.cloud import pubsub_v1

shutdown_requested = False  # Whether this program should shut down due to SIGINT/SIGTERM

# Age in seconds (measured from publish time) after which a failing job is
# acknowledged anyway when ``give_up`` is set.
_GIVE_UP_AFTER_SECONDS = 600


def subscribe(subscription_name, worker, give_up=False):
    """Receive jobs from Pub/Sub and run each one in its own worker process.

    Jobs are pulled one at a time. While a job runs, a background "watcher"
    thread keeps extending its ack deadline so that it is not redelivered.

    Args:
        subscription_name: Subscription name, combined with GCLOUD_PROJECT_ID
            to build the subscription path.
        worker: Callable run in a ``multiprocessing.Process`` with the decoded
            message data as its single argument. Exit code 0 counts as success.
        give_up: If true, a failed job published more than
            ``_GIVE_UP_AFTER_SECONDS`` ago is acknowledged anyway.

    The loop runs until the module-level ``shutdown_requested`` flag is set.
    Any exception is logged as critical and ends the loop.
    """
    global shutdown_requested
    message = None  # The current active message
    lock = threading.Lock()
    client = pubsub_v1.SubscriberClient()
    subscription_path = client.subscription_path(GCLOUD_PROJECT_ID, subscription_name)

    def renew_deadline():
        """Repeatedly give the active message more time to be processed to prevent it being resent"""
        while True:
            with lock:
                # Snapshot under the lock: the main loop may clear `message`
                # at any moment, and checking it unlocked could dereference None.
                active = message
                if active is None and shutdown_requested:
                    return
                if active is not None:
                    try:
                        client.modify_ack_deadline(subscription=subscription_path,
                                                   ack_ids=[active.ack_id],
                                                   ack_deadline_seconds=SUB_ACK_DEADLINE)
                        logging.debug('Reset ack deadline for {} for {}s'.format(
                            active.message.data.decode(), SUB_ACK_DEADLINE))
                    except Exception as e:
                        logging.warning('Could not reset ack deadline', exc_info=e)
            # Sleep on every iteration, outside the lock. Previously the only
            # sleep was on the success path, so an idle watcher (or one hitting
            # repeated errors) spun at full speed.
            time.sleep(SUB_SLEEP_TIME)

    watcher = threading.Thread(target=renew_deadline)
    watcher.start()

    # Repeatedly check for new jobs until SIGINT/SIGTERM received
    logging.info('Listening for jobs')
    try:
        while not shutdown_requested:
            response = client.pull(request={'subscription': subscription_path, 'max_messages': 1})
            if not response.received_messages:
                logging.info('Job queue is empty')
                time.sleep(SUB_SLEEP_TIME)
                continue
            if len(response.received_messages) > 1:
                logging.warning('Received more than one job when only one expected')
            with lock:
                message = response.received_messages[0]
            job = message.message.data.decode()
            logging.info('Beginning: {}'.format(job))
            process = multiprocessing.Process(target=worker, args=(job,))
            process.start()
            process.join()
            if process.exitcode == 0:
                # Success; acknowledge
                try:
                    client.acknowledge(subscription=subscription_path, ack_ids=[message.ack_id])
                    logging.info('Ending and acknowledged: {}'.format(job))
                except Exception as e:
                    logging.error('Could not end and acknowledge: {}'.format(job), exc_info=e)
            elif give_up and (time.time() - message.message.publish_time.timestamp()) > _GIVE_UP_AFTER_SECONDS:
                # Failure; give up and acknowledge
                try:
                    client.acknowledge(subscription=subscription_path, ack_ids=[message.ack_id])
                    logging.error('Failed but acknowledged: {}'.format(job))
                except Exception as e:
                    logging.error('Failed but could not acknowledge: {}'.format(job), exc_info=e)
            else:
                # Failure; refuse to acknowledge
                logging.error('Failed, not acknowledged: {}'.format(job))
            # Stop extending this message's deadline in the "watcher" thread
            with lock:
                message = None
    except Exception as e:
        logging.critical('Exception encountered. ', exc_info=e)
    finally:
        # If there is an exception, make sure the "watcher" thread shuts down.
        # The watcher only exits once there is no active message, so clear it
        # here too; otherwise it would keep renewing the deadline forever.
        with lock:
            message = None
        shutdown_requested = True


def graceful_exit(signal, frame):
    """Signal handler: request shutdown of the subscribe loop.

    NOTE: the ``signal`` parameter (the signal number) shadows the ``signal``
    module inside this function; the name is kept for interface compatibility.
    """
    global shutdown_requested
    shutdown_requested = True
    logging.warning('Requesting shutdown due to signal {}'.format(signal))


signal.signal(signal.SIGINT, graceful_exit)
# ... (snippet truncated in the source listing)
twitch_chatbot.py
Source:twitch_chatbot.py
"""Twitch chatbot that turns chat votes into TFT commands."""
from threading import Thread
from typing import Dict, Optional
from dataclasses import dataclass, field

from teamfightchaticts.tft_command import TFTCommand
from teamfightchaticts.twitch_connection import TwitchConnection
from teamfightchaticts.tft_remote_control import TFTRemoteControl


@dataclass
class TwitchTFTChatbotState:
    """Vote-counting state: per-command vote counts and the last remembered command."""
    last_cmd: TFTCommand = TFTCommand('')
    cmd_counts: Dict[TFTCommand, int] = field(default_factory=dict)
    pool: int = 10  # number of votes a command needs before it is executed

    def update_state(self, tft_cmd: TFTCommand):
        """Register one vote for ``tft_cmd``."""
        # `+= 1` on a plain dict raised KeyError on the first vote for a command.
        self.cmd_counts[tft_cmd] = self.cmd_counts.get(tft_cmd, 0) + 1

    def reset_counts(self):
        """Remember the current winning command (may be None) and clear all votes."""
        self.last_cmd = self.cmd_to_execute
        self.cmd_counts = {}

    @property
    def cmd_to_execute(self) -> Optional[TFTCommand]:
        """First command (in insertion order) with at least ``pool`` votes, else None."""
        return next(
            (cmd for cmd, votes in self.cmd_counts.items() if votes >= self.pool),
            None,
        )


@dataclass
class TwitchTFTChatbot:
    """Receives Twitch chat messages on a background thread and executes voted commands."""
    connection: TwitchConnection
    tft_remote_control: TFTRemoteControl
    # default_factory gives every bot its own state. A shared instance default
    # is also rejected by dataclasses on Python 3.11+ (unhashable default).
    state: TwitchTFTChatbotState = field(default_factory=TwitchTFTChatbotState)
    thread: Optional[Thread] = field(init=False, default=None)
    shutdown_requested: bool = False

    def start_bot(self, pool_size: int):
        """Start the receiver thread with ``pool_size`` votes required per command.

        Does nothing while a shutdown is still pending.
        """
        if self.shutdown_requested:
            return
        self.state.pool = pool_size
        self.thread = Thread(target=self._receive_twitch_messages)
        self.thread.start()

    def stop_bot(self):
        """Ask the receiver thread to stop and wait briefly for it to finish."""
        if self.thread is None:
            # Never started: nothing to stop (previously an AttributeError
            # that also left the flag stuck on True).
            return
        self.shutdown_requested = True
        self.thread.join(timeout=0.1)
        # Only clear the flag once the thread has really exited. Clearing it
        # unconditionally would hide the request from a receiver that had not
        # yet polled it, and the thread would then never stop.
        if not self.thread.is_alive():
            self.shutdown_requested = False

    def _receive_twitch_messages(self):
        """Thread body: connect, register the message handler and receive until shutdown."""
        self.connection.connect_to_server()
        self.connection.register_message_listener(self._process_tft_cmd)
        self.connection.receive_messages_as_daemon(lambda: self.shutdown_requested)

    def _process_tft_cmd(self, msg: str):
        """Count ``msg`` as a vote and execute a command once it has enough votes."""
        tft_cmd = TFTCommand(msg)
        self.state.update_state(tft_cmd)
        cmd_exec = self.state.cmd_to_execute
        # vote for next command not complete yet
        if not cmd_exec:
            return
        # same command twice
        if cmd_exec == self.state.last_cmd:
            self.state.last_cmd = TFTCommand('')
            return
        # command is ok, go execute it
        self.tft_remote_control.execute_cmd(cmd_exec)
        # ... (snippet truncated in the source listing)
        # NOTE(review): the visible code never resets the vote counts after
        # executing; presumably the truncated remainder does so. Confirm.
Learn to execute automation testing from scratch with LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g., Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!