Best Python code snippet using avocado_python
test_status_repo.py
Source:test_status_repo.py
...4 def setUp(self):5 self.status_repo = repo.StatusRepo()6 def test_process_raw_message_invalid(self):7 with self.assertRaises(utils.StatusMsgInvalidJSONError):8 self.status_repo.process_raw_message('+-+-InvalidJSON-AFAICT-+-+')9 def test_process_raw_message_no_id(self):10 msg = ('{"status": "finished", "time": 1597774000.5140226, '11 '"returncode": 0}')12 with self.assertRaises(repo.StatusMsgMissingDataError):13 self.status_repo.process_raw_message(msg)14 def test_set_task_data(self):15 self.status_repo._set_task_data({"id": "1-foo", "status": "started"})16 self.assertEqual(self.status_repo._all_data["1-foo"],17 [{"status": "started"}])18 def test_handle_task_started(self):19 msg = {"id": "1-foo", "status": "started", "output_dir": "/fake/path"}20 self.status_repo._handle_task_started(msg)21 self.assertEqual(self.status_repo.get_task_data("1-foo"),22 [{"status": "started", "output_dir": "/fake/path"}])23 def test_handle_task_started_no_output_dir(self):24 msg = {"id": "1-foo", "status": "started"}25 with self.assertRaises(repo.StatusMsgMissingDataError):26 self.status_repo._handle_task_started(msg)27 def test_handle_task_finished_no_result(self):28 msg = {"id": "1-foo", "status": "finished"}29 self.status_repo._handle_task_finished(msg)30 self.assertEqual(self.status_repo.get_task_data("1-foo"),31 [{"status": "finished"}])32 self.assertEqual(self.status_repo._by_result.get(None), ["1-foo"])33 def test_handle_task_finished_result(self):34 msg = {"id": "1-foo", "status": "finished", "result": "pass"}35 self.status_repo._handle_task_finished(msg)36 self.assertEqual(self.status_repo.get_task_data("1-foo"),37 [{"status": "finished", "result": "pass"}])38 self.assertEqual(self.status_repo._by_result.get("pass"), ["1-foo"])39 def test_process_message_running(self):40 msg = {"id": "1-foo", "status": "running"}41 self.status_repo.process_message(msg)42 self.assertEqual(self.status_repo.get_task_data("1-foo"),43 [{"status": "running"}])44 def test_process_raw_message_task_started(self):45 msg = '{"id": "1-foo", "status": "started", "output_dir": "/fake/path"}'46 self.status_repo.process_raw_message(msg)47 self.assertEqual(self.status_repo.get_task_data("1-foo"),48 [{"status": "started", "output_dir": "/fake/path"}])49 def test_process_raw_message_task_running(self):50 msg = '{"id": "1-foo", "status": "running"}'51 self.status_repo.process_raw_message(msg)52 self.assertEqual(self.status_repo.get_task_data("1-foo"),53 [{"status": "running"}])54 def test_process_messages_running(self):55 msg = {"id": "1-foo", "status": "running", "time": 1597894378.6080744}56 self.status_repo.process_message(msg)57 msg = {"id": "1-foo", "status": "running", "time": 1597894378.6103745}58 self.status_repo.process_message(msg)59 self.assertEqual(self.status_repo.get_latest_task_data("1-foo"),60 {"status": "running", "time": 1597894378.6103745})61 def test_task_status_time(self):62 msg = {"id": "1-foo", "status": "running", "time": 1597894378.0000002}63 self.status_repo.process_message(msg)64 self.assertEqual(self.status_repo._status["1-foo"],65 ("running", 1597894378.0000002))...
websockets.py
Source:websockets.py
...16 # process message normally17 # print("message type: {}".format(msg['e']))18 pprint(msg)19 # do something20def process_raw_message(msg):21 pprint(f"A: {(msg['a'][0])}")22 pprint(f"B: {(msg['b'][0])}")23 # pprint(msg)24# trade data socket25# conn_key_trade = bm.start_trade_socket(SYMBOL, process_m_message)26# depth data socket27# conn_key_depth_partial = bm.start_depth_socket(SYMBOL, process_raw_message, depth=BinanceSocketManager.WEBSOCKET_DEPTH_5)28# conn_key_depth_diff = bm.start_depth_socket(SYMBOL, process_raw_message)29# ì²´ì socket (for í물, ë§ì§)30# conn_key_spot_trade = bm.start_user_socket(process_message)31# conn_key_fut_cross_trade = bm.start_margin_socket(process_message)32# conn_key_fut_iso_trade = bm.start_isolated_margin_socket(SYMBOL, process_message)33# ì²´ê²° socket34# aggtrade : "Get compressed, aggregate trades. Trades that fill at the time, from the same order, with the same price will have the quantity aggregated."...
receiver.py
Source:receiver.py
1import asyncio2import logging3from ..utils import _msg_backend_module, spec_import4logger = logging.getLogger(__name__)5def generic_receiver(opts):6 loop = asyncio.get_event_loop()7 receiver = _msg_backend_module(opts).Receiver()8 receiver.connect(opts['RECEIVE_SOCKET'])9 for processor_spec in opts['--processor']:10 handler_class = spec_import(processor_spec)11 try:12 handler = handler_class()13 process_message = getattr(handler, 'process_message', None)14 process_raw_message = getattr(handler, 'process_raw_message', None)15 assert process_message or process_raw_message16 assert not process_message or callable(process_message)17 assert not process_raw_message or callable(process_raw_message)18 except (TypeError, AssertionError):19 logger.error(f"Error with processor '{processor_spec}'")20 logger.error(21 "Processors must have one or both of 'process_message' and 'process_raw_message' methods"22 )23 return24 if process_message:25 receiver.add_listener(process_message)26 if process_raw_message:27 receiver.add_listener(process_raw_message)...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!