Best Python code snippet using molotov_python
test_streams.py
Source:test_streams.py
...100 loop=self.loop)101 self._basetest_open_connection_error(conn_fut)102 def test_feed_empty_data(self):103 stream = asyncio.StreamReader(loop=self.loop)104 stream.feed_data(b'')105 self.assertEqual(b'', stream._buffer)106 def test_feed_nonempty_data(self):107 stream = asyncio.StreamReader(loop=self.loop)108 stream.feed_data(self.DATA)109 self.assertEqual(self.DATA, stream._buffer)110 def test_read_zero(self):111 # Read zero bytes.112 stream = asyncio.StreamReader(loop=self.loop)113 stream.feed_data(self.DATA)114 data = self.loop.run_until_complete(stream.read(0))115 self.assertEqual(b'', data)116 self.assertEqual(self.DATA, stream._buffer)117 def test_read(self):118 # Read bytes.119 stream = asyncio.StreamReader(loop=self.loop)120 read_task = asyncio.Task(stream.read(30), loop=self.loop)121 def cb():122 stream.feed_data(self.DATA)123 self.loop.call_soon(cb)124 data = self.loop.run_until_complete(read_task)125 self.assertEqual(self.DATA, data)126 self.assertEqual(b'', stream._buffer)127 def test_read_line_breaks(self):128 # Read bytes without line breaks.129 stream = asyncio.StreamReader(loop=self.loop)130 stream.feed_data(b'line1')131 stream.feed_data(b'line2')132 data = self.loop.run_until_complete(stream.read(5))133 self.assertEqual(b'line1', data)134 self.assertEqual(b'line2', stream._buffer)135 def test_read_eof(self):136 # Read bytes, stop at eof.137 stream = asyncio.StreamReader(loop=self.loop)138 read_task = asyncio.Task(stream.read(1024), loop=self.loop)139 def cb():140 stream.feed_eof()141 self.loop.call_soon(cb)142 data = self.loop.run_until_complete(read_task)143 self.assertEqual(b'', data)144 self.assertEqual(b'', stream._buffer)145 def test_read_until_eof(self):146 # Read all bytes until eof.147 stream = asyncio.StreamReader(loop=self.loop)148 read_task = asyncio.Task(stream.read(-1), loop=self.loop)149 def cb():150 stream.feed_data(b'chunk1\n')151 stream.feed_data(b'chunk2')152 stream.feed_eof()153 self.loop.call_soon(cb)154 data = 
self.loop.run_until_complete(read_task)155 self.assertEqual(b'chunk1\nchunk2', data)156 self.assertEqual(b'', stream._buffer)157 def test_read_exception(self):158 stream = asyncio.StreamReader(loop=self.loop)159 stream.feed_data(b'line\n')160 data = self.loop.run_until_complete(stream.read(2))161 self.assertEqual(b'li', data)162 stream.set_exception(ValueError())163 self.assertRaises(164 ValueError, self.loop.run_until_complete, stream.read(2))165 def test_invalid_limit(self):166 with self.assertRaisesRegex(ValueError, 'imit'):167 asyncio.StreamReader(limit=0, loop=self.loop)168 with self.assertRaisesRegex(ValueError, 'imit'):169 asyncio.StreamReader(limit=-1, loop=self.loop)170 def test_read_limit(self):171 stream = asyncio.StreamReader(limit=3, loop=self.loop)172 stream.feed_data(b'chunk')173 data = self.loop.run_until_complete(stream.read(5))174 self.assertEqual(b'chunk', data)175 self.assertEqual(b'', stream._buffer)176 def test_readline(self):177 # Read one line. 'readline' will need to wait for the data178 # to come from 'cb'179 stream = asyncio.StreamReader(loop=self.loop)180 stream.feed_data(b'chunk1 ')181 read_task = asyncio.Task(stream.readline(), loop=self.loop)182 def cb():183 stream.feed_data(b'chunk2 ')184 stream.feed_data(b'chunk3 ')185 stream.feed_data(b'\n chunk4')186 self.loop.call_soon(cb)187 line = self.loop.run_until_complete(read_task)188 self.assertEqual(b'chunk1 chunk2 chunk3 \n', line)189 self.assertEqual(b' chunk4', stream._buffer)190 def test_readline_limit_with_existing_data(self):191 # Read one line. 
The data is in StreamReader's buffer192 # before the event loop is run.193 stream = asyncio.StreamReader(limit=3, loop=self.loop)194 stream.feed_data(b'li')195 stream.feed_data(b'ne1\nline2\n')196 self.assertRaises(197 ValueError, self.loop.run_until_complete, stream.readline())198 # The buffer should contain the remaining data after exception199 self.assertEqual(b'line2\n', stream._buffer)200 stream = asyncio.StreamReader(limit=3, loop=self.loop)201 stream.feed_data(b'li')202 stream.feed_data(b'ne1')203 stream.feed_data(b'li')204 self.assertRaises(205 ValueError, self.loop.run_until_complete, stream.readline())206 # No b'\n' at the end. The 'limit' is set to 3. So before207 # waiting for the new data in buffer, 'readline' will consume208 # the entire buffer, and since the length of the consumed data209 # is more than 3, it will raise a ValueError. The buffer is210 # expected to be empty now.211 self.assertEqual(b'', stream._buffer)212 def test_at_eof(self):213 stream = asyncio.StreamReader(loop=self.loop)214 self.assertFalse(stream.at_eof())215 stream.feed_data(b'some data\n')216 self.assertFalse(stream.at_eof())217 self.loop.run_until_complete(stream.readline())218 self.assertFalse(stream.at_eof())219 stream.feed_data(b'some data\n')220 stream.feed_eof()221 self.loop.run_until_complete(stream.readline())222 self.assertTrue(stream.at_eof())223 def test_readline_limit(self):224 # Read one line. 
StreamReaders are fed with data after225 # their 'readline' methods are called.226 stream = asyncio.StreamReader(limit=7, loop=self.loop)227 def cb():228 stream.feed_data(b'chunk1')229 stream.feed_data(b'chunk2')230 stream.feed_data(b'chunk3\n')231 stream.feed_eof()232 self.loop.call_soon(cb)233 self.assertRaises(234 ValueError, self.loop.run_until_complete, stream.readline())235 # The buffer had just one line of data, and after raising236 # a ValueError it should be empty.237 self.assertEqual(b'', stream._buffer)238 stream = asyncio.StreamReader(limit=7, loop=self.loop)239 def cb():240 stream.feed_data(b'chunk1')241 stream.feed_data(b'chunk2\n')242 stream.feed_data(b'chunk3\n')243 stream.feed_eof()244 self.loop.call_soon(cb)245 self.assertRaises(246 ValueError, self.loop.run_until_complete, stream.readline())247 self.assertEqual(b'chunk3\n', stream._buffer)248 # check strictness of the limit249 stream = asyncio.StreamReader(limit=7, loop=self.loop)250 stream.feed_data(b'1234567\n')251 line = self.loop.run_until_complete(stream.readline())252 self.assertEqual(b'1234567\n', line)253 self.assertEqual(b'', stream._buffer)254 stream.feed_data(b'12345678\n')255 with self.assertRaises(ValueError) as cm:256 self.loop.run_until_complete(stream.readline())257 self.assertEqual(b'', stream._buffer)258 stream.feed_data(b'12345678')259 with self.assertRaises(ValueError) as cm:260 self.loop.run_until_complete(stream.readline())261 self.assertEqual(b'', stream._buffer)262 def test_readline_nolimit_nowait(self):263 # All needed data for the first 'readline' call will be264 # in the buffer.265 stream = asyncio.StreamReader(loop=self.loop)266 stream.feed_data(self.DATA[:6])267 stream.feed_data(self.DATA[6:])268 line = self.loop.run_until_complete(stream.readline())269 self.assertEqual(b'line1\n', line)270 self.assertEqual(b'line2\nline3\n', stream._buffer)271 def test_readline_eof(self):272 stream = asyncio.StreamReader(loop=self.loop)273 stream.feed_data(b'some data')274 
stream.feed_eof()275 line = self.loop.run_until_complete(stream.readline())276 self.assertEqual(b'some data', line)277 def test_readline_empty_eof(self):278 stream = asyncio.StreamReader(loop=self.loop)279 stream.feed_eof()280 line = self.loop.run_until_complete(stream.readline())281 self.assertEqual(b'', line)282 def test_readline_read_byte_count(self):283 stream = asyncio.StreamReader(loop=self.loop)284 stream.feed_data(self.DATA)285 self.loop.run_until_complete(stream.readline())286 data = self.loop.run_until_complete(stream.read(7))287 self.assertEqual(b'line2\nl', data)288 self.assertEqual(b'ine3\n', stream._buffer)289 def test_readline_exception(self):290 stream = asyncio.StreamReader(loop=self.loop)291 stream.feed_data(b'line\n')292 data = self.loop.run_until_complete(stream.readline())293 self.assertEqual(b'line\n', data)294 stream.set_exception(ValueError())295 self.assertRaises(296 ValueError, self.loop.run_until_complete, stream.readline())297 self.assertEqual(b'', stream._buffer)298 def test_readuntil_separator(self):299 stream = asyncio.StreamReader(loop=self.loop)300 with self.assertRaisesRegex(ValueError, 'Separator should be'):301 self.loop.run_until_complete(stream.readuntil(separator=b''))302 def test_readuntil_multi_chunks(self):303 stream = asyncio.StreamReader(loop=self.loop)304 stream.feed_data(b'lineAAA')305 data = self.loop.run_until_complete(stream.readuntil(separator=b'AAA'))306 self.assertEqual(b'lineAAA', data)307 self.assertEqual(b'', stream._buffer)308 stream.feed_data(b'lineAAA')309 data = self.loop.run_until_complete(stream.readuntil(b'AAA'))310 self.assertEqual(b'lineAAA', data)311 self.assertEqual(b'', stream._buffer)312 stream.feed_data(b'lineAAAxxx')313 data = self.loop.run_until_complete(stream.readuntil(b'AAA'))314 self.assertEqual(b'lineAAA', data)315 self.assertEqual(b'xxx', stream._buffer)316 def test_readuntil_multi_chunks_1(self):317 stream = asyncio.StreamReader(loop=self.loop)318 stream.feed_data(b'QWEaa')319 
stream.feed_data(b'XYaa')320 stream.feed_data(b'a')321 data = self.loop.run_until_complete(stream.readuntil(b'aaa'))322 self.assertEqual(b'QWEaaXYaaa', data)323 self.assertEqual(b'', stream._buffer)324 stream.feed_data(b'QWEaa')325 stream.feed_data(b'XYa')326 stream.feed_data(b'aa')327 data = self.loop.run_until_complete(stream.readuntil(b'aaa'))328 self.assertEqual(b'QWEaaXYaaa', data)329 self.assertEqual(b'', stream._buffer)330 stream.feed_data(b'aaa')331 data = self.loop.run_until_complete(stream.readuntil(b'aaa'))332 self.assertEqual(b'aaa', data)333 self.assertEqual(b'', stream._buffer)334 stream.feed_data(b'Xaaa')335 data = self.loop.run_until_complete(stream.readuntil(b'aaa'))336 self.assertEqual(b'Xaaa', data)337 self.assertEqual(b'', stream._buffer)338 stream.feed_data(b'XXX')339 stream.feed_data(b'a')340 stream.feed_data(b'a')341 stream.feed_data(b'a')342 data = self.loop.run_until_complete(stream.readuntil(b'aaa'))343 self.assertEqual(b'XXXaaa', data)344 self.assertEqual(b'', stream._buffer)345 def test_readuntil_eof(self):346 stream = asyncio.StreamReader(loop=self.loop)347 stream.feed_data(b'some dataAA')348 stream.feed_eof()349 with self.assertRaises(asyncio.IncompleteReadError) as cm:350 self.loop.run_until_complete(stream.readuntil(b'AAA'))351 self.assertEqual(cm.exception.partial, b'some dataAA')352 self.assertIsNone(cm.exception.expected)353 self.assertEqual(b'', stream._buffer)354 def test_readuntil_limit_found_sep(self):355 stream = asyncio.StreamReader(loop=self.loop, limit=3)356 stream.feed_data(b'some dataAA')357 with self.assertRaisesRegex(asyncio.LimitOverrunError,358 'not found') as cm:359 self.loop.run_until_complete(stream.readuntil(b'AAA'))360 self.assertEqual(b'some dataAA', stream._buffer)361 stream.feed_data(b'A')362 with self.assertRaisesRegex(asyncio.LimitOverrunError,363 'is found') as cm:364 self.loop.run_until_complete(stream.readuntil(b'AAA'))365 self.assertEqual(b'some dataAAA', stream._buffer)366 def 
test_readexactly_zero_or_less(self):367 # Read exact number of bytes (zero or less).368 stream = asyncio.StreamReader(loop=self.loop)369 stream.feed_data(self.DATA)370 data = self.loop.run_until_complete(stream.readexactly(0))371 self.assertEqual(b'', data)372 self.assertEqual(self.DATA, stream._buffer)373 with self.assertRaisesRegex(ValueError, 'less than zero'):374 self.loop.run_until_complete(stream.readexactly(-1))375 self.assertEqual(self.DATA, stream._buffer)376 def test_readexactly(self):377 # Read exact number of bytes.378 stream = asyncio.StreamReader(loop=self.loop)379 n = 2 * len(self.DATA)380 read_task = asyncio.Task(stream.readexactly(n), loop=self.loop)381 def cb():382 stream.feed_data(self.DATA)383 stream.feed_data(self.DATA)384 stream.feed_data(self.DATA)385 self.loop.call_soon(cb)386 data = self.loop.run_until_complete(read_task)387 self.assertEqual(self.DATA + self.DATA, data)388 self.assertEqual(self.DATA, stream._buffer)389 def test_readexactly_limit(self):390 stream = asyncio.StreamReader(limit=3, loop=self.loop)391 stream.feed_data(b'chunk')392 data = self.loop.run_until_complete(stream.readexactly(5))393 self.assertEqual(b'chunk', data)394 self.assertEqual(b'', stream._buffer)395 def test_readexactly_eof(self):396 # Read exact number of bytes (eof).397 stream = asyncio.StreamReader(loop=self.loop)398 n = 2 * len(self.DATA)399 read_task = asyncio.Task(stream.readexactly(n), loop=self.loop)400 def cb():401 stream.feed_data(self.DATA)402 stream.feed_eof()403 self.loop.call_soon(cb)404 with self.assertRaises(asyncio.IncompleteReadError) as cm:405 self.loop.run_until_complete(read_task)406 self.assertEqual(cm.exception.partial, self.DATA)407 self.assertEqual(cm.exception.expected, n)408 self.assertEqual(str(cm.exception),409 '18 bytes read on a total of 36 expected bytes')410 self.assertEqual(b'', stream._buffer)411 def test_readexactly_exception(self):412 stream = asyncio.StreamReader(loop=self.loop)413 stream.feed_data(b'line\n')414 data = 
self.loop.run_until_complete(stream.readexactly(2))415 self.assertEqual(b'li', data)416 stream.set_exception(ValueError())417 self.assertRaises(418 ValueError, self.loop.run_until_complete, stream.readexactly(2))419 def test_exception(self):420 stream = asyncio.StreamReader(loop=self.loop)421 self.assertIsNone(stream.exception())422 exc = ValueError()423 stream.set_exception(exc)424 self.assertIs(stream.exception(), exc)425 def test_exception_waiter(self):426 stream = asyncio.StreamReader(loop=self.loop)427 @asyncio.coroutine428 def set_err():429 stream.set_exception(ValueError())430 t1 = asyncio.Task(stream.readline(), loop=self.loop)431 t2 = asyncio.Task(set_err(), loop=self.loop)432 self.loop.run_until_complete(asyncio.wait([t1, t2], loop=self.loop))433 self.assertRaises(ValueError, t1.result)434 def test_exception_cancel(self):435 stream = asyncio.StreamReader(loop=self.loop)436 t = asyncio.Task(stream.readline(), loop=self.loop)437 test_utils.run_briefly(self.loop)438 t.cancel()439 test_utils.run_briefly(self.loop)440 # The following line fails if set_exception() isn't careful.441 stream.set_exception(RuntimeError('message'))442 test_utils.run_briefly(self.loop)443 self.assertIs(stream._waiter, None)444 def test_start_server(self):445 class MyServer:446 def __init__(self, loop):447 self.server = None448 self.loop = loop449 async def handle_client(self, client_reader, client_writer):450 data = await client_reader.readline()451 client_writer.write(data)452 await client_writer.drain()453 client_writer.close()454 def start(self):455 sock = socket.socket()456 sock.bind(('127.0.0.1', 0))457 self.server = self.loop.run_until_complete(458 asyncio.start_server(self.handle_client,459 sock=sock,460 loop=self.loop))461 return sock.getsockname()462 def handle_client_callback(self, client_reader, client_writer):463 self.loop.create_task(self.handle_client(client_reader,464 client_writer))465 def start_callback(self):466 sock = socket.socket()467 sock.bind(('127.0.0.1', 
0))468 addr = sock.getsockname()469 sock.close()470 self.server = self.loop.run_until_complete(471 asyncio.start_server(self.handle_client_callback,472 host=addr[0], port=addr[1],473 loop=self.loop))474 return addr475 def stop(self):476 if self.server is not None:477 self.server.close()478 self.loop.run_until_complete(self.server.wait_closed())479 self.server = None480 async def client(addr):481 reader, writer = await asyncio.open_connection(482 *addr, loop=self.loop)483 # send a line484 writer.write(b"hello world!\n")485 # read it back486 msgback = await reader.readline()487 writer.close()488 return msgback489 # test the server variant with a coroutine as client handler490 server = MyServer(self.loop)491 addr = server.start()492 msg = self.loop.run_until_complete(asyncio.Task(client(addr),493 loop=self.loop))494 server.stop()495 self.assertEqual(msg, b"hello world!\n")496 # test the server variant with a callback as client handler497 server = MyServer(self.loop)498 addr = server.start_callback()499 msg = self.loop.run_until_complete(asyncio.Task(client(addr),500 loop=self.loop))501 server.stop()502 self.assertEqual(msg, b"hello world!\n")503 @support.skip_unless_bind_unix_socket504 def test_start_unix_server(self):505 class MyServer:506 def __init__(self, loop, path):507 self.server = None508 self.loop = loop509 self.path = path510 async def handle_client(self, client_reader, client_writer):511 data = await client_reader.readline()512 client_writer.write(data)513 await client_writer.drain()514 client_writer.close()515 def start(self):516 self.server = self.loop.run_until_complete(517 asyncio.start_unix_server(self.handle_client,518 path=self.path,519 loop=self.loop))520 def handle_client_callback(self, client_reader, client_writer):521 self.loop.create_task(self.handle_client(client_reader,522 client_writer))523 def start_callback(self):524 start = asyncio.start_unix_server(self.handle_client_callback,525 path=self.path,526 loop=self.loop)527 self.server = 
self.loop.run_until_complete(start)528 def stop(self):529 if self.server is not None:530 self.server.close()531 self.loop.run_until_complete(self.server.wait_closed())532 self.server = None533 async def client(path):534 reader, writer = await asyncio.open_unix_connection(535 path, loop=self.loop)536 # send a line537 writer.write(b"hello world!\n")538 # read it back539 msgback = await reader.readline()540 writer.close()541 return msgback542 # test the server variant with a coroutine as client handler543 with test_utils.unix_socket_path() as path:544 server = MyServer(self.loop, path)545 server.start()546 msg = self.loop.run_until_complete(asyncio.Task(client(path),547 loop=self.loop))548 server.stop()549 self.assertEqual(msg, b"hello world!\n")550 # test the server variant with a callback as client handler551 with test_utils.unix_socket_path() as path:552 server = MyServer(self.loop, path)553 server.start_callback()554 msg = self.loop.run_until_complete(asyncio.Task(client(path),555 loop=self.loop))556 server.stop()557 self.assertEqual(msg, b"hello world!\n")558 @unittest.skipIf(sys.platform == 'win32', "Don't have pipes")559 def test_read_all_from_pipe_reader(self):560 # See asyncio issue 168. This test is derived from the example561 # subprocess_attach_read_pipe.py, but we configure the562 # StreamReader's limit so that twice it is less than the size563 # of the data writter. 
Also we must explicitly attach a child564 # watcher to the event loop.565 code = """\566import os, sys567fd = int(sys.argv[1])568os.write(fd, b'data')569os.close(fd)570"""571 rfd, wfd = os.pipe()572 args = [sys.executable, '-c', code, str(wfd)]573 pipe = open(rfd, 'rb', 0)574 reader = asyncio.StreamReader(loop=self.loop, limit=1)575 protocol = asyncio.StreamReaderProtocol(reader, loop=self.loop)576 transport, _ = self.loop.run_until_complete(577 self.loop.connect_read_pipe(lambda: protocol, pipe))578 watcher = asyncio.SafeChildWatcher()579 watcher.attach_loop(self.loop)580 try:581 asyncio.set_child_watcher(watcher)582 create = asyncio.create_subprocess_exec(*args,583 pass_fds={wfd},584 loop=self.loop)585 proc = self.loop.run_until_complete(create)586 self.loop.run_until_complete(proc.wait())587 finally:588 asyncio.set_child_watcher(None)589 os.close(wfd)590 data = self.loop.run_until_complete(reader.read(-1))591 self.assertEqual(data, b'data')592 def test_streamreader_constructor(self):593 self.addCleanup(asyncio.set_event_loop, None)594 asyncio.set_event_loop(self.loop)595 # asyncio issue #184: Ensure that StreamReaderProtocol constructor596 # retrieves the current loop if the loop parameter is not set597 reader = asyncio.StreamReader()598 self.assertIs(reader._loop, self.loop)599 def test_streamreaderprotocol_constructor(self):600 self.addCleanup(asyncio.set_event_loop, None)601 asyncio.set_event_loop(self.loop)602 # asyncio issue #184: Ensure that StreamReaderProtocol constructor603 # retrieves the current loop if the loop parameter is not set604 reader = mock.Mock()605 protocol = asyncio.StreamReaderProtocol(reader)606 self.assertIs(protocol._loop, self.loop)607 def test_drain_raises(self):608 # See http://bugs.python.org/issue25441609 # This test should not use asyncio for the mock server; the610 # whole point of the test is to test for a bug in drain()611 # where it never gives up the event loop but the socket is612 # closed on the server side.613 q = 
queue.Queue()614 def server():615 # Runs in a separate thread.616 sock = socket.socket()617 with sock:618 sock.bind(('localhost', 0))619 sock.listen(1)620 addr = sock.getsockname()621 q.put(addr)622 clt, _ = sock.accept()623 clt.close()624 async def client(host, port):625 reader, writer = await asyncio.open_connection(626 host, port, loop=self.loop)627 while True:628 writer.write(b"foo\n")629 await writer.drain()630 # Start the server thread and wait for it to be listening.631 thread = threading.Thread(target=server)632 thread.setDaemon(True)633 thread.start()634 addr = q.get()635 # Should not be stuck in an infinite loop.636 with self.assertRaises((ConnectionResetError, BrokenPipeError)):637 self.loop.run_until_complete(client(*addr))638 # Clean up the thread. (Only on success; on failure, it may639 # be stuck in accept().)640 thread.join()641 def test___repr__(self):642 stream = asyncio.StreamReader(loop=self.loop)643 self.assertEqual("<StreamReader>", repr(stream))644 def test___repr__nondefault_limit(self):645 stream = asyncio.StreamReader(loop=self.loop, limit=123)646 self.assertEqual("<StreamReader limit=123>", repr(stream))647 def test___repr__eof(self):648 stream = asyncio.StreamReader(loop=self.loop)649 stream.feed_eof()650 self.assertEqual("<StreamReader eof>", repr(stream))651 def test___repr__data(self):652 stream = asyncio.StreamReader(loop=self.loop)653 stream.feed_data(b'data')654 self.assertEqual("<StreamReader 4 bytes>", repr(stream))655 def test___repr__exception(self):656 stream = asyncio.StreamReader(loop=self.loop)657 exc = RuntimeError()658 stream.set_exception(exc)659 self.assertEqual("<StreamReader exception=RuntimeError()>",660 repr(stream))661 def test___repr__waiter(self):662 stream = asyncio.StreamReader(loop=self.loop)663 stream._waiter = asyncio.Future(loop=self.loop)664 self.assertRegex(665 repr(stream),666 r"<StreamReader waiter=<Future pending[\S ]*>>")667 stream._waiter.set_result(None)...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!