Best Python code snippet using lettuce_webdriver_python
watchdog.py
Source:watchdog.py
import multiprocessing
import signal
import os
import sys
import select
import logging
import time
from importlib import reload

watchdog_logger = logging.getLogger('watchdog')


def run_monitored(f, heartbeat_timeout=5*60, kill_signal=signal.SIGTERM, select_timeout=15, always_restart=True):
    """Wrap ``f`` so that it runs in a child process supervised by a watchdog.

    The returned callable starts ``f`` in a ``multiprocessing.Process`` and
    passes it an extra keyword argument ``heartbeat_func``. The worker must
    call ``heartbeat_func()`` periodically; every call writes one byte to a
    pipe that the watchdog (the calling process) polls with ``select``.

    Args:
        f: Callable to run in the worker process. It must accept a
            ``heartbeat_func`` keyword argument.
        heartbeat_timeout: Seconds without a heartbeat after which the worker
            is killed and a new one is started.
        kill_signal: Signal sent to the worker to stop it. If the worker is
            still alive after 10 attempts, ``SIGKILL`` is sent.
        select_timeout: Maximum number of seconds a single poll of the
            heartbeat pipe may block.
        always_restart: When true, a worker that exits on its own is started
            again. When false, the watchdog ends with ``sys.exit`` using the
            worker's exit code.

    Returns:
        A function accepting the same positional and keyword arguments as
        ``f``. It returns ``None`` after a ``KeyboardInterrupt``, raises
        ``SystemExit`` when the worker exits and ``always_restart`` is false,
        and re-raises any other exception after logging it.
    """

    def monitored(*args, **kwargs):
        read_fd, write_fd = os.pipe()
        stop_now = False
        exit_code = 0
        worker_process = None

        def heartbeat_func():
            os.write(write_fd, b'\x00')

        def worker_target():
            # Massive booger to blow out all existing logging handlers before forking.
            logging.shutdown()
            reload(logging)
            return f(*args, **kwargs)

        def start_worker(message):
            # Spawn a fresh worker and log its PID using the given template.
            process = multiprocessing.Process(target=worker_target)
            process.start()
            watchdog_logger.info(message.format(process.pid))
            return process

        def stop_worker(process):
            # Ask the worker to stop with kill_signal, escalating to SIGKILL.
            attempts_left = 10
            while attempts_left > 0 and process.is_alive():
                os.kill(process.pid, kill_signal)
                # join() returns as soon as the worker exits, so a cooperative
                # worker does not cost a full second per attempt.
                process.join(1)
                # Without this decrement a worker that ignores kill_signal
                # would be signalled forever and never reach SIGKILL below.
                attempts_left -= 1
            if process.is_alive():
                watchdog_logger.error(
                    "Unable to kill worker with signal {0}, attempting SIGKILL".format(kill_signal)
                )
                os.kill(process.pid, signal.SIGKILL)
                process.join(1)

        kwargs.update(heartbeat_func=heartbeat_func)
        try:
            watchdog_logger.info("Starting watched process...")
            worker_process = start_worker("Watched process started, PID: {0}")
            last_seen_time = time.time()
            while not stop_now:
                read_fds, _, _ = select.select([read_fd], [], [], select_timeout)
                if read_fd in read_fds:
                    watchdog_logger.info("Heartbeat successfully read back from worker.")
                    # Drain queued heartbeats in one read so a chatty worker
                    # cannot fill the pipe faster than it is emptied.
                    os.read(read_fd, 4096)
                    last_seen_time = time.time()
                # Sample the clock after select() so the time spent blocking
                # counts towards the heartbeat timeout.
                current_time = time.time()
                if not worker_process.is_alive():
                    if always_restart:
                        watchdog_logger.warning("Working process needs to be restarted, restarting...")
                        worker_process = start_worker("New worker process started: PID {0}")
                        last_seen_time = time.time()
                    else:
                        stop_now = True
                        exit_code = worker_process.exitcode
                elif current_time - last_seen_time > heartbeat_timeout:
                    watchdog_logger.warning(
                        "No worker process heartbeat seen in {0} seconds.".format(heartbeat_timeout)
                    )
                    watchdog_logger.warning("Killing worker process and restarting...")
                    stop_worker(worker_process)
                    worker_process = start_worker("New worker process started: PID {0}")
                    last_seen_time = time.time()
        except KeyboardInterrupt:
            watchdog_logger.info("SIGINT seen, shutting down...")
        except BaseException:
            watchdog_logger.exception("Unhandled exception in watchdog process!")
            raise
        else:
            sys.exit(exit_code)
        finally:
            if worker_process is not None and worker_process.is_alive():
                os.kill(worker_process.pid, kill_signal)
            # Release the watchdog's ends of the heartbeat pipe; they were
            # previously leaked on every call.
            os.close(read_fd)
            os.close(write_fd)

    return monitored


if __name__ == "__main__":
    root_logger = logging.getLogger('')
    root_logger.addHandler(logging.StreamHandler())
    root_logger.setLevel(logging.DEBUG)

    def target(heartbeat_func=lambda: None):
        for _ in range(5):
            time.sleep(3)
            heartbeat_func()
    # ... (the rest of this snippet is cut off in the source)
util.py
Source:util.py
from base64 import b64decode, b64encode
from typing import Any, Dict, List, Optional, Tuple


def encode_base64(string: str, encoding: str = "utf-8") -> str:
    """Return the Base64 text form of ``string`` encoded with ``encoding``."""
    return b64encode(string.encode(encoding)).decode(encoding)


def decode_base64(string: str, encoding: str = "utf-8") -> str:
    """Return the text obtained by Base64-decoding ``string``."""
    return b64decode(string.encode(encoding)).decode(encoding)


def flatten_dict(dictionary: Dict[str, Any], prefix: str = "") -> Dict[str, Any]:
    """Transform a nested dictionary into a flat ``"dotted.key" -> value`` one.

    Nested dictionaries are walked depth-first in insertion order. The path
    of keys leading to a value is joined with ``"."``, and a non-empty
    ``prefix`` becomes the first path segment. Empty (falsy) keys of nested
    dictionaries are left out of the path. An empty nested dictionary
    contributes no entries.

    Example:
        >>> options = {
        ...     "debug": True,
        ...     "user": {
        ...         "first_name": "Jon",
        ...         "last_name": "Doe"
        ...     },
        ...     "offset": 7
        ... }
        >>> flatten_dict(options)
        {'debug': True, 'user.first_name': 'Jon', 'user.last_name': 'Doe', 'offset': 7}
        >>> flatten_dict({"a": {"b": 1}}, prefix="cfg")
        {'cfg.a.b': 1}
    """
    flattened: Dict[str, Any] = {}
    root_path: Tuple[str, ...] = (prefix,) if prefix else ()
    # Each entry pairs the key path of a dictionary with an iterator over the
    # items of that dictionary which have not been visited yet.
    stack: List[Tuple[Tuple[str, ...], Any]] = [(root_path, iter(dictionary.items()))]
    while stack:
        path, remaining = stack[-1]
        for key, value in remaining:
            if isinstance(value, dict):
                child_path = (*path, key) if key else path
                stack.append((child_path, iter(value.items())))
                # Descend now; the parent's iterator resumes with its next
                # sibling once the child has been fully consumed.
                break
            flattened[".".join((*path, key))] = value
        else:
            # This level is exhausted: return to the parent instead of
            # stopping, so keys that follow a nested dictionary are kept.
            stack.pop()
    return flattened


# TODO: remove any its everything except another dict
def unflatten_dict(dictionary: Dict[str, Any]) -> Tuple[Optional[str], Dict[str, Any]]:
    """Rebuild a nested dictionary from a flat ``"dotted.key" -> value`` one.

    The leading dot-separated segments shared by *all* keys are split off as
    a common prefix. At least one segment of every key is always kept, so
    that each value still has a key of its own.

    Returns:
        A ``(prefix, nested)`` tuple. ``prefix`` is the shared dotted prefix,
        or ``None`` when the keys have no common leading segment (or when
        ``dictionary`` is empty). ``nested`` is the nested dictionary built
        from the keys with that prefix removed.

    Example:
        >>> unflatten_dict({"cfg.debug": True, "cfg.user.name": "Jon"})
        ('cfg', {'debug': True, 'user': {'name': 'Jon'}})
    """
    split_keys = [key.split(".") for key in dictionary]

    # Count the leading segments common to every key. Whole segments are
    # compared, and counting stops at the first mismatch.
    shared = 0
    if split_keys:
        first = split_keys[0]
        limit = min(len(parts) for parts in split_keys) - 1
        while shared < limit and all(parts[shared] == first[shared] for parts in split_keys):
            shared += 1
    prefix = ".".join(split_keys[0][:shared]) if shared else None

    nested: Dict[str, Any] = {}
    for parts, value in zip(split_keys, dictionary.values()):
        *parents, leaf = parts[shared:]
        node = nested
        # proceed to the nested dictionary
        for name in parents:
            if node.get(name) is None:
                node[name] = {}
            node = node[name]
        node[leaf] = value
    return prefix, nested


# from threading import Thread
# import asyncio
# def run_until_timeout(func, timeout):
#     result_queue = Queue()
#     worker_process = Process(target=worker, args=(func, result_queue), daemon=True)
#     worker_process.start()
#     worker_process.join(timeout)
#     if worker_process.is_alive():
#         worker_process.terminate()
#         worker_process.join()
#         # if worker_process.is_alive():
#         #     raise Exception("worker is still alive")
#         raise ServerTimeout()
#     if not result_queue.empty():
#         print(result_queue.get())
# def test():
#     for i in range(10):
#         print("sleeping")
#         time.sleep(1)
#     return "result"
# def run_func(func):
#     with ThreadPoolExecutor(max_workers=2) as pool:
#         result = pool.submit(func)
#         result.result(5)
#         pool.shutdown(False)
#     pool.shutdown()
#     print("shuedown")
#     # print(result.cancel())
#     # print(result.set_exception(Exception("Timeout")))
# def run_until_timeout(func, timeout):
#     run_func(func)
#     print("DONE")
# def worker(func, queue):
#     result = func()
#     queue.put(result)
# if __name__ == "__main__":
#     run_until_timeout(test, 10)
#     worker_process = Process(target=test, daemon=True)
#     worker_process.start()
#     worker_process.join(5)
#     if worker_process.is_alive():
#         worker_process.terminate()
#     if worker_process.is_alive():
#         raise Exception("worker is still alive")
# ... (the rest of this snippet is cut off in the source)
test_integration_worker_process.py
Source:test_integration_worker_process.py
1import multiprocessing2import time3from unittest import mock, TestCase4from worker_process import WorkerProcess5class WorkerProcessIntegrationTestCase(TestCase):6 def test_succesful_initialization(self):7 try:8 with mock.patch('config.MACHINE') as MockMachine:9 is_initialized = multiprocessing.Value('b', False)10 def mock_initialize_method():11 is_initialized.value = True12 MockMachine.initialize.side_effect = mock_initialize_method13 worker_process = WorkerProcess.create_and_start()14 worker_process.send_message_initialize()15 time.sleep(0.1)16 self.assertTrue(is_initialized.value)17 self.assertEqual(18 worker_process.get_logs(),19 [{'level': 'INFO', 'message': 'Machine initialized successfully'}],20 )21 finally:22 worker_process.kill()23 def test_failed_initialization(self):24 try:25 with mock.patch('config.MACHINE') as MockMachine:26 MockMachine.initialize.side_effect = Exception("Failed to initialize machine")27 worker_process = WorkerProcess.create_and_start()28 worker_process.send_message_initialize()29 time.sleep(0.1)30 self.assertEqual(31 worker_process.get_logs(),32 [{'level': 'ERROR', 'message': 'Failed to initialize machine'}],33 )34 finally:35 worker_process.kill()36 def test_succesful_gcode_sending(self):37 try:38 with mock.patch('config.MACHINE') as MockMachine:39 received_move_coordinates = multiprocessing.Array('c', range(100))40 def mock_move_by_method(x, y, z, feed_rate):41 received_move_coordinates.value = b'x=%f y=%f z=%f feed_rate=%f' % (42 x,43 y,44 z,45 feed_rate,46 )47 type(MockMachine).rapid_move_feed_rate = mock.PropertyMock(return_value=1000)48 MockMachine.move_by.side_effect = mock_move_by_method49 worker_process = WorkerProcess.create_and_start()50 worker_process.send_message_initialize()51 worker_process.send_message_gcode("G0 X3 Y2 Z1")52 time.sleep(0.1)53 self.assertEqual(54 received_move_coordinates.value,55 b'x=3.000000 y=2.000000 z=1.000000 feed_rate=1000.000000')56 logs = worker_process.get_logs()57 self.assertEqual(len(logs), 
2)58 self.assertEqual(logs[0]['level'], 'INFO')59 self.assertEqual(logs[0]['message'], 'Machine initialized successfully')60 self.assertEqual(logs[1]['level'], 'INFO')61 self.assertTrue(logs[1]['message'].startswith("gcode interpreted successfully, took"))62 finally:63 worker_process.kill()64 def test_failed_gcode_sending(self):65 try:66 with mock.patch('config.MACHINE'):67 worker_process = WorkerProcess.create_and_start()68 worker_process.send_message_initialize()69 worker_process.send_message_gcode("Invalid")70 time.sleep(0.1)71 self.assertEqual(72 worker_process.get_logs(),73 [74 {'level': 'INFO', 'message': 'Machine initialized successfully'},75 {'level': 'ERROR', 'message': "word 'I' value invalid"},76 ]77 )78 finally:...
Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!