Best Python code snippet using lettuce_webdriver_python
watchdog.py
Source: watchdog.py
import multiprocessing
import signal
import os
import sys
import select
import logging
import time
from importlib import reload

watchdog_logger = logging.getLogger('watchdog')


def _terminate_worker(worker_process, kill_signal, attempts=10, wait_seconds=1):
    """Stop *worker_process*, escalating to SIGKILL if *kill_signal* is ignored.

    Sends ``kill_signal`` up to ``attempts`` times, waiting ``wait_seconds``
    after each one, then falls back to ``SIGKILL`` if the worker is still alive.
    """
    attempts_left = attempts
    while attempts_left > 0 and worker_process.is_alive():
        os.kill(worker_process.pid, kill_signal)
        # Count the attempt; without this a worker that ignores the signal
        # would keep the watchdog in this loop forever.
        attempts_left -= 1
        # join() returns as soon as the worker exits and reaps it.
        worker_process.join(wait_seconds)
    if worker_process.is_alive():
        watchdog_logger.error(
            "Unable to kill worker with signal {0}, attempting SIGKILL".format(kill_signal)
        )
        os.kill(worker_process.pid, signal.SIGKILL)
        worker_process.join(wait_seconds)


def run_monitored(f, heartbeat_timeout=5*60, kill_signal=signal.SIGTERM, select_timeout=15, always_restart=True):
    """Wrap *f* so that it runs in a child process supervised by a watchdog.

    The returned callable starts ``f(*args, heartbeat_func=..., **kwargs)`` in a
    ``multiprocessing.Process``. The worker must call ``heartbeat_func()``
    regularly; each call writes one byte to a pipe that the watchdog reads.

    Args:
        f: Callable to supervise. It must accept a ``heartbeat_func`` keyword.
        heartbeat_timeout: Seconds without a heartbeat before the worker is
            killed and restarted.
        kill_signal: Signal sent first when the worker has to be stopped.
        select_timeout: Seconds the watchdog waits for a heartbeat per loop.
        always_restart: When true, a worker that exits is started again. When
            false, the watchdog stops and calls ``sys.exit`` with the worker's
            exit code.

    Returns:
        The wrapping callable. It returns ``None`` after a ``KeyboardInterrupt``
        and otherwise leaves through ``sys.exit`` or a re-raised exception.
    """
    def monitored(*args, **kwargs):
        read_fd, write_fd = os.pipe()
        exit_code = 0
        worker_process = None

        def heartbeat_func():
            os.write(write_fd, b'\x00')

        def worker_target():
            # Drop every logging handler inherited from the parent before
            # running the real target in the child.
            logging.shutdown()
            reload(logging)
            return f(*args, **kwargs)

        def start_worker():
            process = multiprocessing.Process(target=worker_target)
            process.start()
            return process

        kwargs.update(heartbeat_func=heartbeat_func)
        try:
            watchdog_logger.info("Starting watched process...")
            worker_process = start_worker()
            watchdog_logger.info("Watched process started, PID: {0}".format(worker_process.pid))
            last_seen_time = time.time()
            while True:
                read_fds, _, _ = select.select([read_fd], [], [], select_timeout)
                if read_fd in read_fds:
                    watchdog_logger.info("Heartbeat successfully read back from worker.")
                    # Drain any backlog of heartbeat bytes in one read.
                    os.read(read_fd, 4096)
                    last_seen_time = time.time()
                if not worker_process.is_alive():
                    if not always_restart:
                        exit_code = worker_process.exitcode
                        break
                    watchdog_logger.warning("Working process needs to be restarted, restarting...")
                    worker_process = start_worker()
                    watchdog_logger.info("New worker process started: PID {0}".format(worker_process.pid))
                    last_seen_time = time.time()
                # Measure the age after select() so the time spent waiting
                # counts towards the timeout.
                elif time.time() - last_seen_time > heartbeat_timeout:
                    watchdog_logger.warning(
                        "No worker process heartbeat seen in {0} seconds.".format(heartbeat_timeout)
                    )
                    watchdog_logger.warning("Killing worker process and restarting...")
                    _terminate_worker(worker_process, kill_signal)
                    worker_process = start_worker()
                    watchdog_logger.info("New worker process started: PID {0}".format(worker_process.pid))
                    last_seen_time = time.time()
        except KeyboardInterrupt:
            watchdog_logger.info("SIGINT seen, shutting down...")
        except BaseException:
            watchdog_logger.exception("Unhandled exception in watchdog process!")
            raise
        else:
            sys.exit(exit_code)
        finally:
            if worker_process is not None and worker_process.is_alive():
                os.kill(worker_process.pid, kill_signal)
            # Release the heartbeat pipe; every call creates a fresh one.
            os.close(read_fd)
            os.close(write_fd)

    return monitored


if __name__ == "__main__":
    root_logger = logging.getLogger('')
    root_logger.addHandler(logging.StreamHandler())
    root_logger.setLevel(logging.DEBUG)

    def target(heartbeat_func=lambda: None):
        for _ in range(5):
            time.sleep(3)
            heartbeat_func()
    # ... (the remainder of this demo is truncated in the source snippet)
util.py
Source: util.py
from base64 import b64decode, b64encode
from typing import Any, Dict, List, Optional, Tuple


def encode_base64(string: str, encoding: str = "utf-8") -> str:
    """Return the Base64 text for *string* encoded with *encoding*."""
    # Base64 output is pure ASCII; decoding it with the payload encoding
    # (e.g. utf-16) would garble it.
    return b64encode(string.encode(encoding)).decode("ascii")


def decode_base64(string: str, encoding: str = "utf-8") -> str:
    """Decode Base64 text and return the payload decoded with *encoding*."""
    return b64decode(string.encode("ascii")).decode(encoding)


def flatten_dict(dictionary: Dict[str, Any], prefix: str = "") -> Dict[str, Any]:
    """Transform a nested dictionary into a flat one with dotted keys.

    Nested dictionaries are walked depth-first and their keys are joined with
    ``"."``. A non-empty *prefix* is prepended to every key. Empty nested
    dictionaries are kept as leaf values so no key is lost.

    Example:
        >>> options = {
        ...     "debug": True,
        ...     "user": {"first_name": "Jon", "last_name": "Doe"},
        ...     "offset": 7,
        ... }
        >>> flatten_dict(options)
        {'debug': True, 'user.first_name': 'Jon', 'user.last_name': 'Doe', 'offset': 7}
    """
    flattened: Dict[str, Any] = {}
    # Each frame holds the dotted path of a dict and an iterator over its
    # items, so an outer dict resumes after a nested one is exhausted.
    stack: List[Tuple[str, Any]] = [(prefix, iter(dictionary.items()))]
    while stack:
        path, items = stack[-1]
        for key, value in items:
            full_key = f"{path}.{key}" if path else key
            if isinstance(value, dict) and value:
                stack.append((full_key, iter(value.items())))
                break
            flattened[full_key] = value
        else:
            # This level is exhausted; continue with the enclosing one.
            stack.pop()
    return flattened


# TODO: remove any its everything except another dict
def unflatten_dict(dictionary: Dict[str, Any]) -> Tuple[Optional[str], Dict[str, Any]]:
    """Rebuild a nested dictionary from dotted keys.

    The leading dot-separated segments shared by all keys are split off as a
    common prefix. Every key keeps at least its last segment.

    Returns:
        A ``(prefix, nested)`` tuple. ``prefix`` is ``None`` when the keys share
        no leading segment.
    """
    split_keys = [key.split(".") for key in dictionary]

    # Compare whole segments, not characters, so "debug"/"default" do not
    # produce a bogus prefix "de".
    common = 0
    if split_keys:
        max_common = min(len(parts) for parts in split_keys) - 1
        for segments in zip(*split_keys):
            if common >= max_common or len(set(segments)) != 1:
                break
            common += 1
    prefix = ".".join(split_keys[0][:common]) if common else None

    nested: Dict[str, Any] = {}
    for parts, value in zip(split_keys, dictionary.values()):
        *parents, leaf = parts[common:]
        node = nested
        # Walk down to the nested dictionary, creating levels as needed.
        for name in parents:
            if node.get(name) is None:
                node[name] = {}
            node = node[name]
        node[leaf] = value
    return prefix, nested


# from threading import Thread
# import asyncio
# def run_until_timeout(func, timeout):
#     result_queue = Queue()
#     worker_process = Process(target=worker, args=(func, result_queue), daemon=True)
#     worker_process.start()
#     worker_process.join(timeout)
#     if worker_process.is_alive():
#         worker_process.terminate()
#         worker_process.join()
#         # if worker_process.is_alive():
#         #     raise Exception("worker is still alive")
#         raise ServerTimeout()
#     if not result_queue.empty():
#         print(result_queue.get())
# def test():
#     for i in range(10):
#         print("sleeping")
#         time.sleep(1)
#     return "result"
# def run_func(func):
#     with ThreadPoolExecutor(max_workers=2) as pool:
#         result = pool.submit(func)
#         result.result(5)
#         pool.shutdown(False)
#     pool.shutdown()
#     print("shuedown")
#     # print(result.cancel())
#     # print(result.set_exception(Exception("Timeout")))
# def run_until_timeout(func, timeout):
#     run_func(func)
#     print("DONE")
# def worker(func, queue):
#     result = func()
#     queue.put(result)
# if __name__ == "__main__":
#     run_until_timeout(test, 10)
#     worker_process = Process(target=test, daemon=True)
#     worker_process.start()
#     worker_process.join(5)
#     if worker_process.is_alive():
#         worker_process.terminate()
#     if worker_process.is_alive():
#         raise Exception("worker is still alive")
# ... (the remainder of this snippet is truncated in the source)
test_integration_worker_process.py
1import multiprocessing2import time3from unittest import mock, TestCase4from worker_process import WorkerProcess5class WorkerProcessIntegrationTestCase(TestCase):6 def test_succesful_initialization(self):7 try:8 with mock.patch('config.MACHINE') as MockMachine:9 is_initialized = multiprocessing.Value('b', False)10 def mock_initialize_method():11 is_initialized.value = True12 MockMachine.initialize.side_effect = mock_initialize_method13 worker_process = WorkerProcess.create_and_start()14 worker_process.send_message_initialize()15 time.sleep(0.1)16 self.assertTrue(is_initialized.value)17 self.assertEqual(18 worker_process.get_logs(),19 [{'level': 'INFO', 'message': 'Machine initialized successfully'}],20 )21 finally:22 worker_process.kill()23 def test_failed_initialization(self):24 try:25 with mock.patch('config.MACHINE') as MockMachine:26 MockMachine.initialize.side_effect = Exception("Failed to initialize machine")27 worker_process = WorkerProcess.create_and_start()28 worker_process.send_message_initialize()29 time.sleep(0.1)30 self.assertEqual(31 worker_process.get_logs(),32 [{'level': 'ERROR', 'message': 'Failed to initialize machine'}],33 )34 finally:35 worker_process.kill()36 def test_succesful_gcode_sending(self):37 try:38 with mock.patch('config.MACHINE') as MockMachine:39 received_move_coordinates = multiprocessing.Array('c', range(100))40 def mock_move_by_method(x, y, z, feed_rate):41 received_move_coordinates.value = b'x=%f y=%f z=%f feed_rate=%f' % (42 x,43 y,44 z,45 feed_rate,46 )47 type(MockMachine).rapid_move_feed_rate = mock.PropertyMock(return_value=1000)48 MockMachine.move_by.side_effect = mock_move_by_method49 worker_process = WorkerProcess.create_and_start()50 worker_process.send_message_initialize()51 worker_process.send_message_gcode("G0 X3 Y2 Z1")52 time.sleep(0.1)53 self.assertEqual(54 received_move_coordinates.value,55 b'x=3.000000 y=2.000000 z=1.000000 feed_rate=1000.000000')56 logs = worker_process.get_logs()57 self.assertEqual(len(logs), 
2)58 self.assertEqual(logs[0]['level'], 'INFO')59 self.assertEqual(logs[0]['message'], 'Machine initialized successfully')60 self.assertEqual(logs[1]['level'], 'INFO')61 self.assertTrue(logs[1]['message'].startswith("gcode interpreted successfully, took"))62 finally:63 worker_process.kill()64 def test_failed_gcode_sending(self):65 try:66 with mock.patch('config.MACHINE'):67 worker_process = WorkerProcess.create_and_start()68 worker_process.send_message_initialize()69 worker_process.send_message_gcode("Invalid")70 time.sleep(0.1)71 self.assertEqual(72 worker_process.get_logs(),73 [74 {'level': 'INFO', 'message': 'Machine initialized successfully'},75 {'level': 'ERROR', 'message': "word 'I' value invalid"},76 ]77 )78 finally:...
Check out the latest blogs from LambdaTest on this topic:
API (Application Programming Interface) is a set of definitions and protocols for building and integrating applications. It’s occasionally referred to as a contract between an information provider and an information user establishing the content required from the consumer and the content needed by the producer.
There is just one area where each member of the software testing community has a distinct point of view! Metrics! This contentious issue sparks intense disputes, and most conversations finish with no definitive conclusion. It covers a wide range of topics: How can testing efforts be measured? What is the most effective technique to assess effectiveness? Which of the many components should be quantified? How can we measure the quality of our testing performance, among other things?
When I started writing tests with Cypress, I was always going to use the user interface to interact and change the application’s state when running tests.
Many theoretical descriptions explain the role of the Scrum Master as a vital member of the Scrum team. However, these descriptions do not provide an honest answer to the fundamental question: “What are the day-to-day activities of a Scrum Master?”
Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!