Best Python code snippet using autotest_python
test_ssl.py
Source:test_ssl.py
...90 stdout=subprocess.PIPE,91 stderr=subprocess.PIPE)92 time.sleep(sleepTime)93 return pid94 def stop_server(self, pid):95 pid.terminate()96 out, err = pid.communicate()97 return six.ensure_text(out), six.ensure_text(err)98 def http_get(self, s):99 s.send(b'GET / HTTP/1.0\n\n')100 resp = b''101 while 1:102 try:103 r = s.recv(4096)104 if not r:105 break106 except SSL.SSLError: # s_server throws an 'unexpected eof'...107 break108 resp = resp + r109 return six.ensure_text(resp)110 def setUp(self):111 self.srv_host = srv_host112 self.srv_port = allocate_srv_port()113 self.srv_addr = (srv_host, self.srv_port)114 self.srv_url = 'https://%s:%s/' % (srv_host, self.srv_port)115 self.args = ['s_server', '-quiet', '-www',116 # '-cert', 'server.pem', Implicitly using this117 '-accept', str(self.srv_port)]118class PassSSLClientTestCase(BaseSSLClientTestCase):119 def test_pass(self):120 pass121class HttpslibSSLClientTestCase(BaseSSLClientTestCase):122 def setUp(self):123 super(HttpslibSSLClientTestCase, self).setUp()124 self.ctx = SSL.Context()125 def tearDown(self):126 self.ctx.close()127 def test_HTTPSConnection(self):128 pid = self.start_server(self.args)129 try:130 c = httpslib.HTTPSConnection(srv_host, self.srv_port)131 c.request('GET', '/')132 data = c.getresponse().read()133 c.close()134 finally:135 self.stop_server(pid)136 self.assertIn('s_server -quiet -www', six.ensure_text(data))137 def test_HTTPSConnection_resume_session(self):138 pid = self.start_server(self.args)139 try:140 self.ctx.load_verify_locations(cafile='tests/ca.pem')141 self.ctx.load_cert('tests/x509.pem')142 self.ctx.set_verify(143 SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 1)144 self.ctx.set_session_cache_mode(m2.SSL_SESS_CACHE_CLIENT)145 c = httpslib.HTTPSConnection(srv_host, self.srv_port,146 ssl_context=self.ctx)147 c.request('GET', '/')148 ses = c.get_session()149 t = ses.as_text()150 data = c.getresponse().read()151 # Apparently closing connection here screws session; Ali Polatel?152 # 
c.close()153 ctx2 = SSL.Context()154 ctx2.load_verify_locations(cafile='tests/ca.pem')155 ctx2.load_cert('tests/x509.pem')156 ctx2.set_verify(157 SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 1)158 ctx2.set_session_cache_mode(m2.SSL_SESS_CACHE_CLIENT)159 c2 = httpslib.HTTPSConnection(srv_host, self.srv_port,160 ssl_context=ctx2)161 c2.set_session(ses)162 c2.request('GET', '/')163 ses2 = c2.get_session()164 t2 = ses2.as_text()165 data = six.ensure_text(c2.getresponse().read())166 c.close()167 c2.close()168 self.assertEqual(t, t2, "Sessions did not match")169 finally:170 self.stop_server(pid)171 self.assertIn('s_server -quiet -www', data)172 def test_HTTPSConnection_secure_context(self):173 pid = self.start_server(self.args)174 try:175 self.ctx.set_verify(176 SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)177 self.ctx.load_verify_locations('tests/ca.pem')178 c = httpslib.HTTPSConnection(srv_host, self.srv_port,179 ssl_context=self.ctx)180 c.request('GET', '/')181 data = six.ensure_text(c.getresponse().read())182 c.close()183 finally:184 self.stop_server(pid)185 self.assertIn('s_server -quiet -www', data)186 def test_HTTPSConnection_secure_context_fail(self):187 pid = self.start_server(self.args)188 try:189 self.ctx.set_verify(190 SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)191 self.ctx.load_verify_locations('tests/server.pem')192 c = httpslib.HTTPSConnection(srv_host, self.srv_port,193 ssl_context=self.ctx)194 with self.assertRaises(SSL.SSLError):195 c.request('GET', '/')196 c.close()197 finally:198 self.stop_server(pid)199 def test_HTTPSConnection_illegalkeywordarg(self):200 with self.assertRaises(ValueError):201 httpslib.HTTPSConnection('example.org', badKeyword=True)202class HttpslibSSLSNIClientTestCase(BaseSSLClientTestCase):203 def setUp(self):204 super(HttpslibSSLSNIClientTestCase, self).setUp()205 self.args = ['s_server', '-servername', srv_host, '-debug', '-www', '-msg',206 '-cert', 'server.pem', '-key', 'server_key.pem',207 '-cert2', 
'server.pem', '-key2', 'server_key.pem',208 '-accept', str(self.srv_port)]209 self.ctx = SSL.Context()210 def tearDown(self):211 self.ctx.close()212 def test_SNI_support(self):213 pid = self.start_server(self.args)214 try:215 c = httpslib.HTTPSConnection(self.srv_host, self.srv_port,216 ssl_context=self.ctx)217 c.request('GET', '/')218 c.close()219 finally:220 # (openssl s_server) buffers its log output, and ends the TLS session221 # with the client (allowing the client to terminate) before flushing222 # the log; so, the client may get here and terminate the server223 # before it manages to log the output.224 # So, give the server hopefully enough time to flush the logs.225 time.sleep(sleepTime)226 out, _ = self.stop_server(pid)227 self.assertIn('Hostname in TLS extension: "%s"' % srv_host, out)228 def test_IP_call(self):229 no_exception = True230 runs_counter = 0231 pid = self.start_server(self.args)232 for entry in socket.getaddrinfo(self.srv_host, self.srv_port,233 socket.AF_INET,234 socket.SOCK_STREAM,235 socket.IPPROTO_TCP):236 ipfamily, socktype, _, _, sockaddr = entry237 ip = sockaddr[0]238 sock = socket.socket(ipfamily, socktype)239 conn = SSL.Connection(self.ctx, sock=sock)240 conn.set_tlsext_host_name(self.srv_host)241 conn.set1_host(self.srv_host)242 runs_counter += 1243 try:244 conn.connect((ip, self.srv_port))245 except (SSL.SSLError, socket.error):246 log.exception("Failed to connect to %s:%s", ip, self.srv_port)247 no_exception = False248 finally:249 conn.close()250 out, _ = self.stop_server(pid)251 self.assertEqual(252 out.count('Hostname in TLS extension: "%s"' % self.srv_host),253 runs_counter)254 self.assertTrue(no_exception)255class MiscSSLClientTestCase(BaseSSLClientTestCase):256 def test_no_connection(self):257 ctx = SSL.Context()258 SSL.Connection(ctx)259 def test_server_simple(self):260 pid = self.start_server(self.args)261 try:262 with self.assertRaises(ValueError):263 SSL.Context('tlsv5')264 ctx = SSL.Context()265 s = 
SSL.Connection(ctx)266 s.connect(self.srv_addr)267 with self.assertRaises(ValueError):268 s.read(0)269 data = self.http_get(s)270 s.close()271 finally:272 self.stop_server(pid)273 self.assertIn('s_server -quiet -www', data)274 def test_server_simple_secure_context(self):275 pid = self.start_server(self.args)276 try:277 ctx = SSL.Context()278 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert,279 9)280 ctx.load_verify_locations('tests/ca.pem')281 s = SSL.Connection(ctx)282 s.connect(self.srv_addr)283 data = self.http_get(s)284 s.close()285 finally:286 self.stop_server(pid)287 self.assertIn('s_server -quiet -www', data)288 def test_server_simple_secure_context_fail(self):289 pid = self.start_server(self.args)290 try:291 ctx = SSL.Context()292 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert,293 9)294 ctx.load_verify_locations('tests/server.pem')295 s = SSL.Connection(ctx)296 with self.assertRaises(SSL.SSLError):297 s.connect(self.srv_addr)298 s.close()299 finally:300 self.stop_server(pid)301 def test_server_simple_timeouts(self):302 pid = self.start_server(self.args)303 try:304 with self.assertRaises(ValueError):305 SSL.Context('tlsv5')306 ctx = SSL.Context()307 s = SSL.Connection(ctx)308 r = s.get_socket_read_timeout()309 w = s.get_socket_write_timeout()310 self.assertEqual(r.sec, 0, r.sec)311 self.assertEqual(r.microsec, 0, r.microsec)312 self.assertEqual(w.sec, 0, w.sec)313 self.assertEqual(w.microsec, 0, w.microsec)314 s.set_socket_read_timeout(SSL.timeout())315 s.set_socket_write_timeout(SSL.timeout(909, 9))316 r = s.get_socket_read_timeout()317 w = s.get_socket_write_timeout()318 self.assertEqual(r.sec, 600, r.sec)319 self.assertEqual(r.microsec, 0, r.microsec)320 self.assertEqual(w.sec, 909, w.sec)321 # self.assertEqual(w.microsec, 9, w.microsec) XXX 4000322 s.connect(self.srv_addr)323 data = self.http_get(s)324 s.close()325 finally:326 self.stop_server(pid)327 self.assertIn('s_server -quiet -www', data)328 # TLS is required in 
FIPS mode329 @unittest.skipIf(fips_mode, "Can't be run in FIPS mode")330 def test_tls1_nok(self):331 self.args.append('-no_tls1')332 pid = self.start_server(self.args)333 try:334 with warnings.catch_warnings():335 warnings.simplefilter('ignore', DeprecationWarning)336 ctx = SSL.Context('tlsv1')337 s = SSL.Connection(ctx)338 with six.assertRaisesRegex(self, SSL.SSLError,339 r'version|unexpected eof'):340 s.connect(self.srv_addr)341 s.close()342 finally:343 self.stop_server(pid)344 def test_tls1_ok(self):345 self.args.append('-tls1')346 pid = self.start_server(self.args)347 try:348 with warnings.catch_warnings():349 warnings.simplefilter('ignore', DeprecationWarning)350 ctx = SSL.Context('tlsv1')351 s = SSL.Connection(ctx)352 s.connect(self.srv_addr)353 data = self.http_get(s)354 s.close()355 finally:356 self.stop_server(pid)357 self.assertIn('s_server -quiet -www', data)358 def test_cipher_mismatch(self):359 self.args = self.args + ['-cipher', 'AES256-SHA']360 pid = self.start_server(self.args)361 try:362 ctx = SSL.Context()363 s = SSL.Connection(ctx)364 s.set_cipher_list('AES128-SHA')365 with six.assertRaisesRegex(self, SSL.SSLError,366 'sslv3 alert handshake failure'):367 s.connect(self.srv_addr)368 s.close()369 finally:370 self.stop_server(pid)371 def test_no_such_cipher(self):372 self.args = self.args + ['-cipher', 'AES128-SHA']373 pid = self.start_server(self.args)374 try:375 ctx = SSL.Context()376 s = SSL.Connection(ctx)377 s.set_cipher_list('EXP-RC2-MD5')378 with six.assertRaisesRegex(self, SSL.SSLError,379 'no ciphers available'):380 s.connect(self.srv_addr)381 s.close()382 finally:383 self.stop_server(pid)384 def test_cipher_ok(self):385 self.args = self.args + ['-cipher', 'AES128-SHA']386 pid = self.start_server(self.args)387 try:388 ctx = SSL.Context()389 s = SSL.Connection(ctx)390 s.set_cipher_list('AES128-SHA')391 s.connect(self.srv_addr)392 data = self.http_get(s)393 self.assertEqual(s.get_cipher().name(), 'AES128-SHA',394 s.get_cipher().name())395 
cipher_stack = s.get_ciphers()396 self.assertEqual(cipher_stack[0].name(), 'AES128-SHA',397 cipher_stack[0].name())398 with self.assertRaises(IndexError):399 cipher_stack.__getitem__(2)400 # For some reason there are 2 entries in the stack401 # self.assertEqual(len(cipher_stack), 1, len(cipher_stack))402 self.assertEqual(s.get_cipher_list(), 'AES128-SHA',403 s.get_cipher_list())404 # Test Cipher_Stack iterator405 i = 0406 for cipher in cipher_stack:407 i += 1408 self.assertEqual(cipher.name(), 'AES128-SHA',409 '"%s"' % cipher.name())410 self.assertEqual('AES128-SHA-128', str(cipher))411 # For some reason there are 2 entries in the stack412 # self.assertEqual(i, 1, i)413 self.assertEqual(i, len(cipher_stack))414 s.close()415 finally:416 self.stop_server(pid)417 self.assertIn('s_server -quiet -www', data)418 def verify_cb_new(self, ok, store):419 return verify_cb_new_function(ok, store)420 def test_verify_cb_new(self):421 pid = self.start_server(self.args)422 try:423 ctx = SSL.Context()424 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert,425 9, self.verify_cb_new)426 s = SSL.Connection(ctx)427 try:428 s.connect(self.srv_addr)429 except SSL.SSLError as e:430 self.fail(e)431 data = self.http_get(s)432 s.close()433 finally:434 self.stop_server(pid)435 self.assertIn('s_server -quiet -www', data)436 def test_verify_cb_new_class(self):437 pid = self.start_server(self.args)438 try:439 ctx = SSL.Context()440 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert,441 9, VerifyCB())442 s = SSL.Connection(ctx)443 try:444 s.connect(self.srv_addr)445 except SSL.SSLError as e:446 log.exception(e)447 self.fail(e)448 data = self.http_get(s)449 s.close()450 finally:451 self.stop_server(pid)452 self.assertIn('s_server -quiet -www', data)453 def test_verify_cb_new_function(self):454 pid = self.start_server(self.args)455 try:456 ctx = SSL.Context()457 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert,458 9, verify_cb_new_function)459 s = 
SSL.Connection(ctx)460 try:461 s.connect(self.srv_addr)462 except SSL.SSLError as e:463 self.fail(e)464 data = self.http_get(s)465 s.close()466 finally:467 self.stop_server(pid)468 self.assertIn('s_server -quiet -www', data)469 def test_verify_cb_lambda(self):470 pid = self.start_server(self.args)471 try:472 ctx = SSL.Context()473 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert,474 9, lambda ok, store: 1)475 s = SSL.Connection(ctx)476 try:477 s.connect(self.srv_addr)478 except SSL.SSLError as e:479 self.fail(e)480 data = self.http_get(s)481 s.close()482 finally:483 self.stop_server(pid)484 self.assertIn('s_server -quiet -www', data)485 def verify_cb_exception(self, ok, store):486 self.fail('We should fail verification')487 def test_verify_cb_exception(self):488 pid = self.start_server(self.args)489 try:490 ctx = SSL.Context()491 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert,492 9, self.verify_cb_exception)493 s = SSL.Connection(ctx)494 with self.assertRaises(SSL.SSLError):495 s.connect(self.srv_addr)496 s.close()497 finally:498 self.stop_server(pid)499 def test_verify_cb_not_callable(self):500 ctx = SSL.Context()501 with self.assertRaises(TypeError):502 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert,503 9, 1)504 def test_verify_cb_wrong_callable(self):505 pid = self.start_server(self.args)506 try:507 ctx = SSL.Context()508 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert,509 9, lambda _: '')510 s = SSL.Connection(ctx)511 with self.assertRaises(SSL.SSLError):512 with warnings.catch_warnings():513 warnings.simplefilter('ignore', DeprecationWarning)514 s.connect(self.srv_addr)515 s.close()516 finally:517 self.stop_server(pid)518 def verify_cb_old(self, ctx_ptr, x509_ptr, err, depth, ok):519 try:520 self.assertFalse(ok)521 self.assertIn(err,522 [m2.X509_V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT,523 m2.X509_V_ERR_UNABLE_TO_GET_ISSUER_CERT_LOCALLY,524 m2.X509_V_ERR_CERT_UNTRUSTED,525 
m2.X509_V_ERR_UNABLE_TO_VERIFY_LEAF_SIGNATURE])526 self.assertTrue(m2.ssl_ctx_get_cert_store(ctx_ptr))527 self.assertTrue(X509.X509(x509_ptr).as_pem())528 except AssertionError:529 # If we let exceptions propagate from here the530 # caller may see strange errors. This is cleaner.531 return 0532 return 1533 def test_verify_cb_old(self):534 with warnings.catch_warnings():535 warnings.simplefilter("ignore", DeprecationWarning)536 pid = self.start_server(self.args)537 try:538 ctx = SSL.Context()539 ctx.set_verify(540 SSL.verify_peer | SSL.verify_fail_if_no_peer_cert,541 9, self.verify_cb_old)542 s = SSL.Connection(ctx)543 try:544 s.connect(self.srv_addr)545 except SSL.SSLError as e:546 self.fail(e)547 data = self.http_get(s)548 s.close()549 finally:550 self.stop_server(pid)551 self.assertIn('s_server -quiet -www', data)552 def test_verify_allow_unknown_old(self):553 pid = self.start_server(self.args)554 try:555 ctx = SSL.Context()556 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert,557 9, SSL.cb.ssl_verify_callback_allow_unknown_ca)558 ctx.set_allow_unknown_ca(1)559 s = SSL.Connection(ctx)560 try:561 s.connect(self.srv_addr)562 except SSL.SSLError:563 log.error('Failed to connect to %s', self.srv_addr)564 raise565 data = self.http_get(s)566 s.close()567 finally:568 self.stop_server(pid)569 self.assertIn('s_server -quiet -www', data)570 def test_verify_allow_unknown_new(self):571 pid = self.start_server(self.args)572 try:573 ctx = SSL.Context()574 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert,575 9, SSL.cb.ssl_verify_callback_allow_unknown_ca)576 s = SSL.Connection(ctx)577 try:578 s.connect(self.srv_addr)579 except SSL.SSLError as e:580 self.fail(e)581 data = self.http_get(s)582 s.close()583 finally:584 self.stop_server(pid)585 self.assertIn('s_server -quiet -www', data)586 def test_verify_cert(self):587 pid = self.start_server(self.args)588 try:589 ctx = SSL.Context()590 ctx.set_verify(SSL.verify_peer | 
SSL.verify_fail_if_no_peer_cert,591 9)592 ctx.load_verify_locations('tests/ca.pem')593 s = SSL.Connection(ctx)594 try:595 s.connect(self.srv_addr)596 except SSL.SSLError as e:597 self.fail(e)598 data = self.http_get(s)599 s.close()600 finally:601 self.stop_server(pid)602 self.assertIn('s_server -quiet -www', data)603 def test_verify_cert_fail(self):604 pid = self.start_server(self.args)605 try:606 ctx = SSL.Context()607 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert,608 9)609 ctx.load_verify_locations('tests/server.pem')610 s = SSL.Connection(ctx)611 with self.assertRaises(SSL.SSLError):612 s.connect(self.srv_addr)613 s.close()614 finally:615 self.stop_server(pid)616 def test_verify_cert_mutual_auth(self):617 self.args.extend(['-Verify', '2', '-CAfile', 'ca.pem'])618 pid = self.start_server(self.args)619 try:620 ctx = SSL.Context()621 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert,622 9)623 ctx.load_verify_locations('tests/ca.pem')624 ctx.load_cert('tests/x509.pem')625 s = SSL.Connection(ctx)626 try:627 s.connect(self.srv_addr)628 except SSL.SSLError as e:629 self.fail(e)630 data = self.http_get(s)631 s.close()632 finally:633 self.stop_server(pid)634 self.assertIn('s_server -quiet -www', data)635 def test_verify_cert_mutual_auth_servernbio(self):636 self.args.extend(['-Verify', '2', '-CAfile', 'ca.pem', '-nbio'])637 pid = self.start_server(self.args)638 try:639 ctx = SSL.Context()640 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert,641 9)642 ctx.load_verify_locations('tests/ca.pem')643 ctx.load_cert('tests/x509.pem')644 s = SSL.Connection(ctx)645 try:646 s.connect(self.srv_addr)647 except SSL.SSLError as e:648 self.fail(e)649 data = self.http_get(s)650 s.close()651 finally:652 self.stop_server(pid)653 self.assertIn('s_server -quiet -www', data)654 def test_verify_cert_mutual_auth_fail(self):655 self.args.extend(['-Verify', '2', '-CAfile', 'ca.pem'])656 pid = self.start_server(self.args)657 try:658 ctx = 
SSL.Context()659 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert,660 9)661 ctx.load_verify_locations('tests/ca.pem')662 s = SSL.Connection(ctx)663 with self.assertRaises(SSL.SSLError):664 s.connect(self.srv_addr)665 s.close()666 finally:667 self.stop_server(pid)668 def test_verify_nocert_fail(self):669 self.args.extend(['-nocert'])670 pid = self.start_server(self.args)671 try:672 ctx = SSL.Context()673 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert,674 9)675 ctx.load_verify_locations('tests/ca.pem')676 s = SSL.Connection(ctx)677 with self.assertRaises(SSL.SSLError):678 s.connect(self.srv_addr)679 s.close()680 finally:681 self.stop_server(pid)682 def test_blocking0(self):683 pid = self.start_server(self.args)684 try:685 ctx = SSL.Context()686 s = SSL.Connection(ctx)687 s.setblocking(0)688 with self.assertRaises(Exception):689 s.connect(self.srv_addr)690 s.close()691 finally:692 self.stop_server(pid)693 def test_blocking1(self):694 pid = self.start_server(self.args)695 try:696 ctx = SSL.Context()697 s = SSL.Connection(ctx)698 s.setblocking(1)699 try:700 s.connect(self.srv_addr)701 except SSL.SSLError as e:702 self.fail(e)703 data = self.http_get(s)704 s.close()705 finally:706 self.stop_server(pid)707 self.assertIn('s_server -quiet -www', data)708 def test_makefile(self):709 pid = self.start_server(self.args)710 try:711 ctx = SSL.Context()712 s = SSL.Connection(ctx)713 try:714 s.connect(self.srv_addr)715 except SSL.SSLError as e:716 self.fail(e)717 bio = s.makefile('rwb')718 # s.close() # XXX bug 6628?719 bio.write(b'GET / HTTP/1.0\n\n')720 bio.flush()721 data = bio.read()722 bio.close()723 s.close()724 finally:725 self.stop_server(pid)726 self.assertIn(b's_server -quiet -www', data)727 def test_makefile_err(self):728 pid = self.start_server(self.args)729 try:730 ctx = SSL.Context()731 s = SSL.Connection(ctx)732 try:733 s.connect(self.srv_addr)734 except SSL.SSLError as e:735 self.fail(e)736 f = s.makefile()737 data = 
self.http_get(s)738 s.close()739 del f740 del s741 err_code = Err.peek_error_code()742 self.assertEqual(err_code, 0,743 'Unexpected error: %s' % err_code)744 err = Err.get_error()745 self.assertIsNone(err, 'Unexpected error: %s' % err)746 finally:747 self.stop_server(pid)748 self.assertIn('s_server -quiet -www', data)749 def test_info_callback(self):750 pid = self.start_server(self.args)751 try:752 ctx = SSL.Context()753 ctx.set_info_callback()754 s = SSL.Connection(ctx)755 s.connect(self.srv_addr)756 data = self.http_get(s)757 s.close()758 finally:759 self.stop_server(pid)760 self.assertIn('s_server -quiet -www', data)761 def test_ssl_connection_free(self):762 pid = self.start_server(self.args)763 orig_m2_ssl_free = SSL.Connection.m2_ssl_free764 def _m2_ssl_free(ssl):765 orig_m2_ssl_free(ssl)766 _m2_ssl_free.called = True767 try:768 ctx = SSL.Context()769 s = SSL.Connection(ctx)770 s.m2_ssl_free = _m2_ssl_free771 s.connect(self.srv_addr)772 data = self.http_get(s)773 s.close()774 self.assertFalse(hasattr(_m2_ssl_free, 'called'))775 # keep fingers crossed that SSL.Connection.__del__ is called776 # by the python interpreter777 del s778 finally:779 self.stop_server(pid)780 self.assertIn('s_server -quiet -www', data)781 self.assertTrue(getattr(_m2_ssl_free, 'called', False))782 def test_ssl_connection_no_free(self):783 pid = self.start_server(self.args)784 orig_m2_ssl_free = SSL.Connection.m2_ssl_free785 def _m2_ssl_free(ssl):786 _m2_ssl_free.called = True787 orig_m2_ssl_free(ssl)788 try:789 ctx = SSL.Context()790 s = SSL.Connection(ctx)791 s.m2_ssl_free = _m2_ssl_free792 s.set_ssl_close_flag(m2.bio_close)793 s.connect(self.srv_addr)794 data = self.http_get(s)795 s.close()796 self.assertFalse(hasattr(_m2_ssl_free, 'called'))797 # keep fingers crossed that SSL.Connection.__del__ is called798 # by the python interpreter799 del s800 finally:801 self.stop_server(pid)802 self.assertIn('s_server -quiet -www', data)803 self.assertFalse(hasattr(_m2_ssl_free, 
'called'))804class UrllibSSLClientTestCase(BaseSSLClientTestCase):805 @unittest.skipIf(six.PY3, "urllib.URLOpener is deprecated in py3k")806 def test_urllib(self):807 pid = self.start_server(self.args)808 try:809 url = m2urllib.FancyURLopener()810 url.addheader('Connection', 'close')811 u = url.open('https://%s:%s/' % (srv_host, self.srv_port))812 data = u.read()813 u.close()814 finally:815 self.stop_server(pid)816 self.assertIn('s_server -quiet -www', data)817 # XXX Don't actually know how to use m2urllib safely!818 # def test_urllib_safe_context(self):819 # def test_urllib_safe_context_fail(self):820class Urllib2SSLClientTestCase(BaseSSLClientTestCase):821 def test_urllib2(self):822 pid = self.start_server(self.args)823 try:824 opener = m2urllib2.build_opener()825 opener.addheaders = [('Connection', 'close')]826 u = opener.open('https://%s:%s/' % (srv_host, self.srv_port))827 data = u.read()828 u.close()829 finally:830 self.stop_server(pid)831 self.assertIn(b's_server -quiet -www', data)832 def test_urllib2_secure_context(self):833 pid = self.start_server(self.args)834 try:835 ctx = SSL.Context()836 ctx.set_verify(837 SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)838 ctx.load_verify_locations('tests/ca.pem')839 opener = m2urllib2.build_opener(ctx)840 opener.addheaders = [('Connection', 'close')]841 u = opener.open('https://%s:%s/' % (srv_host, self.srv_port))842 data = u.read()843 u.close()844 finally:845 self.stop_server(pid)846 self.assertIn(b's_server -quiet -www', data)847 def test_urllib2_secure_context_fail(self):848 pid = self.start_server(self.args)849 try:850 ctx = SSL.Context()851 ctx.set_verify(852 SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)853 ctx.load_verify_locations('tests/server.pem')854 opener = m2urllib2.build_opener(ctx)855 opener.addheaders = [('Connection', 'close')]856 with self.assertRaises(SSL.SSLError):857 opener.open('https://%s:%s/' % (srv_host, self.srv_port))858 finally:859 self.stop_server(pid)860 def 
test_z_urllib2_opener(self):861 pid = self.start_server(self.args)862 try:863 ctx = SSL.Context()864 opener = m2urllib2.build_opener(865 ctx, m2urllib2.HTTPBasicAuthHandler())866 m2urllib2.install_opener(opener)867 req = m2urllib2.Request('https://%s:%s/' %868 (srv_host, self.srv_port))869 u = m2urllib2.urlopen(req)870 data = u.read()871 u.close()872 finally:873 self.stop_server(pid)874 self.assertIn(b's_server -quiet -www', data)875 def test_urllib2_opener_handlers(self):876 ctx = SSL.Context()877 m2urllib2.build_opener(ctx, m2urllib2.HTTPBasicAuthHandler())878 def test_urllib2_leak(self):879 pid = self.start_server(self.args)880 try:881 o = m2urllib2.build_opener()882 r = o.open('https://%s:%s/' % (srv_host, self.srv_port))883 s = [r.fp._sock.fp]884 r.close()885 # TODO This should be assertEqual 1, but we leak sock886 # somewhere. Not sure how to fix it.887 log.debug('get_referrers = %d', len(gc.get_referrers(s[0])))888 self.assertLessEqual(len(gc.get_referrers(s[0])), 2)889 finally:890 self.stop_server(pid)891class Urllib2TEChunkedSSLClientTestCase(BaseSSLClientTestCase):892 """Test a response with "Transfer-Encoding: chunked"."""893 def setUp(self):894 super(Urllib2TEChunkedSSLClientTestCase, self).setUp()895 self.args = ['s_server', '-quiet', '-HTTP',896 '-accept', str(self.srv_port)]897 def test_transfer_encoding_chunked(self):898 pid = self.start_server(self.args)899 try:900 url = 'https://%s:%s/te_chunked_response.txt' % (srv_host,901 self.srv_port)902 o = m2urllib2.build_opener()903 u = o.open(url)904 data = u.read()905 self.assertEqual(b'foo\nfoobar\n', data)906 finally:907 self.stop_server(pid)908@unittest.skipUnless(py27plus,909 "Twisted doesn't test well with Python 2.6")910class TwistedSSLClientTestCase(BaseSSLClientTestCase):911 def test_timeout(self):912 pid = self.start_server(self.args)913 try:914 ctx = SSL.Context()915 s = SSL.Connection(ctx)916 # Just a really small number so we can timeout917 s.settimeout(0.000000000000000000000000000001)918 
with self.assertRaises(SSL.SSLTimeoutError):919 s.connect(self.srv_addr)920 s.close()921 finally:922 self.stop_server(pid)923 def test_makefile_timeout(self):924 # httpslib uses makefile to read the response925 pid = self.start_server(self.args)926 try:927 c = httpslib.HTTPSConnection(srv_host, self.srv_port)928 c.putrequest('GET', '/')929 c.putheader('Accept', 'text/html')930 c.putheader('Accept', 'text/plain')931 c.endheaders()932 c.sock.settimeout(100)933 resp = c.getresponse()934 self.assertEqual(resp.status, 200, resp.reason)935 data = resp.read()936 c.close()937 finally:938 self.stop_server(pid)939 self.assertIn(b's_server -quiet -www', data)940 def test_makefile_timeout_fires(self):941 # This is convoluted because (openssl s_server -www) starts942 # writing the response as soon as it receives the first line of943 # the request, so it's possible for it to send the response944 # before the request is sent and there would be no timeout. So,945 # let the server spend time reading from an empty pipe946 FIFO_NAME = 'test_makefile_timeout_fires_fifo' # noqa947 os.mkfifo('tests/' + FIFO_NAME)948 pipe_pid = os.fork()949 try:950 if pipe_pid == 0:951 try:952 with open('tests/' + FIFO_NAME, 'w') as f:953 time.sleep(sleepTime + 1)954 f.write('Content\n')955 finally:956 os._exit(0)957 self.args[self.args.index('-www')] = '-WWW'958 pid = self.start_server(self.args)959 try:960 c = httpslib.HTTPSConnection(srv_host, self.srv_port)961 c.putrequest('GET', '/' + FIFO_NAME)962 c.putheader('Accept', 'text/html')963 c.putheader('Accept', 'text/plain')964 c.endheaders()965 c.sock.settimeout(0.0000000001)966 with self.assertRaises(socket.timeout):967 c.getresponse()968 c.close()969 finally:970 self.stop_server(pid)971 finally:972 os.kill(pipe_pid, signal.SIGTERM)973 os.waitpid(pipe_pid, 0)974 os.unlink('tests/' + FIFO_NAME)975 def test_twisted_wrapper(self):976 # Test only when twisted and ZopeInterfaces are present977 try:978 from twisted.internet.protocol import ClientFactory979 
from twisted.protocols.basic import LineReceiver980 from twisted.internet import reactor981 import M2Crypto.SSL.TwistedProtocolWrapper as wrapper982 except ImportError:983 warnings.warn(984 'Skipping twisted wrapper test because twisted not found')985 return986 # TODO Class must implement all abstract methods987 class EchoClient(LineReceiver):988 def connectionMade(self):989 self.sendLine(b'GET / HTTP/1.0\n\n')990 def lineReceived(self, line):991 global twisted_data992 twisted_data += line993 class EchoClientFactory(ClientFactory):994 protocol = EchoClient995 def clientConnectionFailed(self, connector, reason):996 reactor.stop()997 self.fail(reason)998 def clientConnectionLost(self, connector, reason):999 reactor.stop()1000 pid = self.start_server(self.args)1001 class ContextFactory(object):1002 def getContext(self):1003 return SSL.Context()1004 try:1005 global twisted_data1006 twisted_data = b''1007 context_factory = ContextFactory()1008 factory = EchoClientFactory()1009 wrapper.connectSSL(srv_host, self.srv_port, factory,1010 context_factory)1011 # This will block until reactor.stop() is called1012 reactor.run()1013 finally:1014 self.stop_server(pid)1015 self.assertIn(b's_server -quiet -www', twisted_data)1016twisted_data = ''1017class XmlRpcLibTestCase(unittest.TestCase):1018 def test_lib(self):1019 m2xmlrpclib.SSL_Transport()1020 # XXX need server to test against1021class FtpsLibTestCase(unittest.TestCase):1022 def test_lib(self):1023 ftpslib.FTP_TLS()1024 # XXX need server to test against1025class SessionTestCase(unittest.TestCase):1026 def test_session_load_bad(self):1027 with self.assertRaises(SSL.SSLError):1028 SSL.Session.load_session('tests/signer.pem')...
test_client_server.py
Source:test_client_server.py
...48 self.loop.run_until_complete(49 asyncio.wait_for(self.client.worker_task, timeout=1))50 except asyncio.TimeoutError: # pragma: no cover51 self.fail("Client failed to stop")52 def stop_server(self):53 self.server.close()54 try:55 self.loop.run_until_complete(56 asyncio.wait_for(self.server.wait_closed(), timeout=1))57 except asyncio.TimeoutError: # pragma: no cover58 self.fail("Server failed to stop")59 def test_basic(self):60 self.start_server()61 self.start_client()62 self.loop.run_until_complete(self.client.send("Hello!"))63 reply = self.loop.run_until_complete(self.client.recv())64 self.assertEqual(reply, "Hello!")65 self.stop_client()66 self.stop_server()67 def test_server_close_while_client_connected(self):68 self.start_server()69 self.start_client()70 self.stop_server()71 def test_explicit_event_loop(self):72 self.start_server(loop=self.loop)73 self.start_client(loop=self.loop)74 self.loop.run_until_complete(self.client.send("Hello!"))75 reply = self.loop.run_until_complete(self.client.recv())76 self.assertEqual(reply, "Hello!")77 self.stop_client()78 self.stop_server()79 def test_protocol_attributes(self):80 self.start_server()81 self.start_client('attributes')82 expected_attrs = ('localhost', 8642, self.secure)83 client_attrs = (self.client.host, self.client.port, self.client.secure)84 self.assertEqual(client_attrs, expected_attrs)85 server_attrs = self.loop.run_until_complete(self.client.recv())86 self.assertEqual(server_attrs, repr(expected_attrs))87 self.stop_client()88 self.stop_server()89 def test_protocol_headers(self):90 self.start_server()91 self.start_client('headers')92 client_req = self.client.request_headers93 client_resp = self.client.response_headers94 self.assertEqual(client_req['User-Agent'], USER_AGENT)95 self.assertEqual(client_resp['Server'], USER_AGENT)96 server_req = self.loop.run_until_complete(self.client.recv())97 server_resp = self.loop.run_until_complete(self.client.recv())98 self.assertEqual(server_req, str(client_req))99 
self.assertEqual(server_resp, str(client_resp))100 self.stop_client()101 self.stop_server()102 def test_protocol_raw_headers(self):103 self.start_server()104 self.start_client('raw_headers')105 client_req = self.client.raw_request_headers106 client_resp = self.client.raw_response_headers107 self.assertEqual(dict(client_req)['User-Agent'], USER_AGENT)108 self.assertEqual(dict(client_resp)['Server'], USER_AGENT)109 server_req = self.loop.run_until_complete(self.client.recv())110 server_resp = self.loop.run_until_complete(self.client.recv())111 self.assertEqual(server_req, repr(client_req))112 self.assertEqual(server_resp, repr(client_resp))113 self.stop_client()114 self.stop_server()115 def test_protocol_custom_request_headers_dict(self):116 self.start_server()117 self.start_client('raw_headers', extra_headers={'X-Spam': 'Eggs'})118 req_headers = self.loop.run_until_complete(self.client.recv())119 self.loop.run_until_complete(self.client.recv())120 self.assertIn("('X-Spam', 'Eggs')", req_headers)121 self.stop_client()122 self.stop_server()123 def test_protocol_custom_request_headers_list(self):124 self.start_server()125 self.start_client('raw_headers', extra_headers=[('X-Spam', 'Eggs')])126 req_headers = self.loop.run_until_complete(self.client.recv())127 self.loop.run_until_complete(self.client.recv())128 self.assertIn("('X-Spam', 'Eggs')", req_headers)129 self.stop_client()130 self.stop_server()131 def test_protocol_custom_response_headers_callable_dict(self):132 self.start_server(extra_headers=lambda p, r: {'X-Spam': 'Eggs'})133 self.start_client('raw_headers')134 self.loop.run_until_complete(self.client.recv())135 resp_headers = self.loop.run_until_complete(self.client.recv())136 self.assertIn("('X-Spam', 'Eggs')", resp_headers)137 self.stop_client()138 self.stop_server()139 def test_protocol_custom_response_headers_callable_list(self):140 self.start_server(extra_headers=lambda p, r: [('X-Spam', 'Eggs')])141 self.start_client('raw_headers')142 
self.loop.run_until_complete(self.client.recv())143 resp_headers = self.loop.run_until_complete(self.client.recv())144 self.assertIn("('X-Spam', 'Eggs')", resp_headers)145 self.stop_client()146 self.stop_server()147 def test_protocol_custom_response_headers_dict(self):148 self.start_server(extra_headers={'X-Spam': 'Eggs'})149 self.start_client('raw_headers')150 self.loop.run_until_complete(self.client.recv())151 resp_headers = self.loop.run_until_complete(self.client.recv())152 self.assertIn("('X-Spam', 'Eggs')", resp_headers)153 self.stop_client()154 self.stop_server()155 def test_protocol_custom_response_headers_list(self):156 self.start_server(extra_headers=[('X-Spam', 'Eggs')])157 self.start_client('raw_headers')158 self.loop.run_until_complete(self.client.recv())159 resp_headers = self.loop.run_until_complete(self.client.recv())160 self.assertIn("('X-Spam', 'Eggs')", resp_headers)161 self.stop_client()162 self.stop_server()163 def test_no_subprotocol(self):164 self.start_server()165 self.start_client('subprotocol')166 server_subprotocol = self.loop.run_until_complete(self.client.recv())167 self.assertEqual(server_subprotocol, repr(None))168 self.assertEqual(self.client.subprotocol, None)169 self.stop_client()170 self.stop_server()171 def test_subprotocol_found(self):172 self.start_server(subprotocols=['superchat', 'chat'])173 self.start_client('subprotocol', subprotocols=['otherchat', 'chat'])174 server_subprotocol = self.loop.run_until_complete(self.client.recv())175 self.assertEqual(server_subprotocol, repr('chat'))176 self.assertEqual(self.client.subprotocol, 'chat')177 self.stop_client()178 self.stop_server()179 def test_subprotocol_not_found(self):180 self.start_server(subprotocols=['superchat'])181 self.start_client('subprotocol', subprotocols=['otherchat'])182 server_subprotocol = self.loop.run_until_complete(self.client.recv())183 self.assertEqual(server_subprotocol, repr(None))184 self.assertEqual(self.client.subprotocol, None)185 
self.stop_client()186 self.stop_server()187 def test_subprotocol_not_offered(self):188 self.start_server()189 self.start_client('subprotocol', subprotocols=['otherchat', 'chat'])190 server_subprotocol = self.loop.run_until_complete(self.client.recv())191 self.assertEqual(server_subprotocol, repr(None))192 self.assertEqual(self.client.subprotocol, None)193 self.stop_client()194 self.stop_server()195 def test_subprotocol_not_requested(self):196 self.start_server(subprotocols=['superchat', 'chat'])197 self.start_client('subprotocol')198 server_subprotocol = self.loop.run_until_complete(self.client.recv())199 self.assertEqual(server_subprotocol, repr(None))200 self.assertEqual(self.client.subprotocol, None)201 self.stop_client()202 self.stop_server()203 @unittest.mock.patch.object(WebSocketServerProtocol, 'select_subprotocol')204 def test_subprotocol_error(self, _select_subprotocol):205 _select_subprotocol.return_value = 'superchat'206 self.start_server(subprotocols=['superchat'])207 with self.assertRaises(InvalidHandshake):208 self.start_client('subprotocol', subprotocols=['otherchat'])209 self.run_loop_once()210 self.stop_server()211 @unittest.mock.patch('websockets.server.read_request')212 def test_server_receives_malformed_request(self, _read_request):213 _read_request.side_effect = ValueError("read_request failed")214 self.start_server()215 with self.assertRaises(InvalidHandshake):216 self.start_client()217 self.stop_server()218 @unittest.mock.patch('websockets.client.read_response')219 def test_client_receives_malformed_response(self, _read_response):220 _read_response.side_effect = ValueError("read_response failed")221 self.start_server()222 with self.assertRaises(InvalidHandshake):223 self.start_client()224 self.run_loop_once()225 self.stop_server()226 @unittest.mock.patch('websockets.client.build_request')227 def test_client_sends_invalid_handshake_request(self, _build_request):228 def wrong_build_request(set_header):229 return '42'230 
_build_request.side_effect = wrong_build_request231 self.start_server()232 with self.assertRaises(InvalidHandshake):233 self.start_client()234 self.stop_server()235 @unittest.mock.patch('websockets.server.build_response')236 def test_server_sends_invalid_handshake_response(self, _build_response):237 def wrong_build_response(set_header, key):238 return build_response(set_header, '42')239 _build_response.side_effect = wrong_build_response240 self.start_server()241 with self.assertRaises(InvalidHandshake):242 self.start_client()243 self.stop_server()244 @unittest.mock.patch('websockets.client.read_response')245 def test_server_does_not_switch_protocols(self, _read_response):246 @asyncio.coroutine247 def wrong_read_response(stream):248 code, headers = yield from read_response(stream)249 return 400, headers250 _read_response.side_effect = wrong_read_response251 self.start_server()252 with self.assertRaises(InvalidHandshake):253 self.start_client()254 self.run_loop_once()255 self.stop_server()256 @unittest.mock.patch('websockets.server.WebSocketServerProtocol.send')257 def test_server_handler_crashes(self, send):258 send.side_effect = ValueError("send failed")259 self.start_server()260 self.start_client()261 self.loop.run_until_complete(self.client.send("Hello!"))262 with self.assertRaises(ConnectionClosed):263 self.loop.run_until_complete(self.client.recv())264 self.stop_client()265 self.stop_server()266 # Connection ends with an unexpected error.267 self.assertEqual(self.client.close_code, 1011)268 @unittest.mock.patch('websockets.server.WebSocketServerProtocol.close')269 def test_server_close_crashes(self, close):270 close.side_effect = ValueError("close failed")271 self.start_server()272 self.start_client()273 self.loop.run_until_complete(self.client.send("Hello!"))274 reply = self.loop.run_until_complete(self.client.recv())275 self.assertEqual(reply, "Hello!")276 self.stop_client()277 self.stop_server()278 # Connection ends with an abnormal closure.279 
self.assertEqual(self.client.close_code, 1006)280 @unittest.mock.patch.object(WebSocketClientProtocol, 'handshake')281 def test_client_closes_connection_before_handshake(self, handshake):282 self.start_server()283 self.start_client()284 # We have mocked the handshake() method to prevent the client from285 # performing the opening handshake. Force it to close the connection.286 self.loop.run_until_complete(self.client.close_connection(force=True))287 self.stop_client()288 # The server should stop properly anyway. It used to hang because the289 # worker handling the connection was waiting for the opening handshake.290 self.stop_server()291 @unittest.mock.patch('websockets.server.read_request')292 def test_server_shuts_down_during_opening_handshake(self, _read_request):293 _read_request.side_effect = asyncio.CancelledError294 self.start_server()295 self.server.closing = True296 with self.assertRaises(InvalidHandshake) as raised:297 self.start_client()298 self.stop_server()299 # Opening handshake fails with 503 Service Unavailable300 self.assertEqual(str(raised.exception), "Bad status code: 503")301 def test_server_shuts_down_during_connection_handling(self):302 self.start_server()303 self.start_client()304 self.server.close()305 with self.assertRaises(ConnectionClosed):306 self.loop.run_until_complete(self.client.recv())307 self.stop_client()308 self.stop_server()309 # Websocket connection terminates with 1001 Going Away.310 self.assertEqual(self.client.close_code, 1001)311@unittest.skipUnless(os.path.exists(testcert), "test certificate is missing")312class SSLClientServerTests(ClientServerTests):313 secure = True314 @property315 def server_context(self):316 ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)317 ssl_context.load_cert_chain(testcert)318 return ssl_context319 @property320 def client_context(self):321 ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)322 ssl_context.load_verify_locations(testcert)323 ssl_context.verify_mode = ssl.CERT_REQUIRED324 return 
ssl_context325 def start_server(self, *args, **kwds):326 kwds['ssl'] = self.server_context327 server = serve(handler, 'localhost', 8642, **kwds)328 self.server = self.loop.run_until_complete(server)329 def start_client(self, path='', **kwds):330 kwds['ssl'] = self.client_context331 client = connect('wss://localhost:8642/' + path, **kwds)332 self.client = self.loop.run_until_complete(client)333 def test_ws_uri_is_rejected(self):334 self.start_server()335 client = connect('ws://localhost:8642/', ssl=self.client_context)336 with self.assertRaises(ValueError):337 self.loop.run_until_complete(client)338 self.stop_server()339class ClientServerOriginTests(unittest.TestCase):340 def setUp(self):341 self.loop = asyncio.new_event_loop()342 asyncio.set_event_loop(self.loop)343 def tearDown(self):344 self.loop.close()345 def test_checking_origin_succeeds(self):346 server = self.loop.run_until_complete(347 serve(handler, 'localhost', 8642, origins=['http://localhost']))348 client = self.loop.run_until_complete(349 connect('ws://localhost:8642/', origin='http://localhost'))350 self.loop.run_until_complete(client.send("Hello!"))351 self.assertEqual(self.loop.run_until_complete(client.recv()), "Hello!")352 self.loop.run_until_complete(client.close())...
service_test_case.py
Source:service_test_case.py
"""In-process HTTP stub services for tests, built on ``wsgiref``.

A ``Service`` serves registered WSGI handlers on a localhost port from a
background thread and records every request it dispatches, so tests can
assert on what was requested afterwards.
"""
import contextlib
import io
import logging
import socket
import unittest
from threading import Event, Thread
from typing import Any, Callable, Dict, Generator, Iterable, List
from typing import Optional, Tuple, TYPE_CHECKING, cast
from urllib.parse import urlparse
from wsgiref.headers import Headers
from wsgiref.simple_server import make_server
from wsgiref.util import request_uri

if TYPE_CHECKING:
    from wsgiref.types import StartResponse

    Environ = Dict[str, Any]
    Handler = Callable[[Environ, StartResponse], Iterable[bytes]]
    HandlerIdentifier = Tuple[str, str]


def get_port() -> int:
    """Return a TCP port that was free on localhost at the time of the call.

    The number is obtained by binding a probe socket to port 0 and reading
    back the assigned port. The probe is closed before returning, so the port
    is not reserved for the caller.
    """
    # The context manager closes the probe even when bind() raises.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
        probe.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        probe.bind(("localhost", 0))
        return cast(int, probe.getsockname()[1])


class ServiceRequest(object):
    """Snapshot of one request received by a ``Service``."""

    # eventually this can contain headers, body, etc. as necessary for comparison
    def __init__(
            self, headers: Dict[str, str], method: str, path: str,
            body: Optional[bytes]) -> None:
        """Store the request data.

        Underscores in the keys of ``headers`` are replaced with hyphens
        before the pairs are wrapped in a ``wsgiref.headers.Headers``.
        """
        self.headers = Headers(
            [(key.replace("_", "-"), value) for key, value in headers.items()])
        self.method = method
        self.path = path
        self.body = body

    def assert_header_equals(self, header_key: str, header_value: str) -> None:
        """Assert that ``header_key`` is present and equals ``header_value``."""
        assert header_key in self.headers, \
            f"Expected header {header_key} not in request headers."
        actual_value = self.headers[header_key]
        assert actual_value == header_value, \
            f"Request header {header_key} unexpectedly set to " \
            f"`{actual_value}` instead of `{header_value}`."

    def assert_body_equals(self, body: bytes) -> None:
        """Assert that the recorded request body equals ``body``."""
        assert body == self.body, f"Body unexpectedly equals {self.body} instead of {body}"


class Service(object):
    """A WSGI stub server on localhost that records dispatched requests."""

    # Requests recorded per (method, path), in arrival order.
    fetches: Dict[Tuple[str, str], List[ServiceRequest]]
    # Set by the server thread once start-up has either succeeded or failed.
    server_started: Optional[Event]
    # Set by stop() to make the serving loop exit.
    stop_server: Optional[Event]
    thread: Optional[Thread]

    def __init__(self) -> None:
        """Pick a port via ``get_port()`` and initialise empty state."""
        self.port: int = get_port()
        self.handlers: "Dict[HandlerIdentifier, Handler]" = {}
        self.thread = None
        self.fetches = {}
        self.server_started = None
        self.stop_server = None
        self._startup_error: Optional[Exception] = None

    def url(self, path: str) -> str:
        """Return the absolute URL of ``path`` on this service."""
        return f"http://localhost:{self.port}{path}"

    def add_handler(self, method: str, path: str, callback: "Handler") -> None:
        """Register ``callback`` as the WSGI handler for ``(method, path)``."""
        identifier: "HandlerIdentifier" = (method, path)
        self.handlers[identifier] = callback

    def handler(self, environ: "Environ", start_response: "StartResponse") -> Iterable[bytes]:
        """WSGI entry point: record the request and dispatch it.

        Requests whose ``(method, path)`` has no registered handler get a
        ``404 Not Found`` response and are not recorded. For dispatched
        requests, ``environ["REQUEST_PATH"]`` is set to the parsed path.
        """
        uri = request_uri(environ, include_query=True)
        path = urlparse(uri).path
        method = environ["REQUEST_METHOD"]
        body: Optional[bytes] = None
        if method in ("POST", "PUT"):
            content_length = int(environ.get("CONTENT_LENGTH", 0) or "0")
            if content_length > 0:
                body = environ["wsgi.input"].read(content_length)
                if body is None:
                    body = b""
                # Re-wrap the consumed body so the handler can read it again.
                environ["wsgi.input"] = io.BytesIO(body)
            else:
                logging.warning(f"Unable to determine content length for request {method} {path}")
        headers = {
            key: value for key, value in environ.copy().items()
            if key.startswith("HTTP_") or key == "CONTENT_TYPE"
        }
        request = ServiceRequest(headers=headers, method=method, path=path, body=body)
        logging.info(f"Received {method} request for localhost:{self.port}{path}.")
        identifier = (method, path)
        if identifier not in self.handlers:
            logging.warning(
                f"No handler registered for {method} "
                f"localhost:{self.port}{path}")
            start_response("404 Not Found", [("Content-type", "text/plain")])
            return [f"No handler registered for {identifier}".encode("utf8")]
        environ["REQUEST_PATH"] = path
        self.fetches.setdefault(identifier, []).append(request)
        return self.handlers[identifier](environ, start_response)

    def start(self) -> None:
        """Start serving in a background thread and wait until it is ready.

        Raises:
            RuntimeError: if the service has already been started.
            Exception: whatever ``make_server`` raised in the server thread
                (for example ``OSError`` when the port is already in use).
                The service is reset to its stopped state before re-raising.
        """
        if self.server_started is not None or self.stop_server is not None:
            raise RuntimeError(f"Service already started on port {self.port}")
        # Local names keep the non-Optional type for mypy.
        server_started = Event()
        stop_server = Event()
        self.server_started = server_started
        self.stop_server = stop_server
        self._startup_error = None
        self.thread = Thread(target=self.loop, args=(server_started, stop_server))
        self.thread.start()
        logging.info(f"Starting server on port {self.port}...")
        server_started.wait()
        startup_error = self._startup_error
        if startup_error is not None:
            # The server never came up: clean up and surface the failure
            # instead of reporting the service as ready.
            self.stop()
            raise startup_error
        logging.info(f"Server on port {self.port} ready for requests.")

    def stop(self) -> None:
        """Stop the server thread, if one is running, and reset the state."""
        if (self.server_started is not None and self.stop_server is not None
                and self.thread is not None):
            self.stop_server.set()
            self.thread.join()
        self.server_started = None
        self.stop_server = None
        self.thread = None

    def loop(self, server_started: Event, stop_server: Event) -> None:
        """Serve requests until ``stop_server`` is set.

        ``server_started`` is set once the server is bound, or once binding
        has failed; in the latter case the error is stored for ``start()``.
        """
        try:
            httpd = make_server("localhost", self.port, self.handler)
        except Exception as error:
            # Hand the failure to the starting thread; without this, start()
            # would wait on server_started forever.
            self._startup_error = error
            server_started.set()
            return
        with httpd:
            # Short timeout so the stop flag is polled between requests.
            httpd.timeout = 0.01
            server_started.set()
            while not stop_server.is_set():
                httpd.handle_request()

    def assert_requested(
            self, method: str, path: str,
            headers: Optional[Dict[str, str]] = None) -> ServiceRequest:
        """Assert that ``(method, path)`` was requested; return the first request.

        When ``headers`` is given, each entry must match the first recorded
        request's headers.
        """
        identifier = (method, path)
        assert identifier in self.fetches, f"Could not find request matching {method} {path}"
        request = self.fetches[identifier][0]
        if headers is not None:
            for expected_header, expected_value in headers.items():
                request.assert_header_equals(expected_header, expected_value)
        return request

    def get_all_requests(self, method: str, path: str) -> List[ServiceRequest]:
        """Return every recorded request for ``(method, path)`` (maybe empty)."""
        identifier = (method, path)
        return self.fetches.get(identifier, [])

    def assert_not_requested(self, method: str, path: str) -> None:
        """Assert that no request was recorded for ``(method, path)``."""
        identifier = (method, path)
        assert identifier not in self.fetches, f"Unexpected request found for {method} {path}"

    def assert_requested_n_times(
            self, method: str, path: str, n: int) -> List[ServiceRequest]:
        """Assert exactly ``n`` recorded requests for ``(method, path)``; return them."""
        requests = self.get_all_requests(method, path)
        assert len(requests) == n, \
            f"Expected request count for {method} {path} ({n}) did not match " \
            f"actual count: {len(requests)}"
        return requests


class ServiceTestCase(unittest.TestCase):
    """``TestCase`` base that owns a list of ``Service`` objects."""

    def setUp(self) -> None:
        super().setUp()
        self.services: List[Service] = []

    def tearDown(self) -> None:
        super().tearDown()
        self.stop_services()

    def add_service(self) -> Service:
        """Create a new ``Service``, track it, and return it."""
        service = Service()
        self.services.append(service)
        return service

    def start_services(self) -> None:
        """Start every tracked service."""
        for service in self.services:
            service.start()

    def stop_services(self) -> None:
        """Stop every tracked service."""
        for service in self.services:
            service.stop()

    @contextlib.contextmanager
    def run_services(self) -> Generator[None, None, None]:
        """Context manager that starts the tracked services on entry."""
        self.start_services()
        try:
            yield
        finally:
            # NOTE(review): the listing is cut off at this point ("...");
            # the cleanup statement is not visible. Presumably it stops the
            # services -- TODO confirm against the complete file.
            ...
run_tests.py
Source:run_tests.py
...16 print("RESPONSE: {}".format(response))17 if response == '{"id": "8db4206f-8878-174d-7a23-dd2c4f4ef5a0", "prediction": 0.1495}':18 print("\nPredict endpoint: PASS")19 print("###")20 stop_server(server)21 exit(0)22 else:23 print("\nPredict endpoint: FAIL")24 print("###") 25 stop_server(server)26 exit(1)27 except:28 stop_server(server)29def test_train_model():30 print("\n###")31 print("Testing train model")32 server = start_server()33 try:34 config = load_config(config_file)35 n_models = len(os.listdir(config["model_dir"]))36 data = {"dataset_path": "training_set.parquet"}37 requests.post("http://localhost:8080/train", json=data)38 if n_models + 1 == len(os.listdir(config["model_dir"])):39 print("\nTrain endpoint: PASS")40 print("###")41 stop_server(server)42 exit(0)43 else:44 print("\nTrain endpoint: FAIL")45 print("###") 46 stop_server(server)47 exit(1)48 except:49 stop_server(server)50def test_update_model():51 print("\n###")52 print("Testing update model")53 server = start_server()54 old_model = load_config(config_file)["current_model"]55 # Using the same file56 data = {"dataset_path": "training_set.parquet"}57 response = requests.post("http://localhost:8080/train", json=data)58 # For some reason the response is coming with single quotes and that is not a proper JSON59 # So I need this ugly implementation to work around this...60 model_name = json.loads(response.text.replace("'", "\""))["model_name"]61 data = {"model_name": "{}".format(model_name)}62 response = requests.post("http://localhost:8080/update_model", json=data)63 model_name = json.loads(response.text.replace("'", "\""))["current_model"]64 if old_model != model_name:65 print("\nUpdate model endpoint: PASS")66 print("###")67 stop_server(server)68 exit(0)69 else:70 print("\nUpdate model endpoint: FAIL")71 print("###") 72 stop_server(server)73 exit(1)74 try:75 pass76 except:77 stop_server(server)78def test_config():79 print("\n###")80 print("Testing config")81 server = start_server()82 try:83 config 
= load_config(config_file)84 response = requests.get("http://localhost:8080/config")85 if json.dumps(config) == response.text:86 print("\nConfig endpoint: PASS")87 print("###")88 stop_server(server)89 exit(0)90 else:91 print("\nConfig endpoint: FAIL")92 print("###") 93 stop_server(server)94 exit(1)95 except:96 stop_server(server)97def start_server():98 server = sp.Popen(['python', 'server_run.py'])99 # Giving a few seconds to have the server start100 time.sleep(3)101 return server102def stop_server(server_process):103 sp.Popen.terminate(server_process)104def load_config(config_path):105 # Loading config file from disk106 with open(config_path, "r") as json_data:107 config = json.loads(json_data.read())108 return config109if __name__ == "__main__":110 config_file = "config.json"111 # Tests to run112 test_config()113 test_predict()114 test_train_model()...
Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!