Best Python code snippet using autotest_python
test_socketlevel.py
Source:test_socketlevel.py
# NOTE: The lines below are the tail of a test method whose beginning lies
# outside this excerpt. They are kept verbatim (commented out) because the
# enclosing definition cannot be reconstructed from what is visible here.
#
#             b'Set-Cookie: foo=1\r\n'
#             b'Set-Cookie: bar=1\r\n'
#             b'\r\n')
#         sock.close()
#     self._start_server(multicookie_response_handler)
#     pool = HTTPConnectionPool(self.host, self.port)
#     self.addCleanup(pool.close)
#     r = pool.request('GET', '/', retries=0)
#     self.assertEqual(r.headers, {'set-cookie': 'foo=1, bar=1'})
#     self.assertEqual(r.headers.getlist('set-cookie'), ['foo=1', 'bar=1'])


class TestSNI(SocketDummyServerTestCase):
    """Checks what the client puts in the first packet of a TLS connection."""

    def test_hostname_in_first_request_packet(self):
        """The first packet sent by the client must contain the host name."""
        if not HAS_SNI:
            raise SkipTest('SNI-support not available')

        done_receiving = Event()
        self.buf = b''

        def socket_handler(listener):
            sock = listener.accept()[0]

            self.buf = sock.recv(65536)  # We only accept one packet
            done_receiving.set()  # let the test know it can proceed
            sock.close()

        self._start_server(socket_handler)
        pool = HTTPSConnectionPool(self.host, self.port)
        self.addCleanup(pool.close)
        try:
            pool.request('GET', '/', retries=0)
        except MaxRetryError:  # We are violating the protocol
            pass
        done_receiving.wait()
        self.assertTrue(self.host.encode('ascii') in self.buf,
                        "missing hostname in SSL handshake")


class TestClientCerts(SocketDummyServerTestCase):
    """
    Tests for client certificate support.
    """

    def _wrap_in_ssl(self, sock):
        """
        Given a single socket, wraps it in TLS.
        """
        return ssl.wrap_socket(
            sock,
            ssl_version=ssl.PROTOCOL_SSLv23,
            cert_reqs=ssl.CERT_REQUIRED,
            ca_certs=DEFAULT_CA,
            certfile=DEFAULT_CERTS['certfile'],
            keyfile=DEFAULT_CERTS['keyfile'],
            server_side=True
        )

    def test_client_certs_two_files(self):
        """
        Having a client cert in a separate file to its associated key works
        properly.
        """
        done_receiving = Event()
        client_certs = []

        def socket_handler(listener):
            sock = listener.accept()[0]
            sock = self._wrap_in_ssl(sock)

            client_certs.append(sock.getpeercert())

            data = b''
            while not data.endswith(b'\r\n\r\n'):
                data += sock.recv(8192)

            sock.sendall(
                b'HTTP/1.1 200 OK\r\n'
                b'Server: testsocket\r\n'
                b'Connection: close\r\n'
                b'Content-Length: 6\r\n'
                b'\r\n'
                b'Valid!'
            )

            # Keep the socket open until the client has read the response
            # (bounded so the server thread cannot hang forever).
            done_receiving.wait(5)
            sock.close()

        self._start_server(socket_handler)
        pool = HTTPSConnectionPool(
            self.host,
            self.port,
            cert_file=DEFAULT_CERTS['certfile'],
            key_file=DEFAULT_CERTS['keyfile'],
            cert_reqs='REQUIRED',
            ca_certs=DEFAULT_CA,
        )
        self.addCleanup(pool.close)
        pool.request('GET', '/', retries=0)
        done_receiving.set()

        self.assertEqual(len(client_certs), 1)

    def test_client_certs_one_file(self):
        """
        Having a client cert and its associated private key in just one file
        works properly.
        """
        done_receiving = Event()
        client_certs = []

        def socket_handler(listener):
            sock = listener.accept()[0]
            sock = self._wrap_in_ssl(sock)

            client_certs.append(sock.getpeercert())

            data = b''
            while not data.endswith(b'\r\n\r\n'):
                data += sock.recv(8192)

            sock.sendall(
                b'HTTP/1.1 200 OK\r\n'
                b'Server: testsocket\r\n'
                b'Connection: close\r\n'
                b'Content-Length: 6\r\n'
                b'\r\n'
                b'Valid!'
            )

            # Keep the socket open until the client has read the response
            # (bounded so the server thread cannot hang forever).
            done_receiving.wait(5)
            sock.close()

        self._start_server(socket_handler)
        pool = HTTPSConnectionPool(
            self.host,
            self.port,
            cert_file=COMBINED_CERT_AND_KEY,
            cert_reqs='REQUIRED',
            ca_certs=DEFAULT_CA,
        )
        self.addCleanup(pool.close)
        pool.request('GET', '/', retries=0)
        done_receiving.set()

        self.assertEqual(len(client_certs), 1)

    def test_missing_client_certs_raises_error(self):
        """
        Having client certs not be present causes an error.
        """
        done_receiving = Event()

        def socket_handler(listener):
            sock = listener.accept()[0]

            try:
                self._wrap_in_ssl(sock)
            except ssl.SSLError:
                # Expected: the server side requires a client certificate.
                pass

            done_receiving.wait(5)
            sock.close()

        self._start_server(socket_handler)
        pool = HTTPSConnectionPool(
            self.host,
            self.port,
            cert_reqs='REQUIRED',
            ca_certs=DEFAULT_CA,
        )
        self.addCleanup(pool.close)
        try:
            pool.request('GET', '/', retries=0)
        except MaxRetryError:
            done_receiving.set()
        else:
            done_receiving.set()
            self.fail(
                "Expected server to reject connection due to missing client "
                "certificates"
            )


class TestSocketClosing(SocketDummyServerTestCase):
    """Pool behaviour when the dummy server closes, stalls or truncates."""

    def test_recovery_when_server_closes_connection(self):
        # Does the pool work seamlessly if an open connection in the
        # connection pool gets hung up on by the server, then reaches
        # the front of the queue again?

        done_closing = Event()

        def socket_handler(listener):
            for i in 0, 1:
                sock = listener.accept()[0]

                # Accumulate the request so a terminator split across two
                # reads is still detected.
                buf = b''
                while not buf.endswith(b'\r\n\r\n'):
                    buf += sock.recv(65536)

                body = 'Response %d' % i
                sock.send(('HTTP/1.1 200 OK\r\n'
                           'Content-Type: text/plain\r\n'
                           'Content-Length: %d\r\n'
                           '\r\n'
                           '%s' % (len(body), body)).encode('utf-8'))

                sock.close()  # simulate a server timing out, closing socket
                done_closing.set()  # let the test know it can proceed

        self._start_server(socket_handler)
        pool = HTTPConnectionPool(self.host, self.port)
        self.addCleanup(pool.close)

        response = pool.request('GET', '/', retries=0)
        self.assertEqual(response.status, 200)
        self.assertEqual(response.data, b'Response 0')

        done_closing.wait()  # wait until the socket in our pool gets closed

        response = pool.request('GET', '/', retries=0)
        self.assertEqual(response.status, 200)
        self.assertEqual(response.data, b'Response 1')

    def test_connection_refused(self):
        # Does the pool retry if there is no listener on the port?
        host, port = get_unreachable_address()
        http = HTTPConnectionPool(host, port, maxsize=3, block=True)
        self.addCleanup(http.close)
        self.assertRaises(MaxRetryError, http.request, 'GET', '/', retries=0, release_conn=False)
        self.assertEqual(http.pool.qsize(), http.pool.maxsize)

    def test_connection_read_timeout(self):
        """A read timeout raises ReadTimeoutError and leaves the pool full."""
        timed_out = Event()

        def socket_handler(listener):
            sock = listener.accept()[0]
            # Accumulate the request so a terminator split across two reads
            # is still detected.
            buf = b''
            while not buf.endswith(b'\r\n\r\n'):
                buf += sock.recv(65536)

            # Never answer; just hold the socket until the client gave up.
            timed_out.wait()
            sock.close()

        self._start_server(socket_handler)
        http = HTTPConnectionPool(self.host, self.port,
                                  timeout=0.001,
                                  retries=False,
                                  maxsize=3,
                                  block=True)
        self.addCleanup(http.close)

        try:
            self.assertRaises(ReadTimeoutError, http.request, 'GET', '/', release_conn=False)
        finally:
            timed_out.set()

        self.assertEqual(http.pool.qsize(), http.pool.maxsize)

    def test_read_timeout_dont_retry_method_not_in_whitelist(self):
        """A POST that times out on read raises ReadTimeoutError."""
        timed_out = Event()

        def socket_handler(listener):
            sock = listener.accept()[0]
            sock.recv(65536)
            timed_out.wait()
            sock.close()

        self._start_server(socket_handler)
        pool = HTTPConnectionPool(self.host, self.port, timeout=0.001, retries=True)
        self.addCleanup(pool.close)

        try:
            self.assertRaises(ReadTimeoutError, pool.request, 'POST', '/')
        finally:
            timed_out.set()

    def test_https_connection_read_timeout(self):
        """ Handshake timeouts should fail with a Timeout"""
        timed_out = Event()

        def socket_handler(listener):
            sock = listener.accept()[0]
            while not sock.recv(65536):
                pass

            timed_out.wait()
            sock.close()

        self._start_server(socket_handler)
        pool = HTTPSConnectionPool(self.host, self.port, timeout=0.001, retries=False)
        self.addCleanup(pool.close)
        try:
            self.assertRaises(ReadTimeoutError, pool.request, 'GET', '/')
        finally:
            timed_out.set()

    def test_timeout_errors_cause_retries(self):
        """With retries=1, a timed-out first request is retried and succeeds."""
        def socket_handler(listener):
            sock_timeout = listener.accept()[0]

            # Wait for a second request before closing the first socket.
            sock = listener.accept()[0]
            sock_timeout.close()

            # Second request.
            buf = b''
            while not buf.endswith(b'\r\n\r\n'):
                buf += sock.recv(65536)

            # Now respond immediately.
            body = 'Response 2'
            sock.send(('HTTP/1.1 200 OK\r\n'
                       'Content-Type: text/plain\r\n'
                       'Content-Length: %d\r\n'
                       '\r\n'
                       '%s' % (len(body), body)).encode('utf-8'))

            sock.close()

        # In situations where the main thread throws an exception, the server
        # thread can hang on an accept() call. This ensures everything times
        # out within 1 second. This should be long enough for any socket
        # operations in the test suite to complete
        default_timeout = socket.getdefaulttimeout()
        socket.setdefaulttimeout(1)

        try:
            self._start_server(socket_handler)
            t = Timeout(connect=0.001, read=0.001)
            pool = HTTPConnectionPool(self.host, self.port, timeout=t)
            self.addCleanup(pool.close)

            response = pool.request('GET', '/', retries=1)
            self.assertEqual(response.status, 200)
            self.assertEqual(response.data, b'Response 2')
        finally:
            socket.setdefaulttimeout(default_timeout)

    def test_delayed_body_read_timeout(self):
        """Headers arrive but the body is delayed: read() must time out."""
        timed_out = Event()

        def socket_handler(listener):
            sock = listener.accept()[0]
            buf = b''
            body = 'Hi'
            while not buf.endswith(b'\r\n\r\n'):
                buf += sock.recv(65536)
            sock.send(('HTTP/1.1 200 OK\r\n'
                       'Content-Type: text/plain\r\n'
                       'Content-Length: %d\r\n'
                       '\r\n' % len(body)).encode('utf-8'))

            # Hold the body back until the client has timed out.
            timed_out.wait()
            sock.send(body.encode('utf-8'))
            sock.close()

        self._start_server(socket_handler)
        pool = HTTPConnectionPool(self.host, self.port)
        self.addCleanup(pool.close)

        response = pool.urlopen('GET', '/', retries=0, preload_content=False,
                                timeout=Timeout(connect=1, read=0.001))
        try:
            self.assertRaises(ReadTimeoutError, response.read)
        finally:
            timed_out.set()

    def test_delayed_body_read_timeout_with_preload(self):
        """Same as above, but the timeout surfaces from urlopen() itself."""
        timed_out = Event()

        def socket_handler(listener):
            sock = listener.accept()[0]
            buf = b''
            body = 'Hi'
            while not buf.endswith(b'\r\n\r\n'):
                buf += sock.recv(65536)
            sock.send(('HTTP/1.1 200 OK\r\n'
                       'Content-Type: text/plain\r\n'
                       'Content-Length: %d\r\n'
                       '\r\n' % len(body)).encode('utf-8'))

            timed_out.wait(5)
            sock.close()

        self._start_server(socket_handler)
        pool = HTTPConnectionPool(self.host, self.port)
        self.addCleanup(pool.close)

        try:
            self.assertRaises(ReadTimeoutError, pool.urlopen,
                              'GET', '/', retries=False,
                              timeout=Timeout(connect=1, read=0.001))
        finally:
            timed_out.set()

    def test_incomplete_response(self):
        """A body shorter than Content-Length raises ProtocolError on read."""
        body = 'Response'
        partial_body = body[:2]

        def socket_handler(listener):
            sock = listener.accept()[0]

            # Consume request
            buf = b''
            while not buf.endswith(b'\r\n\r\n'):
                buf += sock.recv(65536)

            # Send partial response and close socket.
            sock.send((
                'HTTP/1.1 200 OK\r\n'
                'Content-Type: text/plain\r\n'
                'Content-Length: %d\r\n'
                '\r\n'
                '%s' % (len(body), partial_body)).encode('utf-8')
            )
            sock.close()

        self._start_server(socket_handler)
        pool = HTTPConnectionPool(self.host, self.port)
        self.addCleanup(pool.close)

        response = pool.request('GET', '/', retries=0, preload_content=False)
        self.assertRaises(ProtocolError, response.read)

    def test_retry_weird_http_version(self):
        """ Retry class should handle httplib.BadStatusLine errors properly """

        def socket_handler(listener):
            sock = listener.accept()[0]
            # First request.
            # Pause before responding so the first request times out.
            buf = b''
            while not buf.endswith(b'\r\n\r\n'):
                buf += sock.recv(65536)

            # send unknown http protocol
            body = "bad http 0.5 response"
            sock.send(('HTTP/0.5 200 OK\r\n'
                       'Content-Type: text/plain\r\n'
                       'Content-Length: %d\r\n'
                       '\r\n'
                       '%s' % (len(body), body)).encode('utf-8'))
            sock.close()

            # Second request.
            sock = listener.accept()[0]
            buf = b''
            while not buf.endswith(b'\r\n\r\n'):
                buf += sock.recv(65536)

            # Now respond immediately.
            sock.send(('HTTP/1.1 200 OK\r\n'
                       'Content-Type: text/plain\r\n'
                       'Content-Length: %d\r\n'
                       '\r\n'
                       'foo' % (len('foo'))).encode('utf-8'))
            sock.close()  # Close the socket.

        self._start_server(socket_handler)
        pool = HTTPConnectionPool(self.host, self.port)
        self.addCleanup(pool.close)
        retry = Retry(read=1)
        response = pool.request('GET', '/', retries=retry)
        self.assertEqual(response.status, 200)
        self.assertEqual(response.data, b'foo')

    def test_connection_cleanup_on_read_timeout(self):
        """The pool's queue size is unchanged after a read timeout."""
        timed_out = Event()

        def socket_handler(listener):
            sock = listener.accept()[0]
            buf = b''
            body = 'Hi'
            while not buf.endswith(b'\r\n\r\n'):
                buf += sock.recv(65536)
            sock.send(('HTTP/1.1 200 OK\r\n'
                       'Content-Type: text/plain\r\n'
                       'Content-Length: %d\r\n'
                       '\r\n' % len(body)).encode('utf-8'))

            timed_out.wait()
            sock.close()

        self._start_server(socket_handler)
        with HTTPConnectionPool(self.host, self.port) as pool:
            poolsize = pool.pool.qsize()
            response = pool.urlopen('GET', '/', retries=0, preload_content=False,
                                    timeout=Timeout(connect=1, read=0.001))
            try:
                self.assertRaises(ReadTimeoutError, response.read)
                self.assertEqual(poolsize, pool.pool.qsize())
            finally:
                timed_out.set()

    def test_connection_cleanup_on_protocol_error_during_read(self):
        """The pool's queue size is unchanged after a truncated response."""
        body = 'Response'
        partial_body = body[:2]

        def socket_handler(listener):
            sock = listener.accept()[0]

            # Consume request
            buf = b''
            while not buf.endswith(b'\r\n\r\n'):
                buf += sock.recv(65536)

            # Send partial response and close socket.
            sock.send(('HTTP/1.1 200 OK\r\n'
                       'Content-Type: text/plain\r\n'
                       'Content-Length: %d\r\n'
                       '\r\n'
                       '%s' % (len(body), partial_body)).encode('utf-8'))
            sock.close()

        self._start_server(socket_handler)
        with HTTPConnectionPool(self.host, self.port) as pool:
            poolsize = pool.pool.qsize()
            response = pool.request('GET', '/', retries=0, preload_content=False)

            self.assertRaises(ProtocolError, response.read)
            self.assertEqual(poolsize, pool.pool.qsize())

    def test_connection_closed_on_read_timeout_preload_false(self):
        """After a timed-out chunked read, the next request uses a new socket."""
        timed_out = Event()

        def socket_handler(listener):
            sock = listener.accept()[0]

            # Consume request
            buf = b''
            while not buf.endswith(b'\r\n\r\n'):
                buf += sock.recv(65535)

            # Send partial chunked response and then hang.
            sock.send((
                'HTTP/1.1 200 OK\r\n'
                'Content-Type: text/plain\r\n'
                'Transfer-Encoding: chunked\r\n'
                '\r\n'
                '8\r\n'
                '12345678\r\n').encode('utf-8')
            )
            timed_out.wait(5)

            # Expect a new request, but keep hold of the old socket to avoid
            # leaking it. Because we don't want to hang this thread, we
            # actually use select.select to confirm that a new request is
            # coming in: this lets us time the thread out.
            rlist, _, _ = select.select([listener], [], [], 1)
            assert rlist
            new_sock = listener.accept()[0]

            # Consume request
            buf = b''
            while not buf.endswith(b'\r\n\r\n'):
                buf += new_sock.recv(65535)

            # Send complete chunked response.
            new_sock.send((
                'HTTP/1.1 200 OK\r\n'
                'Content-Type: text/plain\r\n'
                'Transfer-Encoding: chunked\r\n'
                '\r\n'
                '8\r\n'
                '12345678\r\n'
                '0\r\n\r\n').encode('utf-8')
            )

            new_sock.close()
            sock.close()

        self._start_server(socket_handler)
        with HTTPConnectionPool(self.host, self.port) as pool:
            # First request should fail.
            response = pool.urlopen('GET', '/', retries=0,
                                    preload_content=False,
                                    timeout=Timeout(connect=1, read=0.001))
            try:
                self.assertRaises(ReadTimeoutError, response.read)
            finally:
                timed_out.set()

            # Second should succeed.
            response = pool.urlopen('GET', '/', retries=0,
                                    preload_content=False,
                                    timeout=Timeout(connect=1, read=0.1))
            self.assertEqual(len(response.read()), 8)

    def test_closing_response_actually_closes_connection(self):
        """response.close() must close the underlying socket."""
        done_closing = Event()
        complete = Event()
        # The insane use of this variable here is to get around the fact that
        # Python 2.6 does not support returning a value from Event.wait(). This
        # means we can't tell if an event timed out, so we can't use the timing
        # out of the 'complete' event to determine the success or failure of
        # the test. Python 2 also doesn't have the nonlocal statement, so we
        # can't write directly to this variable, only mutate it. Hence: list.
        successful = []

        def socket_handler(listener):
            sock = listener.accept()[0]

            buf = b''
            while not buf.endswith(b'\r\n\r\n'):
                buf += sock.recv(65536)

            sock.send(('HTTP/1.1 200 OK\r\n'
                       'Content-Type: text/plain\r\n'
                       'Content-Length: 0\r\n'
                       '\r\n').encode('utf-8'))

            # Wait for the socket to close.
            done_closing.wait(timeout=1)

            # Look for the empty string to show that the connection got closed.
            # Don't get stuck in a timeout.
            sock.settimeout(1)
            new_data = sock.recv(65536)
            self.assertFalse(new_data)
            successful.append(True)
            sock.close()
            complete.set()

        self._start_server(socket_handler)
        pool = HTTPConnectionPool(self.host, self.port)
        self.addCleanup(pool.close)

        response = pool.request('GET', '/', retries=0, preload_content=False)
        self.assertEqual(response.status, 200)
        response.close()

        done_closing.set()  # wait until the socket in our pool gets closed
        complete.wait(timeout=1)
        if not successful:
            self.fail("Timed out waiting for connection close")

    def test_release_conn_param_is_respected_after_timeout_retry(self):
        """For successful ```urlopen(release_conn=False)```,
        the connection isn't released, even after a retry.

        This test allows a retry: one request fails, the next request succeeds.

        This is a regression test for issue #651 [1], where the connection
        would be released if the initial request failed, even if a retry
        succeeded.

        [1] <https://github.com/shazow/urllib3/issues/651>
        """

        def socket_handler(listener):
            sock = listener.accept()[0]
            consume_socket(sock)

            # Close the connection, without sending any response (not even the
            # HTTP status line). This will trigger a `Timeout` on the client,
            # inside `urlopen()`.
            sock.close()

            # Expect a new request. Because we don't want to hang this thread,
            # we actually use select.select to confirm that a new request is
            # coming in: this lets us time the thread out.
            rlist, _, _ = select.select([listener], [], [], 5)
            assert rlist
            sock = listener.accept()[0]
            consume_socket(sock)

            # Send complete chunked response.
            sock.send((
                'HTTP/1.1 200 OK\r\n'
                'Content-Type: text/plain\r\n'
                'Transfer-Encoding: chunked\r\n'
                '\r\n'
                '8\r\n'
                '12345678\r\n'
                '0\r\n\r\n').encode('utf-8')
            )

            sock.close()

        self._start_server(socket_handler)
        with HTTPConnectionPool(self.host, self.port, maxsize=1) as pool:
            # First request should fail, but the timeout and `retries=1` should
            # save it.
            response = pool.urlopen('GET', '/', retries=1,
                                    release_conn=False, preload_content=False,
                                    timeout=Timeout(connect=1, read=0.001))

            # The connection should still be on the response object, and none
            # should be in the pool. We opened two though.
            self.assertEqual(pool.num_connections, 2)
            self.assertEqual(pool.pool.qsize(), 0)
            self.assertTrue(response.connection is not None)

            # Consume the data. This should put the connection back.
            response.read()
            self.assertEqual(pool.pool.qsize(), 1)
            self.assertTrue(response.connection is None)


class TestProxyManager(SocketDummyServerTestCase):
    """Requests sent through a proxy, with the dummy server as the proxy."""

    def test_simple(self):
        """The proxy receives an absolute-URI request line and default headers."""
        def echo_socket_handler(listener):
            sock = listener.accept()[0]

            buf = b''
            while not buf.endswith(b'\r\n\r\n'):
                buf += sock.recv(65536)

            # Echo the raw request back as the response body.
            sock.send(('HTTP/1.1 200 OK\r\n'
                       'Content-Type: text/plain\r\n'
                       'Content-Length: %d\r\n'
                       '\r\n'
                       '%s' % (len(buf), buf.decode('utf-8'))).encode('utf-8'))
            sock.close()

        self._start_server(echo_socket_handler)
        base_url = 'http://%s:%d' % (self.host, self.port)
        proxy = proxy_from_url(base_url)
        self.addCleanup(proxy.clear)

        r = proxy.request('GET', 'http://google.com/')

        self.assertEqual(r.status, 200)
        # FIXME: The order of the headers is not predictable right now. We
        # should fix that someday (maybe when we migrate to
        # OrderedDict/MultiDict).
        self.assertEqual(sorted(r.data.split(b'\r\n')),
                         sorted([
                             b'GET http://google.com/ HTTP/1.1',
                             b'Host: google.com',
                             b'Accept-Encoding: identity',
                             b'Accept: */*',
                             b'',
                             b'',
                         ]))

    def test_headers(self):
        """Headers passed as proxy_headers show up in the proxied request."""
        def echo_socket_handler(listener):
            sock = listener.accept()[0]

            buf = b''
            while not buf.endswith(b'\r\n\r\n'):
                buf += sock.recv(65536)

            # Echo the raw request back as the response body.
            sock.send(('HTTP/1.1 200 OK\r\n'
                       'Content-Type: text/plain\r\n'
                       'Content-Length: %d\r\n'
                       '\r\n'
                       '%s' % (len(buf), buf.decode('utf-8'))).encode('utf-8'))
            sock.close()

        self._start_server(echo_socket_handler)
        base_url = 'http://%s:%d' % (self.host, self.port)

        # Define some proxy headers.
        proxy_headers = HTTPHeaderDict({'For The Proxy': 'YEAH!'})
        proxy = proxy_from_url(base_url, proxy_headers=proxy_headers)
        self.addCleanup(proxy.clear)

        conn = proxy.connection_from_url('http://www.google.com/')

        r = conn.urlopen('GET', 'http://www.google.com/', assert_same_host=False)

        self.assertEqual(r.status, 200)
        # FIXME: The order of the headers is not predictable right now. We
        # should fix that someday (maybe when we migrate to
        # OrderedDict/MultiDict).
        self.assertTrue(b'For The Proxy: YEAH!\r\n' in r.data)

    def test_retries(self):
        """A dropped proxy connection is retried; without retries it raises ProxyError."""
        close_event = Event()

        def echo_socket_handler(listener):
            sock = listener.accept()[0]
            # First request, which should fail
            sock.close()

            # Second request
            sock = listener.accept()[0]

            buf = b''
            while not buf.endswith(b'\r\n\r\n'):
                buf += sock.recv(65536)

            sock.send(('HTTP/1.1 200 OK\r\n'
                       'Content-Type: text/plain\r\n'
                       'Content-Length: %d\r\n'
                       '\r\n'
                       '%s' % (len(buf), buf.decode('utf-8'))).encode('utf-8'))
            sock.close()
            close_event.set()

        self._start_server(echo_socket_handler)
        base_url = 'http://%s:%d' % (self.host, self.port)

        proxy = proxy_from_url(base_url)
        self.addCleanup(proxy.clear)
        conn = proxy.connection_from_url('http://www.google.com')

        r = conn.urlopen('GET', 'http://www.google.com',
                         assert_same_host=False, retries=1)
        self.assertEqual(r.status, 200)

        close_event.wait(timeout=1)
        self.assertRaises(ProxyError, conn.urlopen, 'GET',
                          'http://www.google.com',
                          assert_same_host=False, retries=False)

    def test_connect_reconn(self):
        """Two consecutive HTTPS requests each get their own CONNECT tunnel."""
        def proxy_ssl_one(listener):
            sock = listener.accept()[0]

            buf = b''
            while not buf.endswith(b'\r\n\r\n'):
                buf += sock.recv(65536)
            s = buf.decode('utf-8')
            if not s.startswith('CONNECT '):
                sock.send(('HTTP/1.1 405 Method not allowed\r\n'
                           'Allow: CONNECT\r\n\r\n').encode('utf-8'))
                sock.close()
                return

            if not s.startswith('CONNECT %s:443' % (self.host,)):
                sock.send(('HTTP/1.1 403 Forbidden\r\n\r\n').encode('utf-8'))
                sock.close()
                return

            sock.send(('HTTP/1.1 200 Connection Established\r\n\r\n').encode('utf-8'))
            ssl_sock = ssl.wrap_socket(sock,
                                       server_side=True,
                                       keyfile=DEFAULT_CERTS['keyfile'],
                                       certfile=DEFAULT_CERTS['certfile'],
                                       ca_certs=DEFAULT_CA)

            buf = b''
            while not buf.endswith(b'\r\n\r\n'):
                buf += ssl_sock.recv(65536)

            ssl_sock.send(('HTTP/1.1 200 OK\r\n'
                           'Content-Type: text/plain\r\n'
                           'Content-Length: 2\r\n'
                           'Connection: close\r\n'
                           '\r\n'
                           'Hi').encode('utf-8'))
            ssl_sock.close()

        def echo_socket_handler(listener):
            proxy_ssl_one(listener)
            proxy_ssl_one(listener)

        self._start_server(echo_socket_handler)
        base_url = 'http://%s:%d' % (self.host, self.port)

        proxy = proxy_from_url(base_url)
        self.addCleanup(proxy.clear)

        url = 'https://{0}'.format(self.host)
        conn = proxy.connection_from_url(url)
        r = conn.urlopen('GET', url, retries=0)
        self.assertEqual(r.status, 200)
        r = conn.urlopen('GET', url, retries=0)
        self.assertEqual(r.status, 200)

    def test_connect_ipv6_addr(self):
        """An IPv6 target is bracketed in the CONNECT request line."""
        ipv6_addr = '2001:4998:c:a06::2:4008'

        def echo_socket_handler(listener):
            sock = listener.accept()[0]

            buf = b''
            while not buf.endswith(b'\r\n\r\n'):
                buf += sock.recv(65536)
            s = buf.decode('utf-8')

            if s.startswith('CONNECT [%s]:443' % (ipv6_addr,)):
                sock.send(b'HTTP/1.1 200 Connection Established\r\n\r\n')
                ssl_sock = ssl.wrap_socket(sock,
                                           server_side=True,
                                           keyfile=DEFAULT_CERTS['keyfile'],
                                           certfile=DEFAULT_CERTS['certfile'])
                buf = b''
                while not buf.endswith(b'\r\n\r\n'):
                    buf += ssl_sock.recv(65536)

                ssl_sock.send(b'HTTP/1.1 200 OK\r\n'
                              b'Content-Type: text/plain\r\n'
                              b'Content-Length: 2\r\n'
                              b'Connection: close\r\n'
                              b'\r\n'
                              b'Hi')
                ssl_sock.close()
            else:
                sock.close()

        self._start_server(echo_socket_handler)
        base_url = 'http://%s:%d' % (self.host, self.port)

        proxy = proxy_from_url(base_url)
        self.addCleanup(proxy.clear)

        url = 'https://[{0}]'.format(ipv6_addr)
        conn = proxy.connection_from_url(url)
        try:
            r = conn.urlopen('GET', url, retries=0)
            self.assertEqual(r.status, 200)
        except MaxRetryError:
            self.fail('Invalid IPv6 format in HTTP CONNECT request')


class TestSSL(SocketDummyServerTestCase):
    """TLS-level failure and timeout scenarios."""

    def test_ssl_failure_midway_through_conn(self):
        """Plaintext injected into a TLS stream surfaces as an SSLError."""
        def socket_handler(listener):
            sock = listener.accept()[0]
            sock2 = sock.dup()
            ssl_sock = ssl.wrap_socket(sock,
                                       server_side=True,
                                       keyfile=DEFAULT_CERTS['keyfile'],
                                       certfile=DEFAULT_CERTS['certfile'],
                                       ca_certs=DEFAULT_CA)

            buf = b''
            while not buf.endswith(b'\r\n\r\n'):
                buf += ssl_sock.recv(65536)

            # Deliberately send from the non-SSL socket.
            sock2.send((
                'HTTP/1.1 200 OK\r\n'
                'Content-Type: text/plain\r\n'
                'Content-Length: 2\r\n'
                '\r\n'
                'Hi').encode('utf-8'))
            sock2.close()
            ssl_sock.close()

        self._start_server(socket_handler)
        pool = HTTPSConnectionPool(self.host, self.port)
        self.addCleanup(pool.close)

        with self.assertRaises(MaxRetryError) as cm:
            pool.request('GET', '/', retries=0)
        self.assertIsInstance(cm.exception.reason, SSLError)

    def test_ssl_read_timeout(self):
        """An incomplete body over TLS makes read() raise ReadTimeoutError."""
        timed_out = Event()

        def socket_handler(listener):
            sock = listener.accept()[0]
            ssl_sock = ssl.wrap_socket(sock,
                                       server_side=True,
                                       keyfile=DEFAULT_CERTS['keyfile'],
                                       certfile=DEFAULT_CERTS['certfile'],
                                       ca_certs=DEFAULT_CA)

            buf = b''
            while not buf.endswith(b'\r\n\r\n'):
                buf += ssl_sock.recv(65536)

            # Send incomplete message (note Content-Length)
            ssl_sock.send((
                'HTTP/1.1 200 OK\r\n'
                'Content-Type: text/plain\r\n'
                'Content-Length: 10\r\n'
                '\r\n'
                'Hi-').encode('utf-8'))
            timed_out.wait()

            sock.close()
            ssl_sock.close()

        self._start_server(socket_handler)
        pool = HTTPSConnectionPool(self.host, self.port)
        self.addCleanup(pool.close)

        response = pool.urlopen('GET', '/', retries=0, preload_content=False,
                                timeout=Timeout(connect=1, read=0.001))
        try:
            self.assertRaises(ReadTimeoutError, response.read)
        finally:
            timed_out.set()

    def test_ssl_failed_fingerprint_verification(self):
        """A non-matching assert_fingerprint fails with an SSLError, twice."""
        def socket_handler(listener):
            for i in range(2):
                sock = listener.accept()[0]
                ssl_sock = ssl.wrap_socket(sock,
                                           server_side=True,
                                           keyfile=DEFAULT_CERTS['keyfile'],
                                           certfile=DEFAULT_CERTS['certfile'],
                                           ca_certs=DEFAULT_CA)

                ssl_sock.send(b'HTTP/1.1 200 OK\r\n'
                              b'Content-Type: text/plain\r\n'
                              b'Content-Length: 5\r\n\r\n'
                              b'Hello')

                ssl_sock.close()
                sock.close()

        self._start_server(socket_handler)
        # GitHub's fingerprint. Valid, but not matching.
        fingerprint = ('A0:C4:A7:46:00:ED:A7:2D:C0:BE:CB'
                       ':9A:8C:B6:07:CA:58:EE:74:5E')

        def request():
            pool = HTTPSConnectionPool(self.host, self.port,
                                       assert_fingerprint=fingerprint)
            try:
                response = pool.urlopen('GET', '/', preload_content=False,
                                        timeout=Timeout(connect=1, read=0.001),
                                        retries=0)
                response.read()
            finally:
                pool.close()

        with self.assertRaises(MaxRetryError) as cm:
            request()
        self.assertIsInstance(cm.exception.reason, SSLError)
        # Should not hang, see https://github.com/shazow/urllib3/issues/529
        self.assertRaises(MaxRetryError, request)

    def test_retry_ssl_error(self):
        """With retries=1, a request that hit an SSLError is retried."""
        def socket_handler(listener):
            # first request, trigger an SSLError
            sock = listener.accept()[0]
            sock2 = sock.dup()
            ssl_sock = ssl.wrap_socket(sock,
                                       server_side=True,
                                       keyfile=DEFAULT_CERTS['keyfile'],
                                       certfile=DEFAULT_CERTS['certfile'])
            buf = b''
            while not buf.endswith(b'\r\n\r\n'):
                buf += ssl_sock.recv(65536)

            # Deliberately send from the non-SSL socket to trigger an SSLError
            sock2.send((
                'HTTP/1.1 200 OK\r\n'
                'Content-Type: text/plain\r\n'
                'Content-Length: 4\r\n'
                '\r\n'
                'Fail').encode('utf-8'))
            sock2.close()
            ssl_sock.close()

            # retried request
            sock = listener.accept()[0]
            ssl_sock = ssl.wrap_socket(sock,
                                       server_side=True,
                                       keyfile=DEFAULT_CERTS['keyfile'],
                                       certfile=DEFAULT_CERTS['certfile'])
            buf = b''
            while not buf.endswith(b'\r\n\r\n'):
                buf += ssl_sock.recv(65536)
            ssl_sock.send(b'HTTP/1.1 200 OK\r\n'
                          b'Content-Type: text/plain\r\n'
                          b'Content-Length: 7\r\n\r\n'
                          b'Success')
            ssl_sock.close()

        self._start_server(socket_handler)

        pool = HTTPSConnectionPool(self.host, self.port)
        self.addCleanup(pool.close)
        response = pool.urlopen('GET', '/', retries=1)
        self.assertEqual(response.data, b'Success')


class TestErrorWrapping(SocketDummyServerTestCase):
    """Malformed status lines are reported as ProtocolError."""

    def test_bad_statusline(self):
        """A status line with a non-numeric status raises ProtocolError."""
        self.start_response_handler(
            b'HTTP/1.1 Omg What Is This?\r\n'
            b'Content-Length: 0\r\n'
            b'\r\n'
        )
        pool = HTTPConnectionPool(self.host, self.port, retries=False)
        self.addCleanup(pool.close)
        self.assertRaises(ProtocolError, pool.request, 'GET', '/')

    def test_unknown_protocol(self):
        """An unknown HTTP version in the status line raises ProtocolError."""
        self.start_response_handler(
            b'HTTP/1000 200 OK\r\n'
            b'Content-Length: 0\r\n'
            b'\r\n'
        )
        pool = HTTPConnectionPool(self.host, self.port, retries=False)
        self.addCleanup(pool.close)
        self.assertRaises(ProtocolError, pool.request, 'GET', '/')


class TestHeaders(SocketDummyServerTestCase):
    """Case and ordering of request and response headers."""

    @onlyPy3
    def test_httplib_headers_case_insensitive(self):
        """Response header names keep the case the server sent."""
        self.start_response_handler(
            b'HTTP/1.1 200 OK\r\n'
            b'Content-Length: 0\r\n'
            b'Content-type: text/plain\r\n'
            b'\r\n'
        )
        pool = HTTPConnectionPool(self.host, self.port, retries=False)
        self.addCleanup(pool.close)
        HEADERS = {'Content-Length': '0', 'Content-type': 'text/plain'}
        r = pool.request('GET', '/')
        self.assertEqual(HEADERS, dict(r.headers.items()))  # to preserve case sensitivity

    def test_headers_are_sent_with_the_original_case(self):
        """Request header names reach the server with their original case."""
        headers = {'foo': 'bar', 'bAz': 'quux'}
        parsed_headers = {}

        def socket_handler(listener):
            sock = listener.accept()[0]

            buf = b''
            while not buf.endswith(b'\r\n\r\n'):
                buf += sock.recv(65536)

            # Skip the request line; drop the empty strings after the
            # terminating blank line.
            headers_list = [header for header in buf.split(b'\r\n')[1:] if header]

            for header in headers_list:
                (key, value) = header.split(b': ')
                parsed_headers[key.decode('ascii')] = value.decode('ascii')

            sock.send((
                'HTTP/1.1 204 No Content\r\n'
                'Content-Length: 0\r\n'
                '\r\n').encode('utf-8'))

            sock.close()

        self._start_server(socket_handler)
        expected_headers = {'Accept-Encoding': 'identity',
                            'Host': '{0}:{1}'.format(self.host, self.port)}
        expected_headers.update(headers)

        pool = HTTPConnectionPool(self.host, self.port, retries=False)
        self.addCleanup(pool.close)
        pool.request('GET', '/', headers=HTTPHeaderDict(headers))
        self.assertEqual(expected_headers, parsed_headers)

    def test_request_headers_are_sent_in_the_original_order(self):
        # NOTE: Probability this test gives a false negative is 1/(K!)
        K = 16
        # NOTE: Provide headers in non-sorted order (i.e. reversed)
        #       so that if the internal implementation tries to sort them,
        #       a change will be detected.
        expected_request_headers = [(u'X-Header-%d' % i, str(i)) for i in reversed(range(K))]

        actual_request_headers = []

        def socket_handler(listener):
            sock = listener.accept()[0]

            buf = b''
            while not buf.endswith(b'\r\n\r\n'):
                buf += sock.recv(65536)

            headers_list = [header for header in buf.split(b'\r\n')[1:] if header]

            for header in headers_list:
                (key, value) = header.split(b': ')
                if not key.decode('ascii').startswith(u'X-Header-'):
                    continue
                actual_request_headers.append((key.decode('ascii'), value.decode('ascii')))

            sock.send((
                u'HTTP/1.1 204 No Content\r\n'
                u'Content-Length: 0\r\n'
                u'\r\n').encode('ascii'))

            sock.close()

        self._start_server(socket_handler)

        pool = HTTPConnectionPool(self.host, self.port, retries=False)
        self.addCleanup(pool.close)
        pool.request('GET', '/', headers=OrderedDict(expected_request_headers))
        self.assertEqual(expected_request_headers, actual_request_headers)

    def test_response_headers_are_returned_in_the_original_order(self):
        # NOTE: Probability this test gives a false negative is 1/(K!)
        K = 16
        # NOTE: Provide headers in non-sorted order (i.e. reversed)
        #       so that if the internal implementation tries to sort them,
        #       a change will be detected.
        expected_response_headers = [('X-Header-%d' % i, str(i)) for i in reversed(range(K))]

        def socket_handler(listener):
            sock = listener.accept()[0]

            buf = b''
            while not buf.endswith(b'\r\n\r\n'):
                buf += sock.recv(65536)

            sock.send(b'HTTP/1.1 200 OK\r\n' +
                      b'\r\n'.join([
                          (k.encode('utf8') + b': ' + v.encode('utf8'))
                          for (k, v) in expected_response_headers
                      ]) +
                      b'\r\n')
            sock.close()

        self._start_server(socket_handler)
        pool = HTTPConnectionPool(self.host, self.port)
        self.addCleanup(pool.close)
        r = pool.request('GET', '/', retries=0)
        actual_response_headers = [
            (k, v) for (k, v) in r.headers.items()
            if k.startswith('X-Header-')
        ]
        self.assertEqual(expected_response_headers, actual_response_headers)


class TestBrokenHeaders(SocketDummyServerTestCase):
    """Malformed response headers must produce a log record."""

    def setUp(self):
        if issubclass(httplib.HTTPMessage, MimeToolMessage):
            raise SkipTest('Header parsing errors not available')

        super(TestBrokenHeaders, self).setUp()

    def _test_broken_header_parsing(self, headers):
        """Serve a response containing *headers* and expect a parse-failure log.

        :param headers: list of raw header lines (bytes) appended after the
            standard response headers.

        Fails the test when no log record mentioning 'Failed to parse
        headers' for the requested URL was captured.
        """
        self.start_response_handler((
            b'HTTP/1.1 200 OK\r\n'
            b'Content-Length: 0\r\n'
            b'Content-type: text/plain\r\n'
            ) + b'\r\n'.join(headers) + b'\r\n'
        )

        pool = HTTPConnectionPool(self.host, self.port, retries=False)
        self.addCleanup(pool.close)

        with LogRecorder() as logs:
            pool.request('GET', '/')

        for record in logs:
            if 'Failed to parse headers' in record.msg and \
                    pool._absolute_url('/') == record.args[0]:
                return
        self.fail('Missing log about unparsed headers')

    def test_header_without_name(self):
        """A header line that starts with a colon is logged as unparsed."""
        self._test_broken_header_parsing([
            b': Value\r\n',
            b'Another: Header\r\n',
        ])

    def test_header_without_name_or_value(self):
        """A header line consisting only of a colon is logged as unparsed."""
        self._test_broken_header_parsing([
            b':\r\n',
            b'Another: Header\r\n',
        ])

    def test_header_without_colon_or_value(self):
        """A header line with no colon at all is logged as unparsed."""
        self._test_broken_header_parsing([
            b'Broken Header',
            b'Another: Header',
        ])


class TestHEAD(SocketDummyServerTestCase):
    """HEAD responses must not block waiting for a body."""

    def test_chunked_head_response_does_not_hang(self):
        """A HEAD response advertising chunked encoding streams as empty."""
        self.start_response_handler(
            b'HTTP/1.1 200 OK\r\n'
            b'Transfer-Encoding: chunked\r\n'
            b'Content-type: text/plain\r\n'
            b'\r\n'
        )
        pool = HTTPConnectionPool(self.host, self.port, retries=False)
        self.addCleanup(pool.close)
        r = pool.request('HEAD', '/', timeout=1, preload_content=False)

        # stream will use the read_chunked method here.
        self.assertEqual([], list(r.stream()))

    def test_empty_head_response_does_not_hang(self):
        """A HEAD response advertising a Content-Length streams as empty."""
        self.start_response_handler(
            b'HTTP/1.1 200 OK\r\n'
            b'Content-Length: 256\r\n'
            b'Content-type: text/plain\r\n'
            b'\r\n'
        )
        pool = HTTPConnectionPool(self.host, self.port, retries=False)
        self.addCleanup(pool.close)
        r = pool.request('HEAD', '/', timeout=1, preload_content=False)

        # stream will use the read method here.
        self.assertEqual([], list(r.stream()))


class TestStream(SocketDummyServerTestCase):
    """Streaming a response body with stream()."""

    def test_stream_none_unchunked_response_does_not_hang(self):
        """stream(None) on a Content-Length response yields the whole body."""
        done_event = Event()

        def socket_handler(listener):
            sock = listener.accept()[0]

            buf = b''
            while not buf.endswith(b'\r\n\r\n'):
                buf += sock.recv(65536)

            sock.send(
                b'HTTP/1.1 200 OK\r\n'
                b'Content-Length: 12\r\n'
                b'Content-type: text/plain\r\n'
                b'\r\n'
                b'hello, world'
            )
            # Keep the connection open while the client streams the body.
            done_event.wait(5)
            sock.close()

        self._start_server(socket_handler)
        pool = HTTPConnectionPool(self.host, self.port, retries=False)
        self.addCleanup(pool.close)
        r = pool.request('GET', '/', timeout=1, preload_content=False)

        # Stream should read to the end.
        self.assertEqual([b'hello, world'], list(r.stream(None)))

        done_event.set()


# NOTE: The excerpt ends in the middle of the next definition. The visible
# part is kept verbatim (commented out) rather than guessed at.
#
# class TestBadContentLength(SocketDummyServerTestCase):
#     def
test_enforce_content_length_get(self):1112 done_event = Event()1113 def socket_handler(listener):1114 sock = listener.accept()[0]1115 buf = b''1116 while not buf.endswith(b'\r\n\r\n'):1117 buf += sock.recv(65536)1118 sock.send(1119 b'HTTP/1.1 200 OK\r\n'1120 b'Content-Length: 22\r\n'1121 b'Content-type: text/plain\r\n'1122 b'\r\n'1123 b'hello, world'1124 )1125 done_event.wait(1)1126 sock.close()1127 self._start_server(socket_handler)1128 conn = HTTPConnectionPool(self.host, self.port, maxsize=1)1129 self.addCleanup(conn.close)1130 # Test stream read when content length less than headers claim1131 get_response = conn.request('GET', url='/', preload_content=False,1132 enforce_content_length=True)1133 data = get_response.stream(100)1134 # Read "good" data before we try to read again.1135 # This won't trigger till generator is exhausted.1136 next(data)1137 try:1138 next(data)1139 self.assertFail()1140 except ProtocolError as e:1141 self.assertTrue('12 bytes read, 10 more expected' in str(e))1142 done_event.set()1143 def test_enforce_content_length_no_body(self):1144 done_event = Event()1145 def socket_handler(listener):1146 sock = listener.accept()[0]1147 buf = b''1148 while not buf.endswith(b'\r\n\r\n'):1149 buf += sock.recv(65536)1150 sock.send(1151 b'HTTP/1.1 200 OK\r\n'1152 b'Content-Length: 22\r\n'1153 b'Content-type: text/plain\r\n'1154 b'\r\n'1155 )1156 done_event.wait(1)1157 sock.close()1158 self._start_server(socket_handler)1159 conn = HTTPConnectionPool(self.host, self.port, maxsize=1)1160 self.addCleanup(conn.close)1161 # Test stream on 0 length body1162 head_response = conn.request('HEAD', url='/', preload_content=False,1163 enforce_content_length=True)1164 data = [chunk for chunk in head_response.stream(1)]1165 self.assertEqual(len(data), 0)1166 done_event.set()1167class TestRetryPoolSizeDrainFail(SocketDummyServerTestCase):1168 def test_pool_size_retry_drain_fail(self):1169 def socket_handler(listener):1170 for _ in range(2):1171 sock = 
listener.accept()[0]1172 while not sock.recv(65536).endswith(b'\r\n\r\n'):1173 pass1174 # send a response with an invalid content length -- this causes1175 # a ProtocolError to raise when trying to drain the connection1176 sock.send(1177 b'HTTP/1.1 404 NOT FOUND\r\n'1178 b'Content-Length: 1000\r\n'1179 b'Content-Type: text/plain\r\n'1180 b'\r\n'1181 )1182 sock.close()1183 self._start_server(socket_handler)1184 retries = Retry(1185 total=1,1186 raise_on_status=False,1187 status_forcelist=[404],1188 )1189 pool = HTTPConnectionPool(self.host, self.port, maxsize=10,1190 retries=retries, block=True)1191 self.addCleanup(pool.close)1192 pool.urlopen('GET', '/not_found', preload_content=False)...
test_engine.py
Source:test_engine.py
...25 cfg.CONF(args=[], project='gate', prog='engine-server')26 self.enableFakeTransport()27 self.setupLogging()28 super(EngineTest, self).__init__(*args)29 def _start_server(self, storage_driver=None, topic=None, host=None, allow_stop=True):30 if not storage_driver:31 storage_driver = MemoryDriver('memory:///')32 server = EngineServer(33 host or 'test.host', storage_driver=storage_driver, topic=topic, allow_stop=allow_stop)34 thread = threading.Thread(target=server.start)35 thread.daemon = True36 thread.server = server37 thread.start()38 return thread39 def _stop_server(self, client, thread, topic=None):40 client = client._client41 if topic is not None:42 client = client.prepare(topic=topic)43 client.cast({}, 'stop')44 thread.join(timeout=30)45 def assertValidId(self, result):46 self.assertTrue(result is not None)47 self.assertTrue('uuid' in result)48 self.assertTrue(result['uuid'] is not None)49 def test_default_topic(self):50 thread = self._start_server()51 client = EngineClient(thread.server._transport)52 self.assertEqual(thread.server.topic, client.topic)53 self._stop_server(client, thread)54 def test_custom_topic(self):55 thread = self._start_server(topic='custom')56 client = EngineClient(thread.server._transport, topic='custom')57 self.assertEqual(thread.server.topic, client.topic)58 self._stop_server(client, thread)59 def test_case_create(self):60 thread = self._start_server()61 client = EngineClient(thread.server._transport)62 account_uuid = str(uuid.uuid4())63 result = client.case_create(account_uuid=account_uuid, name='testname')64 65 self.assertValidId(result)66 result2 = client.case_get(result['uuid'])67 self._stop_server(client, thread)68 self.assertEquals(result2['uuid'], result['uuid'])69 self.assertEquals(result2['name'], 'testname')70 def test_case_update(self):71 thread = self._start_server()72 client = EngineClient(thread.server._transport)73 account_uuid = str(uuid.uuid4())74 result = client.case_create(account_uuid=account_uuid, 
name='testname')75 76 self.assertValidId(result)77 result2 = client.case_update(result['uuid'], name='newname')78 self._stop_server(client, thread)79 self.assertEquals(result2['uuid'], result['uuid'])80 self.assertTrue('area' not in result2)81 self.assertEquals(result2['name'], 'newname')82 def test_case_delete(self):83 thread = self._start_server()84 client = EngineClient(thread.server._transport)85 account_uuid = str(uuid.uuid4())86 result = client.case_create(account_uuid=account_uuid, name='testname')87 88 self.assertValidId(result)89 result2 = client.case_delete(result['uuid'])90 result3 = client.case_get(result['uuid'])91 self._stop_server(client, thread)92 self.assertTrue(result2)93 self.assertTrue(result3 is None)94 def test_case_list(self):95 thread = self._start_server()96 client = EngineClient(thread.server._transport)97 account_uuid = str(uuid.uuid4())98 one = client.case_create(account_uuid=account_uuid, name='one')99 two = client.case_create(account_uuid=account_uuid, name='two')100 101 self.assertValidId(one)102 self.assertValidId(two)103 items = client.case_list(account_uuid)104 self._stop_server(client, thread)105 found = 0106 for item in items:107 if item['uuid'] == one['uuid']:108 found += 1109 self.assertEquals(item['name'], 'one')110 elif item['uuid'] == two['uuid']:111 found += 1112 self.assertEquals(item['name'], 'two')113 self.assertEquals(found, 2)114 def test_evidence_add(self):115 thread = self._start_server()116 client = EngineClient(thread.server._transport)117 case_uuid = str(uuid.uuid4())118 result = client.evidence_add(case_uuid=case_uuid, container_format='raw', container_size=1024, name='testname')119 120 self.assertValidId(result)121 result2 = client.evidence_get(result['uuid'])122 self._stop_server(client, thread)123 self.assertEquals(result2['uuid'], result['uuid'])124 self.assertEquals(result2['name'], 'testname')125 def test_evidence_update(self):126 thread = self._start_server()127 client = 
EngineClient(thread.server._transport)128 case_uuid = str(uuid.uuid4())129 result = client.evidence_add(case_uuid=case_uuid, container_format='raw', container_size=1024, name='testname')130 131 self.assertValidId(result)132 result2 = client.evidence_update(result['uuid'], name='newname')133 self._stop_server(client, thread)134 self.assertEquals(result2['uuid'], result['uuid'])135 self.assertTrue('area' not in result2)136 self.assertEquals(result2['name'], 'newname')137 def test_evidence_remove(self):138 thread = self._start_server()139 client = EngineClient(thread.server._transport)140 case_uuid = str(uuid.uuid4())141 result = client.evidence_add(case_uuid=case_uuid, container_format='raw', container_size=1024, name='testname')142 143 self.assertValidId(result)144 result2 = client.evidence_remove(result['uuid'])145 result3 = client.evidence_get(result['uuid'])146 self._stop_server(client, thread)147 self.assertTrue(result2)148 self.assertTrue(result3 is None)149 def test_evidence_list(self):150 thread = self._start_server()151 client = EngineClient(thread.server._transport)152 case_uuid = str(uuid.uuid4())153 one = client.evidence_add(case_uuid=case_uuid, container_format='raw', container_size=1024, name='one')154 two = client.evidence_add(case_uuid=case_uuid, container_format='raw', container_size=1024, name='two')155 156 self.assertValidId(one)157 self.assertValidId(two)158 items = client.evidence_list(case_uuid)159 self._stop_server(client, thread)160 found = 0161 for item in items:162 if item['uuid'] == one['uuid']:163 found += 1164 self.assertEquals(item['name'], 'one')...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!