Best Python code snippet using playwright-python
test_conn.py
Source:test_conn.py
1"""Tests for TCP connection handling, including proper and timely close."""2from __future__ import absolute_import, division, print_function3__metaclass__ = type4import errno5import socket6import time7import logging8import traceback as traceback_9from collections import namedtuple10from six.moves import range, http_client, urllib11import six12import pytest13from jaraco.text import trim, unwrap14from cheroot.test import helper, webtest15from cheroot._compat import IS_CI, IS_MACOS, IS_PYPY, IS_WINDOWS16import cheroot.server17IS_SLOW_ENV = IS_MACOS or IS_WINDOWS18timeout = 119pov = 'pPeErRsSiIsStTeEnNcCeE oOfF vViIsSiIoOnN'20class Controller(helper.Controller):21 """Controller for serving WSGI apps."""22 def hello(req, resp):23 """Render Hello world."""24 return 'Hello, world!'25 def pov(req, resp):26 """Render ``pov`` value."""27 return pov28 def stream(req, resp):29 """Render streaming response."""30 if 'set_cl' in req.environ['QUERY_STRING']:31 resp.headers['Content-Length'] = str(10)32 def content():33 for x in range(10):34 yield str(x)35 return content()36 def upload(req, resp):37 """Process file upload and render thank."""38 if not req.environ['REQUEST_METHOD'] == 'POST':39 raise AssertionError(40 "'POST' != request.method %r" %41 req.environ['REQUEST_METHOD'],42 )43 return "thanks for '%s'" % req.environ['wsgi.input'].read()44 def custom_204(req, resp):45 """Render response with status 204."""46 resp.status = '204'47 return 'Code = 204'48 def custom_304(req, resp):49 """Render response with status 304."""50 resp.status = '304'51 return 'Code = 304'52 def err_before_read(req, resp):53 """Render response with status 500."""54 resp.status = '500 Internal Server Error'55 return 'ok'56 def one_megabyte_of_a(req, resp):57 """Render 1MB response."""58 return ['a' * 1024] * 102459 def wrong_cl_buffered(req, resp):60 """Render buffered response with invalid length value."""61 resp.headers['Content-Length'] = '5'62 return 'I have too many bytes'63 def 
wrong_cl_unbuffered(req, resp):64 """Render unbuffered response with invalid length value."""65 resp.headers['Content-Length'] = '5'66 return ['I too', ' have too many bytes']67 def _munge(string):68 """Encode PATH_INFO correctly depending on Python version.69 WSGI 1.0 is a mess around unicode. Create endpoints70 that match the PATH_INFO that it produces.71 """72 if six.PY2:73 return string74 return string.encode('utf-8').decode('latin-1')75 handlers = {76 '/hello': hello,77 '/pov': pov,78 '/page1': pov,79 '/page2': pov,80 '/page3': pov,81 '/stream': stream,82 '/upload': upload,83 '/custom/204': custom_204,84 '/custom/304': custom_304,85 '/err_before_read': err_before_read,86 '/one_megabyte_of_a': one_megabyte_of_a,87 '/wrong_cl_buffered': wrong_cl_buffered,88 '/wrong_cl_unbuffered': wrong_cl_unbuffered,89 }90class ErrorLogMonitor:91 """Mock class to access the server error_log calls made by the server."""92 ErrorLogCall = namedtuple('ErrorLogCall', ['msg', 'level', 'traceback'])93 def __init__(self):94 """Initialize the server error log monitor/interceptor.95 If you need to ignore a particular error message use the property96 ``ignored_msgs`` by appending to the list the expected error messages.97 """98 self.calls = []99 # to be used the the teardown validation100 self.ignored_msgs = []101 def __call__(self, msg='', level=logging.INFO, traceback=False):102 """Intercept the call to the server error_log method."""103 if traceback:104 tblines = traceback_.format_exc()105 else:106 tblines = ''107 self.calls.append(ErrorLogMonitor.ErrorLogCall(msg, level, tblines))108@pytest.fixture109def raw_testing_server(wsgi_server_client):110 """Attach a WSGI app to the given server and preconfigure it."""111 app = Controller()112 def _timeout(req, resp):113 return str(wsgi_server.timeout)114 app.handlers['/timeout'] = _timeout115 wsgi_server = wsgi_server_client.server_instance116 wsgi_server.wsgi_app = app117 wsgi_server.max_request_body_size = 1001118 wsgi_server.timeout = 
timeout119 wsgi_server.server_client = wsgi_server_client120 wsgi_server.keep_alive_conn_limit = 2121 return wsgi_server122@pytest.fixture123def testing_server(raw_testing_server, monkeypatch):124 """Modify the "raw" base server to monitor the error_log messages.125 If you need to ignore a particular error message use the property126 ``testing_server.error_log.ignored_msgs`` by appending to the list127 the expected error messages.128 """129 # patch the error_log calls of the server instance130 monkeypatch.setattr(raw_testing_server, 'error_log', ErrorLogMonitor())131 yield raw_testing_server132 # Teardown verification, in case that the server logged an133 # error that wasn't notified to the client or we just made a mistake.134 # pylint: disable=possibly-unused-variable135 for c_msg, c_level, c_traceback in raw_testing_server.error_log.calls:136 if c_level <= logging.WARNING:137 continue138 assert c_msg in raw_testing_server.error_log.ignored_msgs, (139 'Found error in the error log: '140 "message = '{c_msg}', level = '{c_level}'\n"141 '{c_traceback}'.format(**locals()),142 )143@pytest.fixture144def test_client(testing_server):145 """Get and return a test client out of the given server."""146 return testing_server.server_client147def header_exists(header_name, headers):148 """Check that a header is present."""149 return header_name.lower() in (k.lower() for (k, _) in headers)150def header_has_value(header_name, header_value, headers):151 """Check that a header with a given value is present."""152 return header_name.lower() in (153 k.lower() for (k, v) in headers154 if v == header_value155 )156def test_HTTP11_persistent_connections(test_client):157 """Test persistent HTTP/1.1 connections."""158 # Initialize a persistent HTTP connection159 http_connection = test_client.get_connection()160 http_connection.auto_open = False161 http_connection.connect()162 # Make the first request and assert there's no "Connection: close".163 status_line, actual_headers, actual_resp_body 
= test_client.get(164 '/pov', http_conn=http_connection,165 )166 actual_status = int(status_line[:3])167 assert actual_status == 200168 assert status_line[4:] == 'OK'169 assert actual_resp_body == pov.encode()170 assert not header_exists('Connection', actual_headers)171 # Make another request on the same connection.172 status_line, actual_headers, actual_resp_body = test_client.get(173 '/page1', http_conn=http_connection,174 )175 actual_status = int(status_line[:3])176 assert actual_status == 200177 assert status_line[4:] == 'OK'178 assert actual_resp_body == pov.encode()179 assert not header_exists('Connection', actual_headers)180 # Test client-side close.181 status_line, actual_headers, actual_resp_body = test_client.get(182 '/page2', http_conn=http_connection,183 headers=[('Connection', 'close')],184 )185 actual_status = int(status_line[:3])186 assert actual_status == 200187 assert status_line[4:] == 'OK'188 assert actual_resp_body == pov.encode()189 assert header_has_value('Connection', 'close', actual_headers)190 # Make another request on the same connection, which should error.191 with pytest.raises(http_client.NotConnected):192 test_client.get('/pov', http_conn=http_connection)193@pytest.mark.parametrize(194 'set_cl',195 (196 False, # Without Content-Length197 True, # With Content-Length198 ),199)200def test_streaming_11(test_client, set_cl):201 """Test serving of streaming responses with HTTP/1.1 protocol."""202 # Initialize a persistent HTTP connection203 http_connection = test_client.get_connection()204 http_connection.auto_open = False205 http_connection.connect()206 # Make the first request and assert there's no "Connection: close".207 status_line, actual_headers, actual_resp_body = test_client.get(208 '/pov', http_conn=http_connection,209 )210 actual_status = int(status_line[:3])211 assert actual_status == 200212 assert status_line[4:] == 'OK'213 assert actual_resp_body == pov.encode()214 assert not header_exists('Connection', actual_headers)215 # Make 
another, streamed request on the same connection.216 if set_cl:217 # When a Content-Length is provided, the content should stream218 # without closing the connection.219 status_line, actual_headers, actual_resp_body = test_client.get(220 '/stream?set_cl=Yes', http_conn=http_connection,221 )222 assert header_exists('Content-Length', actual_headers)223 assert not header_has_value('Connection', 'close', actual_headers)224 assert not header_exists('Transfer-Encoding', actual_headers)225 assert actual_status == 200226 assert status_line[4:] == 'OK'227 assert actual_resp_body == b'0123456789'228 else:229 # When no Content-Length response header is provided,230 # streamed output will either close the connection, or use231 # chunked encoding, to determine transfer-length.232 status_line, actual_headers, actual_resp_body = test_client.get(233 '/stream', http_conn=http_connection,234 )235 assert not header_exists('Content-Length', actual_headers)236 assert actual_status == 200237 assert status_line[4:] == 'OK'238 assert actual_resp_body == b'0123456789'239 chunked_response = False240 for k, v in actual_headers:241 if k.lower() == 'transfer-encoding':242 if str(v) == 'chunked':243 chunked_response = True244 if chunked_response:245 assert not header_has_value('Connection', 'close', actual_headers)246 else:247 assert header_has_value('Connection', 'close', actual_headers)248 # Make another request on the same connection, which should249 # error.250 with pytest.raises(http_client.NotConnected):251 test_client.get('/pov', http_conn=http_connection)252 # Try HEAD.253 # See https://www.bitbucket.org/cherrypy/cherrypy/issue/864.254 # TODO: figure out how can this be possible on an closed connection255 # (chunked_response case)256 status_line, actual_headers, actual_resp_body = test_client.head(257 '/stream', http_conn=http_connection,258 )259 assert actual_status == 200260 assert status_line[4:] == 'OK'261 assert actual_resp_body == b''262 assert not 
header_exists('Transfer-Encoding', actual_headers)263@pytest.mark.parametrize(264 'set_cl',265 (266 False, # Without Content-Length267 True, # With Content-Length268 ),269)270def test_streaming_10(test_client, set_cl):271 """Test serving of streaming responses with HTTP/1.0 protocol."""272 original_server_protocol = test_client.server_instance.protocol273 test_client.server_instance.protocol = 'HTTP/1.0'274 # Initialize a persistent HTTP connection275 http_connection = test_client.get_connection()276 http_connection.auto_open = False277 http_connection.connect()278 # Make the first request and assert Keep-Alive.279 status_line, actual_headers, actual_resp_body = test_client.get(280 '/pov', http_conn=http_connection,281 headers=[('Connection', 'Keep-Alive')],282 protocol='HTTP/1.0',283 )284 actual_status = int(status_line[:3])285 assert actual_status == 200286 assert status_line[4:] == 'OK'287 assert actual_resp_body == pov.encode()288 assert header_has_value('Connection', 'Keep-Alive', actual_headers)289 # Make another, streamed request on the same connection.290 if set_cl:291 # When a Content-Length is provided, the content should292 # stream without closing the connection.293 status_line, actual_headers, actual_resp_body = test_client.get(294 '/stream?set_cl=Yes', http_conn=http_connection,295 headers=[('Connection', 'Keep-Alive')],296 protocol='HTTP/1.0',297 )298 actual_status = int(status_line[:3])299 assert actual_status == 200300 assert status_line[4:] == 'OK'301 assert actual_resp_body == b'0123456789'302 assert header_exists('Content-Length', actual_headers)303 assert header_has_value('Connection', 'Keep-Alive', actual_headers)304 assert not header_exists('Transfer-Encoding', actual_headers)305 else:306 # When a Content-Length is not provided,307 # the server should close the connection.308 status_line, actual_headers, actual_resp_body = test_client.get(309 '/stream', http_conn=http_connection,310 headers=[('Connection', 'Keep-Alive')],311 
protocol='HTTP/1.0',312 )313 actual_status = int(status_line[:3])314 assert actual_status == 200315 assert status_line[4:] == 'OK'316 assert actual_resp_body == b'0123456789'317 assert not header_exists('Content-Length', actual_headers)318 assert not header_has_value('Connection', 'Keep-Alive', actual_headers)319 assert not header_exists('Transfer-Encoding', actual_headers)320 # Make another request on the same connection, which should error.321 with pytest.raises(http_client.NotConnected):322 test_client.get(323 '/pov', http_conn=http_connection,324 protocol='HTTP/1.0',325 )326 test_client.server_instance.protocol = original_server_protocol327@pytest.mark.parametrize(328 'http_server_protocol',329 (330 'HTTP/1.0',331 pytest.param(332 'HTTP/1.1',333 marks=pytest.mark.xfail(334 IS_PYPY and IS_CI,335 reason='Fails under PyPy in CI for unknown reason',336 strict=False,337 ),338 ),339 ),340)341def test_keepalive(test_client, http_server_protocol):342 """Test Keep-Alive enabled connections."""343 original_server_protocol = test_client.server_instance.protocol344 test_client.server_instance.protocol = http_server_protocol345 http_client_protocol = 'HTTP/1.0'346 # Initialize a persistent HTTP connection347 http_connection = test_client.get_connection()348 http_connection.auto_open = False349 http_connection.connect()350 # Test a normal HTTP/1.0 request.351 status_line, actual_headers, actual_resp_body = test_client.get(352 '/page2',353 protocol=http_client_protocol,354 )355 actual_status = int(status_line[:3])356 assert actual_status == 200357 assert status_line[4:] == 'OK'358 assert actual_resp_body == pov.encode()359 assert not header_exists('Connection', actual_headers)360 # Test a keep-alive HTTP/1.0 request.361 status_line, actual_headers, actual_resp_body = test_client.get(362 '/page3', headers=[('Connection', 'Keep-Alive')],363 http_conn=http_connection, protocol=http_client_protocol,364 )365 actual_status = int(status_line[:3])366 assert actual_status == 200367 
assert status_line[4:] == 'OK'368 assert actual_resp_body == pov.encode()369 assert header_has_value('Connection', 'Keep-Alive', actual_headers)370 assert header_has_value(371 'Keep-Alive',372 'timeout={test_client.server_instance.timeout}'.format(**locals()),373 actual_headers,374 )375 # Remove the keep-alive header again.376 status_line, actual_headers, actual_resp_body = test_client.get(377 '/page3', http_conn=http_connection,378 protocol=http_client_protocol,379 )380 actual_status = int(status_line[:3])381 assert actual_status == 200382 assert status_line[4:] == 'OK'383 assert actual_resp_body == pov.encode()384 assert not header_exists('Connection', actual_headers)385 assert not header_exists('Keep-Alive', actual_headers)386 test_client.server_instance.protocol = original_server_protocol387def test_keepalive_conn_management(test_client):388 """Test management of Keep-Alive connections."""389 test_client.server_instance.timeout = 2390 def connection():391 # Initialize a persistent HTTP connection392 http_connection = test_client.get_connection()393 http_connection.auto_open = False394 http_connection.connect()395 return http_connection396 def request(conn, keepalive=True):397 status_line, actual_headers, actual_resp_body = test_client.get(398 '/page3', headers=[('Connection', 'Keep-Alive')],399 http_conn=conn, protocol='HTTP/1.0',400 )401 actual_status = int(status_line[:3])402 assert actual_status == 200403 assert status_line[4:] == 'OK'404 assert actual_resp_body == pov.encode()405 if keepalive:406 assert header_has_value('Connection', 'Keep-Alive', actual_headers)407 assert header_has_value(408 'Keep-Alive',409 'timeout={test_client.server_instance.timeout}'.410 format(**locals()),411 actual_headers,412 )413 else:414 assert not header_exists('Connection', actual_headers)415 assert not header_exists('Keep-Alive', actual_headers)416 def check_server_idle_conn_count(count, timeout=1.0):417 deadline = time.time() + timeout418 while True:419 n = 
test_client.server_instance._connections._num_connections420 if n == count:421 return422 assert time.time() <= deadline, (423 'idle conn count mismatch, wanted {count}, got {n}'.424 format(**locals()),425 )426 disconnect_errors = (427 http_client.BadStatusLine,428 http_client.CannotSendRequest,429 http_client.NotConnected,430 )431 # Make a new connection.432 c1 = connection()433 request(c1)434 check_server_idle_conn_count(1)435 # Make a second one.436 c2 = connection()437 request(c2)438 check_server_idle_conn_count(2)439 # Reusing the first connection should still work.440 request(c1)441 check_server_idle_conn_count(2)442 # Creating a new connection should still work, but we should443 # have run out of available connections to keep alive, so the444 # server should tell us to close.445 c3 = connection()446 request(c3, keepalive=False)447 check_server_idle_conn_count(2)448 # Show that the third connection was closed.449 with pytest.raises(disconnect_errors):450 request(c3)451 check_server_idle_conn_count(2)452 # Wait for some of our timeout.453 time.sleep(1.2)454 # Refresh the second connection.455 request(c2)456 check_server_idle_conn_count(2)457 # Wait for the remainder of our timeout, plus one tick.458 time.sleep(1.2)459 check_server_idle_conn_count(1)460 # First connection should now be expired.461 with pytest.raises(disconnect_errors):462 request(c1)463 check_server_idle_conn_count(1)464 # But the second one should still be valid.465 request(c2)466 check_server_idle_conn_count(1)467 # Restore original timeout.468 test_client.server_instance.timeout = timeout469@pytest.mark.parametrize(470 ('simulated_exception', 'error_number', 'exception_leaks'),471 (472 pytest.param(473 socket.error, errno.ECONNRESET, False,474 id='socket.error(ECONNRESET)',475 ),476 pytest.param(477 socket.error, errno.EPIPE, False,478 id='socket.error(EPIPE)',479 ),480 pytest.param(481 socket.error, errno.ENOTCONN, False,482 id='simulated socket.error(ENOTCONN)',483 ),484 pytest.param(485 
None, # <-- don't raise an artificial exception486 errno.ENOTCONN, False,487 id='real socket.error(ENOTCONN)',488 marks=pytest.mark.xfail(489 IS_WINDOWS,490 reason='Now reproducible this way on Windows',491 ),492 ),493 pytest.param(494 socket.error, errno.ESHUTDOWN, False,495 id='socket.error(ESHUTDOWN)',496 ),497 pytest.param(RuntimeError, 666, True, id='RuntimeError(666)'),498 pytest.param(socket.error, -1, True, id='socket.error(-1)'),499 ) + (500 () if six.PY2 else (501 pytest.param(502 ConnectionResetError, errno.ECONNRESET, False,503 id='ConnectionResetError(ECONNRESET)',504 ),505 pytest.param(506 BrokenPipeError, errno.EPIPE, False,507 id='BrokenPipeError(EPIPE)',508 ),509 pytest.param(510 BrokenPipeError, errno.ESHUTDOWN, False,511 id='BrokenPipeError(ESHUTDOWN)',512 ),513 )514 ),515)516def test_broken_connection_during_tcp_fin(517 error_number, exception_leaks,518 mocker, monkeypatch,519 simulated_exception, test_client,520):521 """Test there's no traceback on broken connection during close.522 It artificially causes :py:data:`~errno.ECONNRESET` /523 :py:data:`~errno.EPIPE` / :py:data:`~errno.ESHUTDOWN` /524 :py:data:`~errno.ENOTCONN` as well as unrelated :py:exc:`RuntimeError`525 and :py:exc:`socket.error(-1) <socket.error>` on the server socket when526 :py:meth:`socket.shutdown() <socket.socket.shutdown>` is called. 
It's527 triggered by closing the client socket before the server had a chance528 to respond.529 The expectation is that only :py:exc:`RuntimeError` and a530 :py:exc:`socket.error` with an unusual error code would leak.531 With the :py:data:`None`-parameter, a real non-simulated532 :py:exc:`OSError(107, 'Transport endpoint is not connected')533 <OSError>` happens.534 """535 exc_instance = (536 None if simulated_exception is None537 else simulated_exception(error_number, 'Simulated socket error')538 )539 old_close_kernel_socket = (540 test_client.server_instance.541 ConnectionClass._close_kernel_socket542 )543 def _close_kernel_socket(self):544 monkeypatch.setattr( # `socket.shutdown` is read-only otherwise545 self, 'socket',546 mocker.mock_module.Mock(wraps=self.socket),547 )548 if exc_instance is not None:549 monkeypatch.setattr(550 self.socket, 'shutdown',551 mocker.mock_module.Mock(side_effect=exc_instance),552 )553 _close_kernel_socket.fin_spy = mocker.spy(self.socket, 'shutdown')554 try:555 old_close_kernel_socket(self)556 except simulated_exception:557 _close_kernel_socket.exception_leaked = True558 else:559 _close_kernel_socket.exception_leaked = False560 monkeypatch.setattr(561 test_client.server_instance.ConnectionClass,562 '_close_kernel_socket',563 _close_kernel_socket,564 )565 conn = test_client.get_connection()566 conn.auto_open = False567 conn.connect()568 conn.send(b'GET /hello HTTP/1.1')569 conn.send(('Host: %s' % conn.host).encode('ascii'))570 conn.close()571 # Let the server attempt TCP shutdown:572 for _ in range(10 * (2 if IS_SLOW_ENV else 1)):573 time.sleep(0.1)574 if hasattr(_close_kernel_socket, 'exception_leaked'):575 break576 if exc_instance is not None: # simulated by us577 assert _close_kernel_socket.fin_spy.spy_exception is exc_instance578 else: # real579 assert isinstance(580 _close_kernel_socket.fin_spy.spy_exception, socket.error,581 )582 assert _close_kernel_socket.fin_spy.spy_exception.errno == error_number583 assert 
_close_kernel_socket.exception_leaked is exception_leaks584@pytest.mark.parametrize(585 'timeout_before_headers',586 (587 True,588 False,589 ),590)591def test_HTTP11_Timeout(test_client, timeout_before_headers):592 """Check timeout without sending any data.593 The server will close the connection with a 408.594 """595 conn = test_client.get_connection()596 conn.auto_open = False597 conn.connect()598 if not timeout_before_headers:599 # Connect but send half the headers only.600 conn.send(b'GET /hello HTTP/1.1')601 conn.send(('Host: %s' % conn.host).encode('ascii'))602 # else: Connect but send nothing.603 # Wait for our socket timeout604 time.sleep(timeout * 2)605 # The request should have returned 408 already.606 response = conn.response_class(conn.sock, method='GET')607 response.begin()608 assert response.status == 408609 conn.close()610def test_HTTP11_Timeout_after_request(test_client):611 """Check timeout after at least one request has succeeded.612 The server should close the connection without 408.613 """614 fail_msg = "Writing to timed out socket didn't fail as it should have: %s"615 # Make an initial request616 conn = test_client.get_connection()617 conn.putrequest('GET', '/timeout?t=%s' % timeout, skip_host=True)618 conn.putheader('Host', conn.host)619 conn.endheaders()620 response = conn.response_class(conn.sock, method='GET')621 response.begin()622 assert response.status == 200623 actual_body = response.read()624 expected_body = str(timeout).encode()625 assert actual_body == expected_body626 # Make a second request on the same socket627 conn._output(b'GET /hello HTTP/1.1')628 conn._output(('Host: %s' % conn.host).encode('ascii'))629 conn._send_output()630 response = conn.response_class(conn.sock, method='GET')631 response.begin()632 assert response.status == 200633 actual_body = response.read()634 expected_body = b'Hello, world!'635 assert actual_body == expected_body636 # Wait for our socket timeout637 time.sleep(timeout * 2)638 # Make another request on 
the same socket, which should error639 conn._output(b'GET /hello HTTP/1.1')640 conn._output(('Host: %s' % conn.host).encode('ascii'))641 conn._send_output()642 response = conn.response_class(conn.sock, method='GET')643 try:644 response.begin()645 except (socket.error, http_client.BadStatusLine):646 pass647 except Exception as ex:648 pytest.fail(fail_msg % ex)649 else:650 if response.status != 408:651 pytest.fail(fail_msg % response.read())652 conn.close()653 # Make another request on a new socket, which should work654 conn = test_client.get_connection()655 conn.putrequest('GET', '/pov', skip_host=True)656 conn.putheader('Host', conn.host)657 conn.endheaders()658 response = conn.response_class(conn.sock, method='GET')659 response.begin()660 assert response.status == 200661 actual_body = response.read()662 expected_body = pov.encode()663 assert actual_body == expected_body664 # Make another request on the same socket,665 # but timeout on the headers666 conn.send(b'GET /hello HTTP/1.1')667 # Wait for our socket timeout668 time.sleep(timeout * 2)669 response = conn.response_class(conn.sock, method='GET')670 try:671 response.begin()672 except (socket.error, http_client.BadStatusLine):673 pass674 except Exception as ex:675 pytest.fail(fail_msg % ex)676 else:677 if response.status != 408:678 pytest.fail(fail_msg % response.read())679 conn.close()680 # Retry the request on a new connection, which should work681 conn = test_client.get_connection()682 conn.putrequest('GET', '/pov', skip_host=True)683 conn.putheader('Host', conn.host)684 conn.endheaders()685 response = conn.response_class(conn.sock, method='GET')686 response.begin()687 assert response.status == 200688 actual_body = response.read()689 expected_body = pov.encode()690 assert actual_body == expected_body691 conn.close()692def test_HTTP11_pipelining(test_client):693 """Test HTTP/1.1 pipelining.694 :py:mod:`http.client` doesn't support this directly.695 """696 conn = test_client.get_connection()697 # Put request 
1698 conn.putrequest('GET', '/hello', skip_host=True)699 conn.putheader('Host', conn.host)700 conn.endheaders()701 for trial in range(5):702 # Put next request703 conn._output(704 ('GET /hello?%s HTTP/1.1' % trial).encode('iso-8859-1'),705 )706 conn._output(('Host: %s' % conn.host).encode('ascii'))707 conn._send_output()708 # Retrieve previous response709 response = conn.response_class(conn.sock, method='GET')710 # there is a bug in python3 regarding the buffering of711 # ``conn.sock``. Until that bug get's fixed we will712 # monkey patch the ``response`` instance.713 # https://bugs.python.org/issue23377714 if not six.PY2:715 response.fp = conn.sock.makefile('rb', 0)716 response.begin()717 body = response.read(13)718 assert response.status == 200719 assert body == b'Hello, world!'720 # Retrieve final response721 response = conn.response_class(conn.sock, method='GET')722 response.begin()723 body = response.read()724 assert response.status == 200725 assert body == b'Hello, world!'726 conn.close()727def test_100_Continue(test_client):728 """Test 100-continue header processing."""729 conn = test_client.get_connection()730 # Try a page without an Expect request header first.731 # Note that http.client's response.begin automatically ignores732 # 100 Continue responses, so we must manually check for it.733 conn.putrequest('POST', '/upload', skip_host=True)734 conn.putheader('Host', conn.host)735 conn.putheader('Content-Type', 'text/plain')736 conn.putheader('Content-Length', '4')737 conn.endheaders()738 conn.send(b"d'oh")739 response = conn.response_class(conn.sock, method='POST')740 _version, status, _reason = response._read_status()741 assert status != 100742 conn.close()743 # Now try a page with an Expect header...744 conn.connect()745 conn.putrequest('POST', '/upload', skip_host=True)746 conn.putheader('Host', conn.host)747 conn.putheader('Content-Type', 'text/plain')748 conn.putheader('Content-Length', '17')749 conn.putheader('Expect', '100-continue')750 
conn.endheaders()751 response = conn.response_class(conn.sock, method='POST')752 # ...assert and then skip the 100 response753 version, status, reason = response._read_status()754 assert status == 100755 while True:756 line = response.fp.readline().strip()757 if line:758 pytest.fail(759 '100 Continue should not output any headers. Got %r' %760 line,761 )762 else:763 break764 # ...send the body765 body = b'I am a small file'766 conn.send(body)767 # ...get the final response768 response.begin()769 status_line, _actual_headers, actual_resp_body = webtest.shb(response)770 actual_status = int(status_line[:3])771 assert actual_status == 200772 expected_resp_body = ("thanks for '%s'" % body).encode()773 assert actual_resp_body == expected_resp_body774 conn.close()775@pytest.mark.parametrize(776 'max_request_body_size',777 (778 0,779 1001,780 ),781)782def test_readall_or_close(test_client, max_request_body_size):783 """Test a max_request_body_size of 0 (the default) and 1001."""784 old_max = test_client.server_instance.max_request_body_size785 test_client.server_instance.max_request_body_size = max_request_body_size786 conn = test_client.get_connection()787 # Get a POST page with an error788 conn.putrequest('POST', '/err_before_read', skip_host=True)789 conn.putheader('Host', conn.host)790 conn.putheader('Content-Type', 'text/plain')791 conn.putheader('Content-Length', '1000')792 conn.putheader('Expect', '100-continue')793 conn.endheaders()794 response = conn.response_class(conn.sock, method='POST')795 # ...assert and then skip the 100 response796 _version, status, _reason = response._read_status()797 assert status == 100798 skip = True799 while skip:800 skip = response.fp.readline().strip()801 # ...send the body802 conn.send(b'x' * 1000)803 # ...get the final response804 response.begin()805 status_line, _actual_headers, actual_resp_body = webtest.shb(response)806 actual_status = int(status_line[:3])807 assert actual_status == 500808 # Now try a working page with an Expect 
header...809 conn._output(b'POST /upload HTTP/1.1')810 conn._output(('Host: %s' % conn.host).encode('ascii'))811 conn._output(b'Content-Type: text/plain')812 conn._output(b'Content-Length: 17')813 conn._output(b'Expect: 100-continue')814 conn._send_output()815 response = conn.response_class(conn.sock, method='POST')816 # ...assert and then skip the 100 response817 version, status, reason = response._read_status()818 assert status == 100819 skip = True820 while skip:821 skip = response.fp.readline().strip()822 # ...send the body823 body = b'I am a small file'824 conn.send(body)825 # ...get the final response826 response.begin()827 status_line, actual_headers, actual_resp_body = webtest.shb(response)828 actual_status = int(status_line[:3])829 assert actual_status == 200830 expected_resp_body = ("thanks for '%s'" % body).encode()831 assert actual_resp_body == expected_resp_body832 conn.close()833 test_client.server_instance.max_request_body_size = old_max834def test_No_Message_Body(test_client):835 """Test HTTP queries with an empty response body."""836 # Initialize a persistent HTTP connection837 http_connection = test_client.get_connection()838 http_connection.auto_open = False839 http_connection.connect()840 # Make the first request and assert there's no "Connection: close".841 status_line, actual_headers, actual_resp_body = test_client.get(842 '/pov', http_conn=http_connection,843 )844 actual_status = int(status_line[:3])845 assert actual_status == 200846 assert status_line[4:] == 'OK'847 assert actual_resp_body == pov.encode()848 assert not header_exists('Connection', actual_headers)849 # Make a 204 request on the same connection.850 status_line, actual_headers, actual_resp_body = test_client.get(851 '/custom/204', http_conn=http_connection,852 )853 actual_status = int(status_line[:3])854 assert actual_status == 204855 assert not header_exists('Content-Length', actual_headers)856 assert actual_resp_body == b''857 assert not header_exists('Connection', 
actual_headers)858 # Make a 304 request on the same connection.859 status_line, actual_headers, actual_resp_body = test_client.get(860 '/custom/304', http_conn=http_connection,861 )862 actual_status = int(status_line[:3])863 assert actual_status == 304864 assert not header_exists('Content-Length', actual_headers)865 assert actual_resp_body == b''866 assert not header_exists('Connection', actual_headers)867@pytest.mark.xfail(868 reason=unwrap(869 trim("""870 Headers from earlier request leak into the request871 line for a subsequent request, resulting in 400872 instead of 413. See cherrypy/cheroot#69 for details.873 """),874 ),875)876def test_Chunked_Encoding(test_client):877 """Test HTTP uploads with chunked transfer-encoding."""878 # Initialize a persistent HTTP connection879 conn = test_client.get_connection()880 # Try a normal chunked request (with extensions)881 body = (882 b'8;key=value\r\nxx\r\nxxxx\r\n5\r\nyyyyy\r\n0\r\n'883 b'Content-Type: application/json\r\n'884 b'\r\n'885 )886 conn.putrequest('POST', '/upload', skip_host=True)887 conn.putheader('Host', conn.host)888 conn.putheader('Transfer-Encoding', 'chunked')889 conn.putheader('Trailer', 'Content-Type')890 # Note that this is somewhat malformed:891 # we shouldn't be sending Content-Length.892 # RFC 2616 says the server should ignore it.893 conn.putheader('Content-Length', '3')894 conn.endheaders()895 conn.send(body)896 response = conn.getresponse()897 status_line, _actual_headers, actual_resp_body = webtest.shb(response)898 actual_status = int(status_line[:3])899 assert actual_status == 200900 assert status_line[4:] == 'OK'901 expected_resp_body = ("thanks for '%s'" % b'xx\r\nxxxxyyyyy').encode()902 assert actual_resp_body == expected_resp_body903 # Try a chunked request that exceeds server.max_request_body_size.904 # Note that the delimiters and trailer are included.905 body = b'\r\n'.join((b'3e3', b'x' * 995, b'0', b'', b''))906 conn.putrequest('POST', '/upload', skip_host=True)907 
conn.putheader('Host', conn.host)908 conn.putheader('Transfer-Encoding', 'chunked')909 conn.putheader('Content-Type', 'text/plain')910 # Chunked requests don't need a content-length911 # conn.putheader("Content-Length", len(body))912 conn.endheaders()913 conn.send(body)914 response = conn.getresponse()915 status_line, actual_headers, actual_resp_body = webtest.shb(response)916 actual_status = int(status_line[:3])917 assert actual_status == 413918 conn.close()919def test_Content_Length_in(test_client):920 """Try a non-chunked request where Content-Length exceeds limit.921 (server.max_request_body_size).922 Assert error before body send.923 """924 # Initialize a persistent HTTP connection925 conn = test_client.get_connection()926 conn.putrequest('POST', '/upload', skip_host=True)927 conn.putheader('Host', conn.host)928 conn.putheader('Content-Type', 'text/plain')929 conn.putheader('Content-Length', '9999')930 conn.endheaders()931 response = conn.getresponse()932 status_line, _actual_headers, actual_resp_body = webtest.shb(response)933 actual_status = int(status_line[:3])934 assert actual_status == 413935 expected_resp_body = (936 b'The entity sent with the request exceeds '937 b'the maximum allowed bytes.'938 )939 assert actual_resp_body == expected_resp_body940 conn.close()941def test_Content_Length_not_int(test_client):942 """Test that malicious Content-Length header returns 400."""943 status_line, _actual_headers, actual_resp_body = test_client.post(944 '/upload',945 headers=[946 ('Content-Type', 'text/plain'),947 ('Content-Length', 'not-an-integer'),948 ],949 )950 actual_status = int(status_line[:3])951 assert actual_status == 400952 assert actual_resp_body == b'Malformed Content-Length Header.'953@pytest.mark.parametrize(954 ('uri', 'expected_resp_status', 'expected_resp_body'),955 (956 (957 '/wrong_cl_buffered', 500,958 (959 b'The requested resource returned more bytes than the '960 b'declared Content-Length.'961 ),962 ),963 ('/wrong_cl_unbuffered', 200, b'I 
too'),964 ),965)966def test_Content_Length_out(967 test_client,968 uri, expected_resp_status, expected_resp_body,969):970 """Test response with Content-Length less than the response body.971 (non-chunked response)972 """973 conn = test_client.get_connection()974 conn.putrequest('GET', uri, skip_host=True)975 conn.putheader('Host', conn.host)976 conn.endheaders()977 response = conn.getresponse()978 status_line, _actual_headers, actual_resp_body = webtest.shb(response)979 actual_status = int(status_line[:3])980 assert actual_status == expected_resp_status981 assert actual_resp_body == expected_resp_body982 conn.close()983 # the server logs the exception that we had verified from the984 # client perspective. Tell the error_log verification that985 # it can ignore that message.986 test_client.server_instance.error_log.ignored_msgs.extend((987 # Python 3.7+:988 "ValueError('Response body exceeds the declared Content-Length.')",989 # Python 2.7-3.6 (macOS?):990 "ValueError('Response body exceeds the declared Content-Length.',)",991 ))992@pytest.mark.xfail(993 reason='Sometimes this test fails due to low timeout. 
'994 'Ref: https://github.com/cherrypy/cherrypy/issues/598',995)996def test_598(test_client):997 """Test serving large file with a read timeout in place."""998 # Initialize a persistent HTTP connection999 conn = test_client.get_connection()1000 remote_data_conn = urllib.request.urlopen(1001 '%s://%s:%s/one_megabyte_of_a'1002 % ('http', conn.host, conn.port),1003 )1004 buf = remote_data_conn.read(512)1005 time.sleep(timeout * 0.6)1006 remaining = (1024 * 1024) - 5121007 while remaining:1008 data = remote_data_conn.read(remaining)1009 if not data:1010 break1011 buf += data1012 remaining -= len(data)1013 assert len(buf) == 1024 * 10241014 assert buf == b'a' * 1024 * 10241015 assert remaining == 01016 remote_data_conn.close()1017@pytest.mark.parametrize(1018 'invalid_terminator',1019 (1020 b'\n\n',1021 b'\r\n\n',1022 ),1023)1024def test_No_CRLF(test_client, invalid_terminator):1025 """Test HTTP queries with no valid CRLF terminators."""1026 # Initialize a persistent HTTP connection1027 conn = test_client.get_connection()1028 # (b'%s' % b'') is not supported in Python 3.4, so just use bytes.join()1029 conn.send(b''.join((b'GET /hello HTTP/1.1', invalid_terminator)))1030 response = conn.response_class(conn.sock, method='GET')1031 response.begin()1032 actual_resp_body = response.read()1033 expected_resp_body = b'HTTP requires CRLF terminators'1034 assert actual_resp_body == expected_resp_body1035 conn.close()1036class FaultySelect:1037 """Mock class to insert errors in the selector.select method."""1038 def __init__(self, original_select):1039 """Initilize helper class to wrap the selector.select method."""1040 self.original_select = original_select1041 self.request_served = False1042 self.os_error_triggered = False1043 def __call__(self, timeout):1044 """Intercept the calls to selector.select."""1045 if self.request_served:1046 self.os_error_triggered = True1047 raise OSError('Error while selecting the client socket.')1048 return self.original_select(timeout)1049class 
FaultyGetMap:1050 """Mock class to insert errors in the selector.get_map method."""1051 def __init__(self, original_get_map):1052 """Initilize helper class to wrap the selector.get_map method."""1053 self.original_get_map = original_get_map1054 self.sabotage_conn = False1055 self.conn_closed = False1056 def __call__(self):1057 """Intercept the calls to selector.get_map."""1058 sabotage_targets = (1059 conn for _, (_, _, _, conn) in self.original_get_map().items()1060 if isinstance(conn, cheroot.server.HTTPConnection)1061 ) if self.sabotage_conn and not self.conn_closed else ()1062 for conn in sabotage_targets:1063 # close the socket to cause OSError1064 conn.close()1065 self.conn_closed = True1066 return self.original_get_map()1067def test_invalid_selected_connection(test_client, monkeypatch):1068 """Test the error handling segment of HTTP connection selection.1069 See :py:meth:`cheroot.connections.ConnectionManager.get_conn`.1070 """1071 # patch the select method1072 faux_select = FaultySelect(1073 test_client.server_instance._connections._selector.select,1074 )1075 monkeypatch.setattr(1076 test_client.server_instance._connections._selector,1077 'select',1078 faux_select,1079 )1080 # patch the get_map method1081 faux_get_map = FaultyGetMap(1082 test_client.server_instance._connections._selector._selector.get_map,1083 )1084 monkeypatch.setattr(1085 test_client.server_instance._connections._selector._selector,1086 'get_map',1087 faux_get_map,1088 )1089 # request a page with connection keep-alive to make sure1090 # we'll have a connection to be modified.1091 resp_status, _resp_headers, _resp_body = test_client.request(1092 '/page1', headers=[('Connection', 'Keep-Alive')],1093 )1094 assert resp_status == '200 OK'1095 # trigger the internal errors1096 faux_get_map.sabotage_conn = faux_select.request_served = True1097 # give time to make sure the error gets handled1098 time.sleep(test_client.server_instance.expiration_interval * 2)1099 assert 
faux_select.os_error_triggered...
_network.py
Source:_network.py
...137 @property138 def headers(self) -> Headers:139 return self._provisional_headers.headers()140 async def all_headers(self) -> Headers:141 return (await self._actual_headers()).headers()142 async def headers_array(self) -> HeadersArray:143 return (await self._actual_headers()).headers_array()144 async def header_value(self, name: str) -> Optional[str]:145 return (await self._actual_headers()).get(name)146 async def _actual_headers(self) -> "RawHeaders":147 if not self._all_headers_future:148 self._all_headers_future = asyncio.Future()149 headers = await self._channel.send("rawRequestHeaders")150 self._all_headers_future.set_result(RawHeaders(headers))151 return await self._all_headers_future152class Route(ChannelOwner):153 def __init__(154 self, parent: ChannelOwner, type: str, guid: str, initializer: Dict155 ) -> None:156 super().__init__(parent, type, guid, initializer)157 def __repr__(self) -> str:158 return f"<Route request={self.request}>"159 @property160 def request(self) -> Request:161 return from_channel(self._initializer["request"])162 async def abort(self, errorCode: str = None) -> None:163 await self._race_with_page_close(164 self._channel.send("abort", locals_to_params(locals()))165 )166 async def fulfill(167 self,168 status: int = None,169 headers: Dict[str, str] = None,170 body: Union[str, bytes] = None,171 path: Union[str, Path] = None,172 contentType: str = None,173 response: "APIResponse" = None,174 ) -> None:175 params = locals_to_params(locals())176 if response:177 del params["response"]178 params["status"] = (179 params["status"] if params.get("status") else response.status180 )181 params["headers"] = (182 params["headers"] if params.get("headers") else response.headers183 )184 from playwright._impl._fetch import APIResponse185 if body is None and path is None and isinstance(response, APIResponse):186 if response._request._connection is self._connection:187 params["fetchResponseUid"] = response._fetch_uid188 else:189 body = await 
response.body()190 length = 0191 if isinstance(body, str):192 params["body"] = body193 params["isBase64"] = False194 length = len(body.encode())195 elif isinstance(body, bytes):196 params["body"] = base64.b64encode(body).decode()197 params["isBase64"] = True198 length = len(body)199 elif path:200 del params["path"]201 file_content = Path(path).read_bytes()202 params["body"] = base64.b64encode(file_content).decode()203 params["isBase64"] = True204 length = len(file_content)205 headers = {k.lower(): str(v) for k, v in params.get("headers", {}).items()}206 if params.get("contentType"):207 headers["content-type"] = params["contentType"]208 elif path:209 headers["content-type"] = (210 mimetypes.guess_type(str(Path(path)))[0] or "application/octet-stream"211 )212 if length and "content-length" not in headers:213 headers["content-length"] = str(length)214 params["headers"] = serialize_headers(headers)215 await self._race_with_page_close(self._channel.send("fulfill", params))216 async def continue_(217 self,218 url: str = None,219 method: str = None,220 headers: Dict[str, str] = None,221 postData: Union[str, bytes] = None,222 ) -> None:223 overrides: ContinueParameters = {}224 if url:225 overrides["url"] = url226 if method:227 overrides["method"] = method228 if headers:229 overrides["headers"] = serialize_headers(headers)230 if isinstance(postData, str):231 overrides["postData"] = base64.b64encode(postData.encode()).decode()232 elif isinstance(postData, bytes):233 overrides["postData"] = base64.b64encode(postData).decode()234 await self._race_with_page_close(235 self._channel.send("continue", cast(Any, overrides))236 )237 def _internal_continue(self) -> None:238 async def continue_route() -> None:239 try:240 await self.continue_()241 except Exception:242 pass243 asyncio.create_task(continue_route())244 async def _race_with_page_close(self, future: Coroutine) -> None:245 if hasattr(self.request.frame, "_page"):246 page = self.request.frame._page247 # When page closes or 
crashes, we catch any potential rejects from this Route.248 # Note that page could be missing when routing popup's initial request that249 # does not have a Page initialized just yet.250 fut = asyncio.create_task(future)251 await asyncio.wait(252 [fut, page._closed_or_crashed_future],253 return_when=asyncio.FIRST_COMPLETED,254 )255 if page._closed_or_crashed_future.done():256 await asyncio.gather(fut, return_exceptions=True)257 else:258 await future259class Response(ChannelOwner):260 def __init__(261 self, parent: ChannelOwner, type: str, guid: str, initializer: Dict262 ) -> None:263 super().__init__(parent, type, guid, initializer)264 self._request: Request = from_channel(self._initializer["request"])265 timing = self._initializer["timing"]266 self._request._timing["startTime"] = timing["startTime"]267 self._request._timing["domainLookupStart"] = timing["domainLookupStart"]268 self._request._timing["domainLookupEnd"] = timing["domainLookupEnd"]269 self._request._timing["connectStart"] = timing["connectStart"]270 self._request._timing["secureConnectionStart"] = timing["secureConnectionStart"]271 self._request._timing["connectEnd"] = timing["connectEnd"]272 self._request._timing["requestStart"] = timing["requestStart"]273 self._request._timing["responseStart"] = timing["responseStart"]274 self._provisional_headers = RawHeaders(275 cast(HeadersArray, self._initializer["headers"])276 )277 self._raw_headers_future: Optional[asyncio.Future[RawHeaders]] = None278 self._finished_future: asyncio.Future[bool] = asyncio.Future()279 def __repr__(self) -> str:280 return f"<Response url={self.url!r} request={self.request}>"281 @property282 def url(self) -> str:283 return self._initializer["url"]284 @property285 def ok(self) -> bool:286 # Status 0 is for file:// URLs287 return self._initializer["status"] == 0 or (288 self._initializer["status"] >= 200 and self._initializer["status"] <= 299289 )290 @property291 def status(self) -> int:292 return self._initializer["status"]293 
@property294 def status_text(self) -> str:295 return self._initializer["statusText"]296 @property297 def headers(self) -> Headers:298 return self._provisional_headers.headers()299 async def all_headers(self) -> Headers:300 return (await self._actual_headers()).headers()301 async def headers_array(self) -> HeadersArray:302 return (await self._actual_headers()).headers_array()303 async def header_value(self, name: str) -> Optional[str]:304 return (await self._actual_headers()).get(name)305 async def header_values(self, name: str) -> List[str]:306 return (await self._actual_headers()).get_all(name)307 async def _actual_headers(self) -> "RawHeaders":308 if not self._raw_headers_future:309 self._raw_headers_future = asyncio.Future()310 headers = cast(HeadersArray, await self._channel.send("rawResponseHeaders"))311 self._raw_headers_future.set_result(RawHeaders(headers))312 return await self._raw_headers_future313 async def server_addr(self) -> Optional[RemoteAddr]:314 return await self._channel.send("serverAddr")315 async def security_details(self) -> Optional[SecurityDetails]:316 return await self._channel.send("securityDetails")317 async def finished(self) -> None:318 await self._finished_future319 async def body(self) -> bytes:320 binary = await self._channel.send("body")321 return base64.b64decode(binary)...
tabular.py
Source:tabular.py
# This file is part of the GBI project.
# Copyright (C) 2012 Omniscale GmbH & Co. KG <http://omniscale.com>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Convert GeoJSON-like datastructures to tabular datasets"""

from io import BytesIO
import csv

from geobox.ext.odf import opendocument, style, table, text


class Tabular(object):
    """
    Collect dict items into a list of rows.

    :param headers: expected headers. columns will be sorted
        in this order.
    :param additional_headers: if true, keys found in the added items
        that are not listed in `headers` become extra columns after the
        expected ones; if false, such keys are left out.
    """
    def __init__(self, headers=None, additional_headers=False):
        self.headers = headers or []
        self.additional_headers = additional_headers
        # every key seen in any added item
        self._actual_headers = set()
        self._row_dicts = []

    def add(self, item):
        """
        Add dictionary as a new row.
        """
        self._actual_headers.update(set(item.keys()))
        self._row_dicts.append(item)

    def as_rows(self, with_headers=False):
        """
        Return all rows. The first row contains the
        header, if with_headers is true.

        Columns are sorted by Tabular.headers first, then any additional
        keys that are found in the added row dicts (only when
        additional_headers is set). Cells for keys missing from a row
        dict are None.
        """
        headers = list(self.headers)
        if self.additional_headers:
            for h in self._actual_headers:
                if h not in headers:
                    headers.append(h)

        rows = []
        if with_headers:
            rows.append(headers)

        for row_dict in self._row_dicts:
            rows.append([row_dict.get(h) for h in headers])

        return rows


def geojson_to_rows(doc, headers=None):
    """
    Collect properties of all GeoJSON Features as list of rows.

    :param doc: GeoJSON dictionary with FeatureCollection or Feature
    :param headers: List of expected property keys. Expected keys appear
        as the first columns in the output rows.
    :returns: list of rows, first row contains header names (property keys).
    """
    # NOTE(review): Tabular is created without additional_headers, so only
    # the keys listed in `headers` end up as columns -- confirm that
    # dropping all other property keys is intended.
    tabular = Tabular(headers)
    _add_geojson(tabular, doc)
    return tabular.as_rows(with_headers=True)


def _add_geojson(tabular, doc):
    """
    Add the properties of `doc` (a Feature, or every Feature of a
    FeatureCollection) to `tabular`. Other types are ignored.
    """
    if doc.get('type') == 'Feature':
        # A Feature may carry "properties": null; treat that like an
        # empty property set instead of failing on None.keys().
        tabular.add(doc.get('properties') or {})
    elif doc.get('type') == 'FeatureCollection':
        for feature in doc.get('features', []):
            _add_geojson(tabular, feature)


odf_bold_style = style.Style(name="bold", family="paragraph")
odf_bold_style.addElement(style.TextProperties(fontweight="bold", fontweightasian="bold", fontweightcomplex="bold"))


def ods_export(rows, with_headers=False, name=None):
    """
    Export rows as OpenDocument spreadsheet.

    :param rows: iterable of rows, each row an iterable of cell values
    :param with_headers: If True, output first row as bold text.
    :param name: Name of the worksheet (defaults to 'Export').
    :returns: ODS file content as string
        (use open(x, 'wb') when writing to a file).
    """
    # ODS export with code from tablib
    # (c) 2011 by Kenneth Reitz, MIT licensed

    wb = opendocument.OpenDocumentSpreadsheet()
    wb.automaticstyles.addElement(odf_bold_style)

    ws = table.Table(name=name or 'Export')
    wb.spreadsheet.addElement(ws)

    # NOTE(review): the block nesting below was reconstructed from a
    # source without indentation -- confirm against the original file.
    for i, row in enumerate(rows):
        row_number = i + 1
        odf_row = table.TableRow(stylename=odf_bold_style, defaultcellstylename='bold')
        for col in row:
            try:
                col = unicode(col, errors='ignore')
            except TypeError:
                ## col is already unicode
                pass
            # NOTE(review): a column element and the row element are added
            # to the sheet once per cell -- verify this is intended.
            ws.addElement(table.TableColumn())

            # bold headers
            if (row_number == 1) and with_headers:
                odf_row.setAttribute('stylename', odf_bold_style)
                ws.addElement(odf_row)
                cell = table.TableCell()
                p = text.P()
                p.addElement(text.Span(text=col, stylename=odf_bold_style))
                cell.addElement(p)
                odf_row.addElement(cell)

            # wrap the rest
            else:
                ws.addElement(odf_row)
                if isinstance(col, (int, float)):
                    cell = table.TableCell(valuetype='float', value=str(col))
                else:
                    cell = table.TableCell()
                    cell.addElement(text.P(text=col))
                odf_row.addElement(cell)

    stream = BytesIO()
    wb.save(stream)
    return stream.getvalue()


def csv_export(rows):
    """
    Export rows as CSV file.

    String cells are encoded as latin-1; characters that cannot be
    encoded are replaced. The given rows are not modified.

    :param rows: iterable of rows, each row an iterable of cell values
    :returns: CSV file content as string
        (use open(x, 'wb') when writing to a file).
    """
    stream = BytesIO()
    writer = csv.writer(stream)
    for row in rows:
        # Encode into a new list: writing back into `row` would change the
        # caller's data (and fail for tuples), so a later export of the
        # same rows would see already-encoded byte strings.
        encoded_row = []
        for col in row:
            if isinstance(col, basestring):
                col = col.encode('latin-1', errors='replace')
            encoded_row.append(col)
        writer.writerow(encoded_row)
    return stream.getvalue()


if __name__ == '__main__':
    import json
    # close the input file instead of leaving it to the garbage collector
    with open('geobox/test/test.geojson') as geojson_file:
        doc = json.load(geojson_file)
    rows = geojson_to_rows(doc, headers=['Name'])
    with open('/tmp/foo.ods', 'wb') as f:
        f.write(ods_export(rows, with_headers=True))
    # (the excerpt is cut off after the next line)
    with open('/tmp/foo.csv', 'wb') as f:...
LambdaTest’s Playwright tutorial gives you a broader view of the Playwright automation framework, its unique features, and its use cases, with examples to deepen your understanding of Playwright testing. It offers A-to-Z guidance, from installing the Playwright framework to best practices and advanced concepts.
Get 100 automation test minutes FREE!!