Best Python code snippet using molotov_python
protocols_test.py
Source:protocols_test.py
...17 request_packet_mock.is_wrq = lambda: True18 request_packet_mock.is_rrq = lambda: False19 klass = self.protocol.select_protocol(request_packet_mock)20 self.assertTrue(klass == WRQProtocol)21 def test_datagram_received(self):22 data = b'\x00\x01TEST\x00binary\x00'23 mock_loop = MagicMock()24 mock_loop.create_datagram_endpoint.return_value = ('endpoint',)25 mock_loop.create_task.return_value = True26 proto = TFTPServerProtocol('127.0.0.1', mock_loop, {})27 proto.datagram_received(data, ('127.0.0.1', 0,))28 self.assertTrue(mock_loop.create_datagram_endpoint.called)29 mock_loop.create_task.assert_called_with(('endpoint',))30class TestWRQProtocol(t.TestCase):31 @classmethod32 def setUpClass(cls):33 cls.packet_factory = TFTPPacketFactory()34 def setUp(self):35 self.addr = ('127.0.0.1', 9999,)36 self.wrq = WRQ + b'\x00filename1\x00octet\x00'37 self.proto = WRQProtocol(self.wrq, MagicMock, self.addr, {})38 self.proto.set_proto_attributes()39 self.proto.h_timeout = MagicMock()40 self.proto.file_handler = MagicMock()41 self.proto.counter = 1042 self.proto.transport = MagicMock()43 def test_reply_after_write(self):44 data = DAT + b'\x00\x0BAAAA'45 self.proto.datagram_received(data, self.addr)46 self.assertEqual(self.proto.counter, 11)47 self.proto.transport.sendto.assert_called_with(ACK + b'\x00\x0b',48 self.addr)49 def test_close_transport_after_file_finished(self):50 data = DAT + b'\x00\x0BAAAA'51 self.proto.datagram_received(data, self.addr)52 self.assertTrue(self.proto.transport.close.called)53 def test_correct_packet_received_and_saved(self):54 data = DAT + b'\x00\x0BAAAA'55 self.proto.datagram_received(data, self.addr)56 self.proto.file_handler.write_chunk.assert_called_with(b'AAAA')57 def test_bad_tid(self):58 data = DAT + b'\x00\x0BAAAA'59 addr = ('127.0.0.1', 8888,)60 self.proto.datagram_received(data, addr)61 err_tid = self.packet_factory.err_unknown_tid()62 self.proto.transport.sendto.assert_called_with(err_tid.to_bytes(),63 addr)64 def test_bad_packet(self):65 data = ACK + b'\x00\x0C'66 self.proto.datagram_received(data, self.addr)67 self.assertFalse(self.proto.transport.sendto.called)68 def test_bad_packet_sequence_is_ignored(self):69 data = DAT + b'\x00\x0CAAAA'70 self.proto.datagram_received(data, self.addr)71 self.assertFalse(self.proto.transport.sendto.called)72 def test_roll_over(self):73 self.proto.counter = 6553574 dat1 = DAT + b'\xff\xffAAAA'75 dat2 = DAT + b'\x00\x00AAAA'76 self.proto.datagram_received(dat1, self.addr)77 self.assertEqual(self.proto.counter, 65535)78 self.proto.datagram_received(dat2, self.addr)79 self.assertEqual(self.proto.counter, 0)80 self.proto.transport.sendto.assert_called_with(ACK + b'\x00\x00',81 self.addr)82class TestRRQProtocol(t.TestCase):83 @classmethod84 def setUpClass(cls):85 cls.packet_factory = TFTPPacketFactory()86 def setUp(self):87 self.addr = ('127.0.0.1', 9999,)88 self.rrq = RRQ + b'\x00filename1\x00octet\x00'89 self.proto = RRQProtocol(self.rrq, MagicMock, self.addr, {})90 self.proto.set_proto_attributes()91 self.proto.h_timeout = MagicMock()92 self.proto.file_handler = MagicMock()93 self.proto.file_handler.read_chunk = MagicMock(return_value=b'AAAA')94 self.proto.handle_err_pkt = MagicMock()95 self.proto.counter = 1096 self.proto.transport = MagicMock()97 def test_get_next_chunk_of_data(self):98 rsp = self.proto.next_datagram()99 self.assertEqual(rsp.to_bytes(), DAT + b'\x00\x0aAAAA')100 def test_get_sequence_of_chunks(self):101 self.proto.file_handler.finished = False102 ack1 = ACK + b'\x00\x0a'103 ack2 = ACK + b'\x00\x0b'104 dat1 = DAT + b'\x00\x0bAAAA'105 dat2 = DAT + b'\x00\x0cAAAA'106 self.proto.datagram_received(ack1, self.addr)107 self.proto.datagram_received(ack2, self.addr)108 calls = [call(dat1, self.addr), call(dat2, self.addr)]109 self.proto.transport.sendto.assert_has_calls(calls)110 def test_get_next_window_of_data(self):111 self.proto.file_handler.finished = False112 self.proto.opts[b'windowsize'] = 2113 self.proto.packets = [None] * 2114 ack1 = ACK + b'\x00\x0a'115 dat1 = DAT + b'\x00\x0bAAAA'116 dat2 = DAT + b'\x00\x0cAAAA'117 self.proto.datagram_received(ack1, self.addr)118 calls = [call(dat1, self.addr), call(dat2, self.addr)]119 self.proto.transport.sendto.assert_has_calls(calls)120 def test_get_sequence_of_windows(self):121 self.proto.file_handler.finished = False122 self.proto.opts[b'windowsize'] = 2123 self.proto.packets = [None] * 2124 ack1 = ACK + b'\x00\x0a'125 ack2 = ACK + b'\x00\x0c'126 dat1 = DAT + b'\x00\x0bAAAA'127 dat2 = DAT + b'\x00\x0cAAAA'128 dat3 = DAT + b'\x00\x0dAAAA'129 dat4 = DAT + b'\x00\x0eAAAA'130 self.proto.datagram_received(ack1, self.addr)131 self.proto.datagram_received(ack2, self.addr)132 calls = [call(dat1, self.addr), call(dat2, self.addr),133 call(dat3, self.addr), call(dat4, self.addr)]134 self.proto.transport.sendto.assert_has_calls(calls)135 def test_send_last_packet(self):136 self.proto.file_handler.read_chunk = MagicMock(return_value=b'AA')137 self.proto.file_handler.finished = False138 ack1 = ACK + b'\x00\x0a'139 ack2 = ACK + b'\x00\x0b'140 self.proto.datagram_received(ack1, self.addr)141 self.proto.transport.sendto.assert_called_with(DAT + b'\x00\x0bAA',142 self.addr)143 self.proto.file_handler.finished = True144 self.proto.datagram_received(ack2, self.addr)145 self.assertTrue(self.proto.transport.close.called)146 def test_bad_packet(self):147 bad_msg = DAT + b'\x00\x0aAAAA'148 self.proto.datagram_received(bad_msg, self.addr)149 self.assertFalse(self.proto.transport.sendto.called)150 def test_bad_tid(self):151 # this should get moved to base tests, same for wrq152 addr = ('127.0.0.1', 8888,)153 ack1 = ACK + b'\x00\x0a'154 self.proto.datagram_received(ack1, addr)155 err_tid = self.packet_factory.err_unknown_tid()156 self.proto.transport.sendto.assert_called_with(err_tid.to_bytes(),157 addr)158 def test_bad_packet_sequence_is_ignored(self):159 ack1 = ACK + b'\x00\x0b'160 self.proto.datagram_received(ack1, self.addr)161 self.assertFalse(self.proto.transport.sendto.called)162 self.assertFalse(self.proto.file_handler.send.called)163 def test_bad_packet_sequence_starts_new_window(self):164 self.proto.file_handler.finished = False165 self.proto.opts[b'windowsize'] = 2166 self.proto.packets = [None] * 2167 ack1 = ACK + b'\x00\x0a'168 ack2 = ACK + b'\x00\x0b' # ACK of block_no within window169 dat1 = DAT + b'\x00\x0bAAAA'170 dat2 = DAT + b'\x00\x0cAAAA'171 dat3 = DAT + b'\x00\x0dAAAA'172 # After ACK of block_no \x0a, block_no \x0b and \x0c are sent173 self.proto.datagram_received(ack1, self.addr)174 # After ACK of block_no \x0b, block_no \x0c and \x0d are sent175 self.proto.datagram_received(ack2, self.addr)176 calls = [call(dat1, self.addr), call(dat2, self.addr),177 call(dat2, self.addr), call(dat3, self.addr)]178 self.proto.transport.sendto.assert_has_calls(calls)179 def test_roll_over(self):180 self.proto.file_handler.finished = False181 self.proto.counter = (2 ** 16) - 1182 ack1 = ACK + b'\xff\xff'183 ack2 = ACK + b'\x00\x00'184 dat1 = DAT + b'\x00\x00AAAA'185 dat2 = DAT + b'\x00\x01AAAA'186 self.proto.datagram_received(ack1, self.addr)187 self.proto.datagram_received(ack2, self.addr)188 calls = [call(dat1, self.addr), call(dat2, self.addr)]189 self.proto.transport.sendto.assert_has_calls(calls)190 def test_err_received(self):191 err = ERR + b'\x00TFTP Aborted.\x00'192 self.proto.datagram_received(err, self.addr)193 self.assertTrue(self.proto.handle_err_pkt.called)194 self.assertFalse(self.proto.transport.sendto.called)195 def test_err_received_windowsize(self):196 err = ERR + b'\x00TFTP Aborted.\x00'197 self.proto.opts[b'windowsize'] = 2198 self.proto.datagram_received(err, self.addr)199 self.assertTrue(self.proto.handle_err_pkt.called)...
test_bpup.py
Source:test_bpup.py
...46 bpup_protocol.connection_made(transport)47 assert transport.sendto.mock_calls == [call(b"\n")]48 transport.sendto.reset_mock()49 assert bpup_subscriptions.alive is False50 bpup_protocol.datagram_received(51 b'{"B":"KNKSADE42149","d":0,"v":"v2.29.2-beta"}\n', MOCK_ADDR52 )53 assert bpup_subscriptions.alive is True54 mock_time_changed(loop, dt.datetime.now(dt.timezone.utc) + dt.timedelta(seconds=60))55 assert transport.sendto.mock_calls == [call(b"\n")]56 transport.sendto.reset_mock()57 mock_protocol_connection_lost(bpup_protocol, None)58 assert "BPUP connection lost" not in caplog.text59 assert bpup_subscriptions.alive is False60 mock_time_changed(61 loop, dt.datetime.now(dt.timezone.utc) + dt.timedelta(seconds=120)62 )63 assert transport.sendto.mock_calls == []64@pytest.mark.asyncio65async def test_protocol_keep_connection_lost_with_error(transport, caplog):66 bpup_subscriptions = BPUPSubscriptions()67 loop = asyncio.get_event_loop()68 bpup_protocol = BPUProtocol(bpup_subscriptions)69 bpup_protocol.connection_made(transport)70 assert transport.sendto.mock_calls == [call(b"\n")]71 transport.sendto.reset_mock()72 assert bpup_subscriptions.alive is False73 bpup_protocol.datagram_received(74 b'{"B":"KNKSADE42149","d":0,"v":"v2.29.2-beta"}\n', MOCK_ADDR75 )76 mock_time_changed(loop, dt.datetime.now(dt.timezone.utc) + dt.timedelta(seconds=60))77 assert transport.sendto.mock_calls == [call(b"\n")]78 transport.sendto.reset_mock()79 assert bpup_subscriptions.alive is True80 mock_protocol_connection_lost(bpup_protocol, OSError())81 assert "BPUP connection lost" in caplog.text82 assert bpup_subscriptions.alive is False83 assert transport.sendto.mock_calls == []84 mock_time_changed(85 loop, dt.datetime.now(dt.timezone.utc) + dt.timedelta(seconds=120)86 )87 assert transport.sendto.mock_calls == []88@pytest.mark.asyncio89async def test_protocol_subscriptions(transport, caplog):90 bpup_subscriptions = BPUPSubscriptions()91 bpup_protocol = BPUProtocol(bpup_subscriptions)92 last_msg = None93 def _on_new_message(msg):94 nonlocal last_msg95 last_msg = msg96 bpup_subscriptions.subscribe("1", _on_new_message)97 bpup_protocol.connection_made(transport)98 # Make sure we can do it again99 bpup_protocol.connection_made(transport)100 bpup_protocol.datagram_received(101 b'{"B":"KNKSADE42149","d":0,"v":"v2.29.2-beta"}\n', MOCK_ADDR102 )103 bpup_protocol.datagram_received(104 b'{"t":"devices/1/state","s":200,"b":{"power":1,"speed":1,"timer":0,"breeze":[0,50,50],"_":"690b6aff"}}\n',105 MOCK_ADDR,106 )107 assert last_msg == {108 "_": "690b6aff",109 "breeze": [0, 50, 50],110 "power": 1,111 "speed": 1,112 "timer": 0,113 }114 bpup_protocol.datagram_received(115 b'{"t":"devices/1/state","s":200,"b":{"power":1,"speed":1,"timer":0,"breeze":[0,50,50],"_":"690b6aff"}}\n',116 MOCK_ADDR,117 )118 # 500 error should not trigger a new message119 assert last_msg == {120 "_": "690b6aff",121 "breeze": [0, 50, 50],122 "power": 1,123 "speed": 1,124 "timer": 0,125 }126 last_msg = {}127 bpup_subscriptions.unsubscribe("1", _on_new_message)128 bpup_protocol.datagram_received(129 b'{"t":"devices/1/state","s":200,"b":{"power":1,"speed":1,"timer":0,"breeze":[0,50,50],"_":"690b6aff"}}\n',130 MOCK_ADDR,131 )132 assert last_msg == {}133 bpup_protocol.datagram_received(134 b'{"B":"KVPRBDGXXXXX","_error_id":633,"_error_msg":"BPUP client timeout"}',135 MOCK_ADDR,136 )137 assert last_msg == {}138 bpup_protocol.datagram_received(139 b"GIGO",140 MOCK_ADDR,141 )142 assert "Failed to process BPUP message" in caplog.text143 assert "GIGO" in caplog.text144@pytest.mark.asyncio145async def test_protocol_errors(transport, caplog):146 bpup_subscriptions = BPUPSubscriptions()147 bpup_protocol = BPUProtocol(bpup_subscriptions)148 bpup_protocol.connection_made(transport)149 bpup_protocol.error_received(OSError())150 assert "BPUP error" in caplog.text151@pytest.mark.asyncio152async def test_start_bpup(transport):...
test_statsd.py
Source:test_statsd.py
...46 def _test_gauge_or_ms(self, metric_type, utcnow):47 metric_name = "test_gauge_or_ms"48 metric_key = metric_name + "|" + metric_type49 utcnow.return_value = datetime.datetime(2015, 1, 7, 13, 58, 36)50 self.server.datagram_received(51 ("%s:1|%s" % (metric_name, metric_type)).encode('ascii'),52 ("127.0.0.1", 12345))53 self.stats.flush()54 r = self.stats.indexer.get_resource('generic',55 self.conf.statsd.resource_id,56 with_metrics=True)57 metric = r.get_metric(metric_key)58 self.stats.storage.process_background_tasks(59 self.stats.indexer, sync=True)60 measures = self.stats.storage.get_measures(metric)61 self.assertEqual([62 (utils.datetime_utc(2015, 1, 7), 86400.0, 1.0),63 (utils.datetime_utc(2015, 1, 7, 13), 3600.0, 1.0),64 (utils.datetime_utc(2015, 1, 7, 13, 58), 60.0, 1.0)65 ], measures)66 utcnow.return_value = datetime.datetime(2015, 1, 7, 13, 59, 37)67 # This one is going to be ignored68 self.server.datagram_received(69 ("%s:45|%s" % (metric_name, metric_type)).encode('ascii'),70 ("127.0.0.1", 12345))71 self.server.datagram_received(72 ("%s:2|%s" % (metric_name, metric_type)).encode('ascii'),73 ("127.0.0.1", 12345))74 self.stats.flush()75 self.stats.storage.process_background_tasks(76 self.stats.indexer, sync=True)77 measures = self.stats.storage.get_measures(metric)78 self.assertEqual([79 (utils.datetime_utc(2015, 1, 7), 86400.0, 1.5),80 (utils.datetime_utc(2015, 1, 7, 13), 3600.0, 1.5),81 (utils.datetime_utc(2015, 1, 7, 13, 58), 60.0, 1.0),82 (utils.datetime_utc(2015, 1, 7, 13, 59), 60.0, 2.0)83 ], measures)84 def test_gauge(self):85 self._test_gauge_or_ms("g")86 def test_ms(self):87 self._test_gauge_or_ms("ms")88 @mock.patch.object(timeutils, 'utcnow')89 def test_counter(self, utcnow):90 metric_name = "test_counter"91 metric_key = metric_name + "|c"92 utcnow.return_value = datetime.datetime(2015, 1, 7, 13, 58, 36)93 self.server.datagram_received(94 ("%s:1|c" % metric_name).encode('ascii'),95 ("127.0.0.1", 12345))96 self.stats.flush()97 r = self.stats.indexer.get_resource('generic',98 self.conf.statsd.resource_id,99 with_metrics=True)100 metric = r.get_metric(metric_key)101 self.assertIsNotNone(metric)102 self.stats.storage.process_background_tasks(103 self.stats.indexer, sync=True)104 measures = self.stats.storage.get_measures(metric)105 self.assertEqual([106 (utils.datetime_utc(2015, 1, 7), 86400.0, 1.0),107 (utils.datetime_utc(2015, 1, 7, 13), 3600.0, 1.0),108 (utils.datetime_utc(2015, 1, 7, 13, 58), 60.0, 1.0)], measures)109 utcnow.return_value = datetime.datetime(2015, 1, 7, 13, 59, 37)110 self.server.datagram_received(111 ("%s:45|c" % metric_name).encode('ascii'),112 ("127.0.0.1", 12345))113 self.server.datagram_received(114 ("%s:2|c|@0.2" % metric_name).encode('ascii'),115 ("127.0.0.1", 12345))116 self.stats.flush()117 self.stats.storage.process_background_tasks(118 self.stats.indexer, sync=True)119 measures = self.stats.storage.get_measures(metric)120 self.assertEqual([121 (utils.datetime_utc(2015, 1, 7), 86400.0, 28),122 (utils.datetime_utc(2015, 1, 7, 13), 3600.0, 28),123 (utils.datetime_utc(2015, 1, 7, 13, 58), 60.0, 1.0),124 (utils.datetime_utc(2015, 1, 7, 13, 59), 60.0, 55.0)], measures)125class TestStatsdArchivePolicyRule(TestStatsd):126 STATSD_ARCHIVE_POLICY_NAME = ""127 def setUp(self):...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!