Best Python code snippet using playwright-python
test_ssl.py
Source:test_ssl.py
...58 pass59 return False60class BaseSSLClientTestCase(unittest.TestCase):61 openssl_in_path = find_openssl()62 def start_server(self, args):63 if not self.openssl_in_path:64 raise Exception('openssl command not in PATH')65 66 pid = os.fork()67 if pid == 0:68 # openssl must be started in the tests directory for it69 # to find the .pem files70 os.chdir('tests')71 try:72 os.execvp('openssl', args)73 finally:74 os.chdir('..')75 76 else:77 time.sleep(sleepTime)78 return pid79 def stop_server(self, pid):80 os.kill(pid, 1)81 os.waitpid(pid, 0)82 def http_get(self, s):83 s.send('GET / HTTP/1.0\n\n') 84 resp = ''85 while 1:86 try:87 r = s.recv(4096)88 if not r:89 break90 except SSL.SSLError: # s_server throws an 'unexpected eof'...91 break92 resp = resp + r 93 return resp94 def setUp(self):95 self.srv_host = srv_host96 self.srv_port = srv_port97 self.srv_addr = (srv_host, srv_port)98 self.srv_url = 'https://%s:%s/' % (srv_host, srv_port)99 self.args = ['s_server', '-quiet', '-www',100 #'-cert', 'server.pem', Implicitly using this101 '-accept', str(self.srv_port)]102 def tearDown(self):103 global srv_port104 srv_port = srv_port - 1105class PassSSLClientTestCase(BaseSSLClientTestCase):106 107 def test_pass(self):108 pass109class HttpslibSSLClientTestCase(BaseSSLClientTestCase):110 def test_HTTPSConnection(self):111 pid = self.start_server(self.args)112 try:113 from M2Crypto import httpslib114 c = httpslib.HTTPSConnection(srv_host, srv_port)115 c.request('GET', '/')116 data = c.getresponse().read()117 c.close()118 finally:119 self.stop_server(pid)120 self.failIf(string.find(data, 's_server -quiet -www') == -1)121 def test_HTTPSConnection_resume_session(self):122 pid = self.start_server(self.args)123 try:124 from M2Crypto import httpslib125 ctx = SSL.Context()126 ctx.load_verify_locations(cafile='tests/ca.pem')127 ctx.load_cert('tests/x509.pem')128 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 1)129 ctx.set_session_cache_mode(m2.SSL_SESS_CACHE_CLIENT)130 c = httpslib.HTTPSConnection(srv_host, srv_port, ssl_context=ctx)131 c.request('GET', '/')132 ses = c.get_session()133 t = ses.as_text()134 data = c.getresponse().read()135 # Appearently closing connection here screws session; Ali Polatel?136 # c.close()137 138 ctx2 = SSL.Context()139 ctx2.load_verify_locations(cafile='tests/ca.pem')140 ctx2.load_cert('tests/x509.pem')141 ctx2.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 1)142 ctx2.set_session_cache_mode(m2.SSL_SESS_CACHE_CLIENT)143 c2 = httpslib.HTTPSConnection(srv_host, srv_port, ssl_context=ctx2)144 c2.set_session(ses)145 c2.request('GET', '/')146 ses2 = c2.get_session()147 t2 = ses2.as_text()148 data = c2.getresponse().read()149 c.close()150 c2.close()151 assert t == t2, "Sessions did not match"152 finally:153 self.stop_server(pid)154 self.failIf(string.find(data, 's_server -quiet -www') == -1)155 def test_HTTPSConnection_secure_context(self):156 pid = self.start_server(self.args)157 try:158 from M2Crypto import httpslib159 ctx = SSL.Context()160 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)161 ctx.load_verify_locations('tests/ca.pem')162 c = httpslib.HTTPSConnection(srv_host, srv_port, ssl_context=ctx)163 c.request('GET', '/')164 data = c.getresponse().read()165 c.close()166 finally:167 self.stop_server(pid)168 self.failIf(string.find(data, 's_server -quiet -www') == -1)169 def test_HTTPSConnection_secure_context_fail(self):170 pid = self.start_server(self.args)171 try:172 from M2Crypto import httpslib173 ctx = SSL.Context()174 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)175 ctx.load_verify_locations('tests/server.pem')176 c = httpslib.HTTPSConnection(srv_host, srv_port, ssl_context=ctx)177 self.assertRaises(SSL.SSLError, c.request, 'GET', '/')178 c.close()179 finally:180 self.stop_server(pid)181 def test_HTTPS(self):182 pid = self.start_server(self.args)183 try:184 from M2Crypto import httpslib185 c = httpslib.HTTPS(srv_host, srv_port)186 c.putrequest('GET', '/')187 c.putheader('Accept', 'text/html')188 c.putheader('Accept', 'text/plain')189 c.endheaders()190 err, msg, headers = c.getreply()191 assert err == 200, err192 f = c.getfile()193 data = f.read()194 c.close()195 finally:196 self.stop_server(pid)197 self.failIf(string.find(data, 's_server -quiet -www') == -1)198 def test_HTTPS_secure_context(self):199 pid = self.start_server(self.args)200 try:201 from M2Crypto import httpslib202 ctx = SSL.Context()203 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)204 ctx.load_verify_locations('tests/ca.pem')205 c = httpslib.HTTPS(srv_host, srv_port, ssl_context=ctx)206 c.putrequest('GET', '/')207 c.putheader('Accept', 'text/html')208 c.putheader('Accept', 'text/plain')209 c.endheaders()210 err, msg, headers = c.getreply()211 assert err == 200, err212 f = c.getfile()213 data = f.read()214 c.close()215 finally:216 self.stop_server(pid)217 self.failIf(string.find(data, 's_server -quiet -www') == -1)218 def test_HTTPS_secure_context_fail(self):219 pid = self.start_server(self.args)220 try:221 from M2Crypto import httpslib222 ctx = SSL.Context()223 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)224 ctx.load_verify_locations('tests/server.pem')225 c = httpslib.HTTPS(srv_host, srv_port, ssl_context=ctx)226 c.putrequest('GET', '/')227 c.putheader('Accept', 'text/html')228 c.putheader('Accept', 'text/plain')229 self.assertRaises(SSL.SSLError, c.endheaders)230 c.close()231 finally:232 self.stop_server(pid)233 234 def test_HTTPSConnection_illegalkeywordarg(self):235 from M2Crypto import httpslib236 self.assertRaises(ValueError, httpslib.HTTPSConnection, 'example.org',237 badKeyword=True)238class MiscSSLClientTestCase(BaseSSLClientTestCase):239 def test_no_connection(self):240 ctx = SSL.Context()241 s = SSL.Connection(ctx)242 243 def test_server_simple(self):244 pid = self.start_server(self.args)245 try:246 self.assertRaises(ValueError, SSL.Context, 'tlsv5')247 ctx = SSL.Context()248 s = SSL.Connection(ctx)249 s.connect(self.srv_addr)250 self.assertRaises(ValueError, s.read, 0)251 data = self.http_get(s)252 s.close()253 finally:254 self.stop_server(pid)255 self.failIf(string.find(data, 's_server -quiet -www') == -1)256 def test_server_simple_secure_context(self):257 pid = self.start_server(self.args)258 try:259 ctx = SSL.Context()260 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)261 ctx.load_verify_locations('tests/ca.pem')262 s = SSL.Connection(ctx)263 s.connect(self.srv_addr)264 data = self.http_get(s)265 s.close()266 finally:267 self.stop_server(pid)268 self.failIf(string.find(data, 's_server -quiet -www') == -1)269 def test_server_simple_secure_context_fail(self):270 pid = self.start_server(self.args)271 try:272 ctx = SSL.Context()273 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)274 ctx.load_verify_locations('tests/server.pem')275 s = SSL.Connection(ctx)276 self.assertRaises(SSL.SSLError, s.connect, self.srv_addr)277 s.close()278 finally:279 self.stop_server(pid)280 def test_server_simple_timeouts(self):281 pid = self.start_server(self.args)282 try:283 self.assertRaises(ValueError, SSL.Context, 'tlsv5')284 ctx = SSL.Context()285 s = SSL.Connection(ctx)286 287 r = s.get_socket_read_timeout()288 w = s.get_socket_write_timeout()289 assert r.sec == 0, r.sec290 assert r.microsec == 0, r.microsec291 assert w.sec == 0, w.sec292 assert w.microsec == 0, w.microsec293 s.set_socket_read_timeout(SSL.timeout())294 s.set_socket_write_timeout(SSL.timeout(909,9))295 r = s.get_socket_read_timeout()296 w = s.get_socket_write_timeout()297 assert r.sec == 600, r.sec298 assert r.microsec == 0, r.microsec299 assert w.sec == 909, w.sec300 #assert w.microsec == 9, w.microsec XXX 4000301 302 s.connect(self.srv_addr)303 data = self.http_get(s)304 s.close()305 finally:306 self.stop_server(pid)307 self.failIf(string.find(data, 's_server -quiet -www') == -1)308 def test_tls1_nok(self):309 if fips_mode: # TLS is required in FIPS mode310 return311 self.args.append('-no_tls1')312 pid = self.start_server(self.args)313 try:314 ctx = SSL.Context('tlsv1')315 s = SSL.Connection(ctx)316 try:317 s.connect(self.srv_addr)318 except SSL.SSLError, e:319 self.failUnlessEqual(e[0], 'wrong version number')320 s.close()321 finally:322 self.stop_server(pid)323 def test_tls1_ok(self):324 self.args.append('-tls1')325 pid = self.start_server(self.args)326 try:327 ctx = SSL.Context('tlsv1')328 s = SSL.Connection(ctx)329 s.connect(self.srv_addr)330 data = self.http_get(s)331 s.close()332 finally:333 self.stop_server(pid)334 self.failIf(string.find(data, 's_server -quiet -www') == -1)335 def test_sslv23_no_v2(self):336 if fips_mode: # TLS is required in FIPS mode337 return338 self.args.append('-no_tls1')339 pid = self.start_server(self.args)340 try:341 ctx = SSL.Context('sslv23')342 s = SSL.Connection(ctx)343 s.connect(self.srv_addr)344 self.failUnlessEqual(s.get_version(), 'SSLv3')345 s.close()346 finally:347 self.stop_server(pid)348 def test_sslv23_no_v2_no_service(self):349 if fips_mode: # TLS is required in FIPS mode350 return351 self.args = self.args + ['-no_tls1', '-no_ssl3']352 pid = self.start_server(self.args)353 try:354 ctx = SSL.Context('sslv23')355 s = SSL.Connection(ctx)356 self.assertRaises(SSL.SSLError, s.connect, self.srv_addr)357 s.close()358 finally:359 self.stop_server(pid)360 def test_sslv23_weak_crypto(self):361 if fips_mode: # TLS is required in FIPS mode362 return363 self.args = self.args + ['-no_tls1', '-no_ssl3']364 pid = self.start_server(self.args)365 try:366 ctx = SSL.Context('sslv23', weak_crypto=1)367 s = SSL.Connection(ctx)368 if m2.OPENSSL_VERSION_NUMBER < 0x10000000: # SSLv2 ciphers disabled by default in newer OpenSSL369 s.connect(self.srv_addr)370 self.failUnlessEqual(s.get_version(), 'SSLv2')371 else:372 self.assertRaises(SSL.SSLError, s.connect, self.srv_addr)373 s.close()374 finally:375 self.stop_server(pid)376 def test_cipher_mismatch(self):377 self.args = self.args + ['-cipher', 'AES256-SHA']378 pid = self.start_server(self.args)379 try:380 ctx = SSL.Context()381 s = SSL.Connection(ctx)382 s.set_cipher_list('AES128-SHA')383 try:384 s.connect(self.srv_addr)385 except SSL.SSLError, e:386 self.failUnlessEqual(e[0], 'sslv3 alert handshake failure')387 s.close()388 finally:389 self.stop_server(pid)390 391 def test_no_such_cipher(self):392 self.args = self.args + ['-cipher', 'AES128-SHA']393 pid = self.start_server(self.args)394 try:395 ctx = SSL.Context()396 s = SSL.Connection(ctx)397 s.set_cipher_list('EXP-RC2-MD5')398 try:399 s.connect(self.srv_addr)400 except SSL.SSLError, e:401 self.failUnlessEqual(e[0], 'no ciphers available')402 s.close()403 finally:404 self.stop_server(pid)405 406 def test_no_weak_cipher(self):407 if fips_mode: # Weak ciphers are prohibited408 return409 self.args = self.args + ['-cipher', 'EXP']410 pid = self.start_server(self.args)411 try:412 ctx = SSL.Context()413 s = SSL.Connection(ctx)414 try:415 s.connect(self.srv_addr)416 except SSL.SSLError, e:417 self.failUnlessEqual(e[0], 'sslv3 alert handshake failure')418 s.close()419 finally:420 self.stop_server(pid)421 422 def test_use_weak_cipher(self):423 if fips_mode: # Weak ciphers are prohibited424 return425 self.args = self.args + ['-cipher', 'EXP']426 pid = self.start_server(self.args)427 try:428 ctx = SSL.Context(weak_crypto=1)429 s = SSL.Connection(ctx)430 s.connect(self.srv_addr)431 data = self.http_get(s)432 s.close()433 finally:434 self.stop_server(pid)435 self.failIf(string.find(data, 's_server -quiet -www') == -1)436 437 def test_cipher_ok(self):438 self.args = self.args + ['-cipher', 'AES128-SHA']439 pid = self.start_server(self.args)440 try:441 ctx = SSL.Context()442 s = SSL.Connection(ctx)443 s.set_cipher_list('AES128-SHA')444 s.connect(self.srv_addr)445 data = self.http_get(s)446 447 assert s.get_cipher().name() == 'AES128-SHA', s.get_cipher().name()448 449 cipher_stack = s.get_ciphers()450 assert cipher_stack[0].name() == 'AES128-SHA', cipher_stack[0].name()451 self.assertRaises(IndexError, cipher_stack.__getitem__, 2)452 # For some reason there are 2 entries in the stack453 #assert len(cipher_stack) == 1, len(cipher_stack)454 assert s.get_cipher_list() == 'AES128-SHA', s.get_cipher_list()455 456 # Test Cipher_Stack iterator457 i = 0458 for cipher in cipher_stack:459 i += 1460 assert cipher.name() == 'AES128-SHA', '"%s"' % cipher.name()461 self.assertEqual('AES128-SHA-128', str(cipher))462 # For some reason there are 2 entries in the stack463 #assert i == 1, i464 self.assertEqual(i, len(cipher_stack))465 466 s.close()467 finally:468 self.stop_server(pid)469 self.failIf(string.find(data, 's_server -quiet -www') == -1)470 471 def verify_cb_new(self, ok, store):472 return verify_cb_new_function(ok, store)473 def test_verify_cb_new(self):474 pid = self.start_server(self.args)475 try:476 ctx = SSL.Context()477 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,478 self.verify_cb_new)479 s = SSL.Connection(ctx)480 try:481 s.connect(self.srv_addr)482 except SSL.SSLError, e:483 assert 0, e484 data = self.http_get(s)485 s.close()486 finally:487 self.stop_server(pid)488 self.failIf(string.find(data, 's_server -quiet -www') == -1)489 def test_verify_cb_new_class(self):490 pid = self.start_server(self.args)491 try:492 ctx = SSL.Context()493 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,494 VerifyCB())495 s = SSL.Connection(ctx)496 try:497 s.connect(self.srv_addr)498 except SSL.SSLError, e:499 assert 0, e500 data = self.http_get(s)501 s.close()502 finally:503 self.stop_server(pid)504 self.failIf(string.find(data, 's_server -quiet -www') == -1)505 def test_verify_cb_new_function(self):506 pid = self.start_server(self.args)507 try:508 ctx = SSL.Context()509 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,510 verify_cb_new_function)511 s = SSL.Connection(ctx)512 try:513 s.connect(self.srv_addr)514 except SSL.SSLError, e:515 assert 0, e516 data = self.http_get(s)517 s.close()518 finally:519 self.stop_server(pid)520 self.failIf(string.find(data, 's_server -quiet -www') == -1)521 def test_verify_cb_lambda(self):522 pid = self.start_server(self.args)523 try:524 ctx = SSL.Context()525 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,526 lambda ok, store: 1)527 s = SSL.Connection(ctx)528 try:529 s.connect(self.srv_addr)530 except SSL.SSLError, e:531 assert 0, e532 data = self.http_get(s)533 s.close()534 finally:535 self.stop_server(pid)536 self.failIf(string.find(data, 's_server -quiet -www') == -1)537 def verify_cb_exception(self, ok, store):538 raise Exception, 'We should fail verification'539 def test_verify_cb_exception(self):540 pid = self.start_server(self.args)541 try:542 ctx = SSL.Context()543 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,544 self.verify_cb_exception)545 s = SSL.Connection(ctx)546 self.assertRaises(SSL.SSLError, s.connect, self.srv_addr)547 s.close()548 finally:549 self.stop_server(pid)550 def test_verify_cb_not_callable(self):551 ctx = SSL.Context()552 self.assertRaises(TypeError,553 ctx.set_verify,554 SSL.verify_peer | SSL.verify_fail_if_no_peer_cert,555 9,556 1)557 def test_verify_cb_wrong_callable(self):558 pid = self.start_server(self.args)559 try:560 ctx = SSL.Context()561 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,562 lambda _: '')563 s = SSL.Connection(ctx)564 self.assertRaises(SSL.SSLError, s.connect, self.srv_addr)565 s.close()566 finally:567 self.stop_server(pid)568 def verify_cb_old(self, ctx_ptr, x509_ptr, err, depth, ok):569 try:570 from M2Crypto import X509571 assert not ok572 assert err == m2.X509_V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT or \573 err == m2.X509_V_ERR_UNABLE_TO_GET_ISSUER_CERT_LOCALLY or \574 err == m2.X509_V_ERR_CERT_UNTRUSTED or \575 err == m2.X509_V_ERR_UNABLE_TO_VERIFY_LEAF_SIGNATURE576 assert m2.ssl_ctx_get_cert_store(ctx_ptr)577 assert X509.X509(x509_ptr).as_pem()578 except AssertionError:579 # If we let exceptions propagate from here the580 # caller may see strange errors. This is cleaner.581 return 0582 return 1583 def test_verify_cb_old(self):584 pid = self.start_server(self.args)585 try:586 ctx = SSL.Context()587 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,588 self.verify_cb_old)589 s = SSL.Connection(ctx)590 try:591 s.connect(self.srv_addr)592 except SSL.SSLError, e:593 assert 0, e594 data = self.http_get(s)595 s.close()596 finally:597 self.stop_server(pid)598 self.failIf(string.find(data, 's_server -quiet -www') == -1)599 def test_verify_allow_unknown_old(self):600 pid = self.start_server(self.args)601 try:602 ctx = SSL.Context()603 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,604 SSL.cb.ssl_verify_callback)605 ctx.set_allow_unknown_ca(1)606 s = SSL.Connection(ctx)607 try:608 s.connect(self.srv_addr)609 except SSL.SSLError, e:610 assert 0, e611 data = self.http_get(s)612 s.close()613 finally:614 self.stop_server(pid)615 self.failIf(string.find(data, 's_server -quiet -www') == -1)616 def test_verify_allow_unknown_new(self):617 pid = self.start_server(self.args)618 try:619 ctx = SSL.Context()620 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,621 SSL.cb.ssl_verify_callback_allow_unknown_ca)622 s = SSL.Connection(ctx)623 try:624 s.connect(self.srv_addr)625 except SSL.SSLError, e:626 assert 0, e627 data = self.http_get(s)628 s.close()629 finally:630 self.stop_server(pid)631 self.failIf(string.find(data, 's_server -quiet -www') == -1)632 def test_verify_cert(self):633 pid = self.start_server(self.args)634 try:635 ctx = SSL.Context()636 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)637 ctx.load_verify_locations('tests/ca.pem')638 s = SSL.Connection(ctx)639 try:640 s.connect(self.srv_addr)641 except SSL.SSLError, e:642 assert 0, e643 data = self.http_get(s)644 s.close()645 finally:646 self.stop_server(pid)647 self.failIf(string.find(data, 's_server -quiet -www') == -1)648 def test_verify_cert_fail(self):649 pid = self.start_server(self.args)650 try:651 ctx = SSL.Context()652 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)653 ctx.load_verify_locations('tests/server.pem')654 s = SSL.Connection(ctx)655 self.assertRaises(SSL.SSLError, s.connect, self.srv_addr)656 s.close()657 finally:658 self.stop_server(pid)659 def test_verify_cert_mutual_auth(self):660 self.args.extend(['-Verify', '2', '-CAfile', 'ca.pem']) 661 pid = self.start_server(self.args)662 try:663 ctx = SSL.Context()664 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)665 ctx.load_verify_locations('tests/ca.pem')666 ctx.load_cert('tests/x509.pem')667 s = SSL.Connection(ctx)668 try:669 s.connect(self.srv_addr)670 except SSL.SSLError, e:671 assert 0, e672 data = self.http_get(s)673 s.close()674 finally:675 self.stop_server(pid)676 self.failIf(string.find(data, 's_server -quiet -www') == -1)677 def test_verify_cert_mutual_auth_servernbio(self):678 self.args.extend(['-Verify', '2', '-CAfile', 'ca.pem', '-nbio'])679 pid = self.start_server(self.args)680 try:681 ctx = SSL.Context()682 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)683 ctx.load_verify_locations('tests/ca.pem')684 ctx.load_cert('tests/x509.pem')685 s = SSL.Connection(ctx)686 try:687 s.connect(self.srv_addr)688 except SSL.SSLError, e:689 assert 0, e690 data = self.http_get(s)691 s.close()692 finally:693 self.stop_server(pid)694 self.failIf(string.find(data, 's_server -quiet -www') == -1)695 def test_verify_cert_mutual_auth_fail(self):696 self.args.extend(['-Verify', '2', '-CAfile', 'ca.pem']) 697 pid = self.start_server(self.args)698 try:699 ctx = SSL.Context()700 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)701 ctx.load_verify_locations('tests/ca.pem')702 s = SSL.Connection(ctx)703 self.assertRaises(SSL.SSLError, s.connect, self.srv_addr)704 s.close()705 finally:706 self.stop_server(pid)707 def test_verify_nocert_fail(self):708 self.args.extend(['-nocert']) 709 pid = self.start_server(self.args)710 try:711 ctx = SSL.Context()712 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)713 ctx.load_verify_locations('tests/ca.pem')714 s = SSL.Connection(ctx)715 self.assertRaises(SSL.SSLError, s.connect, self.srv_addr)716 s.close()717 finally:718 self.stop_server(pid)719 def test_blocking0(self):720 pid = self.start_server(self.args)721 try:722 ctx = SSL.Context()723 s = SSL.Connection(ctx)724 s.setblocking(0)725 self.assertRaises(Exception, s.connect, self.srv_addr)726 s.close()727 finally:728 self.stop_server(pid)729 def test_blocking1(self):730 pid = self.start_server(self.args)731 try:732 ctx = SSL.Context()733 s = SSL.Connection(ctx)734 s.setblocking(1)735 try:736 s.connect(self.srv_addr)737 except SSL.SSLError, e:738 assert 0, e739 data = self.http_get(s)740 s.close()741 finally:742 self.stop_server(pid)743 self.failIf(string.find(data, 's_server -quiet -www') == -1)744 def test_makefile(self):745 pid = self.start_server(self.args)746 try:747 ctx = SSL.Context()748 s = SSL.Connection(ctx)749 try:750 s.connect(self.srv_addr)751 except SSL.SSLError, e:752 assert 0, e753 bio = s.makefile('rw')754 #s.close() # XXX bug 6628?755 bio.write('GET / HTTP/1.0\n\n')756 bio.flush()757 data = bio.read()758 bio.close()759 s.close()760 finally:761 self.stop_server(pid)762 self.failIf(string.find(data, 's_server -quiet -www') == -1)763 def test_makefile_err(self):764 pid = self.start_server(self.args)765 try:766 ctx = SSL.Context()767 s = SSL.Connection(ctx)768 try:769 s.connect(self.srv_addr)770 except SSL.SSLError, e:771 assert 0, e772 f = s.makefile()773 data = self.http_get(s)774 s.close()775 del f776 del s777 err_code = Err.peek_error_code()778 assert not err_code, 'Unexpected error: %s' % err_code779 err = Err.get_error()780 assert not err, 'Unexpected error: %s' % err781 finally:782 self.stop_server(pid)783 self.failIf(string.find(data, 's_server -quiet -www') == -1)784 def test_info_callback(self):785 pid = self.start_server(self.args)786 try:787 ctx = SSL.Context()788 ctx.set_info_callback()789 s = SSL.Connection(ctx)790 s.connect(self.srv_addr)791 data = self.http_get(s)792 s.close()793 finally:794 self.stop_server(pid)795 self.failIf(string.find(data, 's_server -quiet -www') == -1)796class UrllibSSLClientTestCase(BaseSSLClientTestCase):797 def test_urllib(self):798 pid = self.start_server(self.args)799 try:800 from M2Crypto import m2urllib801 url = m2urllib.FancyURLopener()802 url.addheader('Connection', 'close')803 u = url.open('https://%s:%s/' % (srv_host, srv_port))804 data = u.read()805 u.close()806 finally:807 self.stop_server(pid)808 self.failIf(string.find(data, 's_server -quiet -www') == -1)809 # XXX Don't actually know how to use m2urllib safely!810 #def test_urllib_secure_context(self):811 #def test_urllib_secure_context_fail(self):812 # XXX Don't actually know how to use m2urllib safely!813 #def test_urllib_safe_context(self):814 #def test_urllib_safe_context_fail(self):815class Urllib2SSLClientTestCase(BaseSSLClientTestCase):816 if sys.version_info >= (2,4):817 def test_urllib2(self):818 pid = self.start_server(self.args)819 try:820 from M2Crypto import m2urllib2821 opener = m2urllib2.build_opener()822 opener.addheaders = [('Connection', 'close')]823 u = opener.open('https://%s:%s/' % (srv_host, srv_port))824 data = u.read()825 u.close()826 finally:827 self.stop_server(pid)828 self.failIf(string.find(data, 's_server -quiet -www') == -1)829 830 def test_urllib2_secure_context(self):831 pid = self.start_server(self.args)832 try:833 ctx = SSL.Context()834 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)835 ctx.load_verify_locations('tests/ca.pem')836 837 from M2Crypto import m2urllib2838 opener = m2urllib2.build_opener(ctx)839 opener.addheaders = [('Connection', 'close')] 840 u = opener.open('https://%s:%s/' % (srv_host, srv_port))841 data = u.read()842 u.close()843 finally:844 self.stop_server(pid)845 self.failIf(string.find(data, 's_server -quiet -www') == -1)846 847 def test_urllib2_secure_context_fail(self):848 pid = self.start_server(self.args)849 try:850 ctx = SSL.Context()851 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)852 ctx.load_verify_locations('tests/server.pem')853 854 from M2Crypto import m2urllib2855 opener = m2urllib2.build_opener(ctx)856 opener.addheaders = [('Connection', 'close')]857 self.assertRaises(SSL.SSLError, opener.open, 'https://%s:%s/' % (srv_host, srv_port))858 finally:859 self.stop_server(pid)860 def test_z_urllib2_opener(self):861 pid = self.start_server(self.args)862 try:863 ctx = SSL.Context()864 from M2Crypto import m2urllib2865 opener = m2urllib2.build_opener(ctx, m2urllib2.HTTPBasicAuthHandler())866 m2urllib2.install_opener(opener)867 req = m2urllib2.Request('https://%s:%s/' % (srv_host, srv_port))868 u = m2urllib2.urlopen(req)869 data = u.read()870 u.close()871 finally:872 self.stop_server(pid)873 self.failIf(string.find(data, 's_server -quiet -www') == -1)874 def test_urllib2_opener_handlers(self):875 ctx = SSL.Context()876 from M2Crypto import m2urllib2877 opener = m2urllib2.build_opener(ctx,878 m2urllib2.HTTPBasicAuthHandler())879 def test_urllib2_leak(self):880 pid = self.start_server(self.args)881 try:882 import gc883 from M2Crypto import m2urllib2884 o = m2urllib2.build_opener()885 r = o.open('https://%s:%s/' % (srv_host, srv_port))886 s = [r.fp._sock.fp]887 r.close()888 self.assertEqual(len(gc.get_referrers(s[0])), 1)889 finally:890 self.stop_server(pid)891class TwistedSSLClientTestCase(BaseSSLClientTestCase):892 def test_twisted_wrapper(self):893 # Test only when twisted and ZopeInterfaces are present894 try:895 from twisted.internet.protocol import ClientFactory896 from twisted.protocols.basic import LineReceiver897 from twisted.internet import reactor898 import M2Crypto.SSL.TwistedProtocolWrapper as wrapper899 except ImportError:900 import warnings901 warnings.warn('Skipping twisted wrapper test because twisted not found')902 return903 904 class EchoClient(LineReceiver):905 def connectionMade(self):906 self.sendLine('GET / HTTP/1.0\n\n')907 def lineReceived(self, line):908 global twisted_data909 twisted_data += line910 class EchoClientFactory(ClientFactory):911 protocol = EchoClient912 913 def clientConnectionFailed(self, connector, reason):914 reactor.stop()915 assert 0, reason916 917 def clientConnectionLost(self, connector, reason):918 reactor.stop()919 920 pid = self.start_server(self.args)921 class ContextFactory:922 def getContext(self):923 return SSL.Context()924 try:925 global twisted_data926 twisted_data = ''927 928 contextFactory = ContextFactory()929 factory = EchoClientFactory()930 wrapper.connectSSL(srv_host, srv_port, factory, contextFactory)931 reactor.run() # This will block until reactor.stop() is called932 finally:933 self.stop_server(pid)934 self.failIf(string.find(twisted_data, 's_server -quiet -www') == -1)...
test_ssl.py.svn-base
Source:test_ssl.py.svn-base
...58 pass59 return False60class BaseSSLClientTestCase(unittest.TestCase):61 openssl_in_path = find_openssl()62 def start_server(self, args):63 if not self.openssl_in_path:64 raise Exception('openssl command not in PATH')65 66 pid = os.fork()67 if pid == 0:68 # openssl must be started in the tests directory for it69 # to find the .pem files70 os.chdir('tests')71 try:72 os.execvp('openssl', args)73 finally:74 os.chdir('..')75 76 else:77 time.sleep(sleepTime)78 return pid79 def stop_server(self, pid):80 os.kill(pid, 1)81 os.waitpid(pid, 0)82 def http_get(self, s):83 s.send('GET / HTTP/1.0\n\n') 84 resp = ''85 while 1:86 try:87 r = s.recv(4096)88 if not r:89 break90 except SSL.SSLError: # s_server throws an 'unexpected eof'...91 break92 resp = resp + r 93 return resp94 def setUp(self):95 self.srv_host = srv_host96 self.srv_port = srv_port97 self.srv_addr = (srv_host, srv_port)98 self.srv_url = 'https://%s:%s/' % (srv_host, srv_port)99 self.args = ['s_server', '-quiet', '-www',100 #'-cert', 'server.pem', Implicitly using this101 '-accept', str(self.srv_port)]102 def tearDown(self):103 global srv_port104 srv_port = srv_port - 1105class PassSSLClientTestCase(BaseSSLClientTestCase):106 107 def test_pass(self):108 pass109class HttpslibSSLClientTestCase(BaseSSLClientTestCase):110 def test_HTTPSConnection(self):111 pid = self.start_server(self.args)112 try:113 from M2Crypto import httpslib114 c = httpslib.HTTPSConnection(srv_host, srv_port)115 c.request('GET', '/')116 data = c.getresponse().read()117 c.close()118 finally:119 self.stop_server(pid)120 self.failIf(string.find(data, 's_server -quiet -www') == -1)121 def test_HTTPSConnection_resume_session(self):122 pid = self.start_server(self.args)123 try:124 from M2Crypto import httpslib125 ctx = SSL.Context()126 ctx.load_verify_locations(cafile='tests/ca.pem')127 ctx.load_cert('tests/x509.pem')128 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 1)129 ctx.set_session_cache_mode(m2.SSL_SESS_CACHE_CLIENT)130 c = httpslib.HTTPSConnection(srv_host, srv_port, ssl_context=ctx)131 c.request('GET', '/')132 ses = c.get_session()133 t = ses.as_text()134 data = c.getresponse().read()135 # Appearently closing connection here screws session; Ali Polatel?136 # c.close()137 138 ctx2 = SSL.Context()139 ctx2.load_verify_locations(cafile='tests/ca.pem')140 ctx2.load_cert('tests/x509.pem')141 ctx2.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 1)142 ctx2.set_session_cache_mode(m2.SSL_SESS_CACHE_CLIENT)143 c2 = httpslib.HTTPSConnection(srv_host, srv_port, ssl_context=ctx2)144 c2.set_session(ses)145 c2.request('GET', '/')146 ses2 = c2.get_session()147 t2 = ses2.as_text()148 data = c2.getresponse().read()149 c.close()150 c2.close()151 assert t == t2, "Sessions did not match"152 finally:153 self.stop_server(pid)154 self.failIf(string.find(data, 's_server -quiet -www') == -1)155 def test_HTTPSConnection_secure_context(self):156 pid = self.start_server(self.args)157 try:158 from M2Crypto import httpslib159 ctx = SSL.Context()160 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)161 ctx.load_verify_locations('tests/ca.pem')162 c = httpslib.HTTPSConnection(srv_host, srv_port, ssl_context=ctx)163 c.request('GET', '/')164 data = c.getresponse().read()165 c.close()166 finally:167 self.stop_server(pid)168 self.failIf(string.find(data, 's_server -quiet -www') == -1)169 def test_HTTPSConnection_secure_context_fail(self):170 pid = self.start_server(self.args)171 try:172 from M2Crypto import httpslib173 ctx = SSL.Context()174 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)175 ctx.load_verify_locations('tests/server.pem')176 c = httpslib.HTTPSConnection(srv_host, srv_port, ssl_context=ctx)177 self.assertRaises(SSL.SSLError, c.request, 'GET', '/')178 c.close()179 finally:180 self.stop_server(pid)181 def test_HTTPS(self):182 pid = self.start_server(self.args)183 try:184 from M2Crypto import httpslib185 c = httpslib.HTTPS(srv_host, srv_port)186 c.putrequest('GET', '/')187 c.putheader('Accept', 'text/html')188 c.putheader('Accept', 'text/plain')189 c.endheaders()190 err, msg, headers = c.getreply()191 assert err == 200, err192 f = c.getfile()193 data = f.read()194 c.close()195 finally:196 self.stop_server(pid)197 self.failIf(string.find(data, 's_server -quiet -www') == -1)198 def test_HTTPS_secure_context(self):199 pid = self.start_server(self.args)200 try:201 from M2Crypto import httpslib202 ctx = SSL.Context()203 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)204 ctx.load_verify_locations('tests/ca.pem')205 c = httpslib.HTTPS(srv_host, srv_port, ssl_context=ctx)206 c.putrequest('GET', '/')207 c.putheader('Accept', 'text/html')208 c.putheader('Accept', 'text/plain')209 c.endheaders()210 err, msg, headers = c.getreply()211 assert err == 200, err212 f = c.getfile()213 data = f.read()214 c.close()215 finally:216 self.stop_server(pid)217 self.failIf(string.find(data, 's_server -quiet -www') == -1)218 def test_HTTPS_secure_context_fail(self):219 pid = self.start_server(self.args)220 try:221 from M2Crypto import httpslib222 ctx = SSL.Context()223 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)224 ctx.load_verify_locations('tests/server.pem')225 c = httpslib.HTTPS(srv_host, srv_port, ssl_context=ctx)226 c.putrequest('GET', '/')227 c.putheader('Accept', 'text/html')228 c.putheader('Accept', 'text/plain')229 self.assertRaises(SSL.SSLError, c.endheaders)230 c.close()231 finally:232 self.stop_server(pid)233 234 def test_HTTPSConnection_illegalkeywordarg(self):235 from M2Crypto import httpslib236 self.assertRaises(ValueError, httpslib.HTTPSConnection, 'example.org',237 badKeyword=True)238class MiscSSLClientTestCase(BaseSSLClientTestCase):239 def test_no_connection(self):240 ctx = SSL.Context()241 s = SSL.Connection(ctx)242 243 def test_server_simple(self):244 pid = self.start_server(self.args)245 try:246 self.assertRaises(ValueError, SSL.Context, 'tlsv5')247 ctx = SSL.Context()248 s = SSL.Connection(ctx)249 s.connect(self.srv_addr)250 self.assertRaises(ValueError, s.read, 0)251 data = self.http_get(s)252 s.close()253 finally:254 self.stop_server(pid)255 self.failIf(string.find(data, 's_server -quiet -www') == -1)256 def test_server_simple_secure_context(self):257 pid = self.start_server(self.args)258 try:259 ctx = SSL.Context()260 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)261 ctx.load_verify_locations('tests/ca.pem')262 s = SSL.Connection(ctx)263 s.connect(self.srv_addr)264 data = self.http_get(s)265 s.close()266 finally:267 self.stop_server(pid)268 self.failIf(string.find(data, 's_server -quiet -www') == -1)269 def test_server_simple_secure_context_fail(self):270 pid = self.start_server(self.args)271 try:272 ctx = SSL.Context()273 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)274 ctx.load_verify_locations('tests/server.pem')275 s = SSL.Connection(ctx)276 self.assertRaises(SSL.SSLError, s.connect, self.srv_addr)277 s.close()278 finally:279 self.stop_server(pid)280 def test_server_simple_timeouts(self):281 pid = self.start_server(self.args)282 try:283 self.assertRaises(ValueError, SSL.Context, 'tlsv5')284 ctx = SSL.Context()285 s = SSL.Connection(ctx)286 287 r = s.get_socket_read_timeout()288 w = s.get_socket_write_timeout()289 assert r.sec == 0, r.sec290 assert r.microsec == 0, r.microsec291 assert w.sec == 0, w.sec292 assert w.microsec == 0, w.microsec293 s.set_socket_read_timeout(SSL.timeout())294 s.set_socket_write_timeout(SSL.timeout(909,9))295 r = s.get_socket_read_timeout()296 w = s.get_socket_write_timeout()297 assert r.sec == 600, r.sec298 assert r.microsec == 0, r.microsec299 assert w.sec == 909, w.sec300 #assert w.microsec == 9, w.microsec XXX 4000301 302 s.connect(self.srv_addr)303 data = self.http_get(s)304 s.close()305 finally:306 self.stop_server(pid)307 self.failIf(string.find(data, 's_server -quiet -www') == -1)308 def test_tls1_nok(self):309 if fips_mode: # TLS is required in FIPS mode310 return311 self.args.append('-no_tls1')312 pid = self.start_server(self.args)313 try:314 ctx = SSL.Context('tlsv1')315 s = SSL.Connection(ctx)316 try:317 s.connect(self.srv_addr)318 except SSL.SSLError, e:319 self.failUnlessEqual(e[0], 'wrong version number')320 s.close()321 finally:322 self.stop_server(pid)323 def test_tls1_ok(self):324 self.args.append('-tls1')325 pid = self.start_server(self.args)326 try:327 ctx = SSL.Context('tlsv1')328 s = SSL.Connection(ctx)329 s.connect(self.srv_addr)330 data = self.http_get(s)331 s.close()332 finally:333 self.stop_server(pid)334 self.failIf(string.find(data, 's_server -quiet -www') == -1)335 def test_sslv23_no_v2(self):336 if fips_mode: # TLS is required in FIPS mode337 return338 self.args.append('-no_tls1')339 pid = self.start_server(self.args)340 try:341 ctx = SSL.Context('sslv23')342 s = SSL.Connection(ctx)343 s.connect(self.srv_addr)344 self.failUnlessEqual(s.get_version(), 'SSLv3')345 s.close()346 finally:347 self.stop_server(pid)348 def test_sslv23_no_v2_no_service(self):349 if fips_mode: # TLS is required in FIPS mode350 return351 self.args = self.args + ['-no_tls1', '-no_ssl3']352 pid = self.start_server(self.args)353 try:354 ctx = SSL.Context('sslv23')355 s = SSL.Connection(ctx)356 self.assertRaises(SSL.SSLError, s.connect, self.srv_addr)357 s.close()358 finally:359 self.stop_server(pid)360 def test_sslv23_weak_crypto(self):361 if fips_mode: # TLS is required in FIPS mode362 return363 self.args = self.args + ['-no_tls1', '-no_ssl3']364 pid = self.start_server(self.args)365 try:366 ctx = SSL.Context('sslv23', weak_crypto=1)367 s = SSL.Connection(ctx)368 if m2.OPENSSL_VERSION_NUMBER < 0x10000000: # SSLv2 ciphers disabled by default in newer OpenSSL369 s.connect(self.srv_addr)370 self.failUnlessEqual(s.get_version(), 'SSLv2')371 else:372 self.assertRaises(SSL.SSLError, s.connect, self.srv_addr)373 s.close()374 finally:375 self.stop_server(pid)376 def test_cipher_mismatch(self):377 self.args = self.args + ['-cipher', 'AES256-SHA']378 pid = self.start_server(self.args)379 try:380 ctx = SSL.Context()381 s = SSL.Connection(ctx)382 s.set_cipher_list('AES128-SHA')383 try:384 s.connect(self.srv_addr)385 except SSL.SSLError, e:386 self.failUnlessEqual(e[0], 'sslv3 alert handshake failure')387 s.close()388 finally:389 self.stop_server(pid)390 391 def test_no_such_cipher(self):392 self.args = self.args + ['-cipher', 'AES128-SHA']393 pid = self.start_server(self.args)394 try:395 ctx = SSL.Context()396 s = SSL.Connection(ctx)397 s.set_cipher_list('EXP-RC2-MD5')398 try:399 s.connect(self.srv_addr)400 except SSL.SSLError, e:401 self.failUnlessEqual(e[0], 'no ciphers available')402 s.close()403 finally:404 self.stop_server(pid)405 406 def test_no_weak_cipher(self):407 if fips_mode: # Weak ciphers are prohibited408 return409 self.args = self.args + ['-cipher', 'EXP']410 pid = self.start_server(self.args)411 try:412 ctx = SSL.Context()413 s = SSL.Connection(ctx)414 try:415 s.connect(self.srv_addr)416 except SSL.SSLError, e:417 self.failUnlessEqual(e[0], 'sslv3 alert handshake failure')418 s.close()419 finally:420 self.stop_server(pid)421 422 def test_use_weak_cipher(self):423 if fips_mode: # Weak ciphers are prohibited424 return425 self.args = self.args + ['-cipher', 'EXP']426 pid = self.start_server(self.args)427 try:428 ctx = SSL.Context(weak_crypto=1)429 s = SSL.Connection(ctx)430 s.connect(self.srv_addr)431 data = self.http_get(s)432 s.close()433 finally:434 self.stop_server(pid)435 self.failIf(string.find(data, 's_server -quiet -www') == -1)436 437 def test_cipher_ok(self):438 self.args = self.args + ['-cipher', 'AES128-SHA']439 pid = self.start_server(self.args)440 try:441 ctx = SSL.Context()442 s = SSL.Connection(ctx)443 s.set_cipher_list('AES128-SHA')444 s.connect(self.srv_addr)445 data = self.http_get(s)446 447 assert s.get_cipher().name() == 'AES128-SHA', s.get_cipher().name()448 449 cipher_stack = s.get_ciphers()450 assert cipher_stack[0].name() == 'AES128-SHA', cipher_stack[0].name()451 self.assertRaises(IndexError, cipher_stack.__getitem__, 2)452 # For some reason there are 2 entries in the stack453 #assert len(cipher_stack) == 1, len(cipher_stack)454 assert s.get_cipher_list() == 'AES128-SHA', s.get_cipher_list()455 456 # Test Cipher_Stack iterator457 i = 0458 for cipher in cipher_stack:459 i += 1460 assert cipher.name() == 'AES128-SHA', '"%s"' % cipher.name()461 self.assertEqual('AES128-SHA-128', str(cipher))462 # For some reason there are 2 entries in the stack463 #assert i == 1, i464 self.assertEqual(i, len(cipher_stack))465 466 s.close()467 finally:468 self.stop_server(pid)469 self.failIf(string.find(data, 's_server -quiet -www') == -1)470 471 def verify_cb_new(self, ok, store):472 return verify_cb_new_function(ok, store)473 def test_verify_cb_new(self):474 pid = self.start_server(self.args)475 try:476 ctx = SSL.Context()477 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,478 self.verify_cb_new)479 s = SSL.Connection(ctx)480 try:481 s.connect(self.srv_addr)482 except SSL.SSLError, e:483 assert 0, e484 data = self.http_get(s)485 s.close()486 finally:487 self.stop_server(pid)488 self.failIf(string.find(data, 's_server -quiet -www') == -1)489 def test_verify_cb_new_class(self):490 pid = self.start_server(self.args)491 try:492 ctx = SSL.Context()493 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,494 VerifyCB())495 s = SSL.Connection(ctx)496 try:497 s.connect(self.srv_addr)498 except SSL.SSLError, e:499 assert 0, e500 data = self.http_get(s)501 s.close()502 finally:503 self.stop_server(pid)504 self.failIf(string.find(data, 's_server -quiet -www') == -1)505 def test_verify_cb_new_function(self):506 pid = self.start_server(self.args)507 try:508 ctx = SSL.Context()509 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,510 verify_cb_new_function)511 s = SSL.Connection(ctx)512 try:513 s.connect(self.srv_addr)514 except SSL.SSLError, e:515 assert 0, e516 data = self.http_get(s)517 s.close()518 finally:519 self.stop_server(pid)520 self.failIf(string.find(data, 's_server -quiet -www') == -1)521 def test_verify_cb_lambda(self):522 pid = self.start_server(self.args)523 try:524 ctx = SSL.Context()525 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,526 lambda ok, store: 1)527 s = SSL.Connection(ctx)528 try:529 s.connect(self.srv_addr)530 except SSL.SSLError, e:531 assert 0, e532 data = self.http_get(s)533 s.close()534 finally:535 self.stop_server(pid)536 self.failIf(string.find(data, 's_server -quiet -www') == -1)537 def verify_cb_exception(self, ok, store):538 raise Exception, 'We should fail verification'539 def test_verify_cb_exception(self):540 pid = self.start_server(self.args)541 try:542 ctx = SSL.Context()543 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,544 self.verify_cb_exception)545 s = SSL.Connection(ctx)546 self.assertRaises(SSL.SSLError, s.connect, self.srv_addr)547 s.close()548 finally:549 self.stop_server(pid)550 def test_verify_cb_not_callable(self):551 ctx = SSL.Context()552 self.assertRaises(TypeError,553 ctx.set_verify,554 SSL.verify_peer | SSL.verify_fail_if_no_peer_cert,555 9,556 1)557 def test_verify_cb_wrong_callable(self):558 pid = self.start_server(self.args)559 try:560 ctx = SSL.Context()561 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,562 lambda _: '')563 s = SSL.Connection(ctx)564 self.assertRaises(SSL.SSLError, s.connect, self.srv_addr)565 s.close()566 finally:567 self.stop_server(pid)568 def verify_cb_old(self, ctx_ptr, x509_ptr, err, depth, ok):569 try:570 from M2Crypto import X509571 assert not ok572 assert err == m2.X509_V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT or \573 err == m2.X509_V_ERR_UNABLE_TO_GET_ISSUER_CERT_LOCALLY or \574 err == m2.X509_V_ERR_CERT_UNTRUSTED or \575 err == m2.X509_V_ERR_UNABLE_TO_VERIFY_LEAF_SIGNATURE576 assert m2.ssl_ctx_get_cert_store(ctx_ptr)577 assert X509.X509(x509_ptr).as_pem()578 except AssertionError:579 # If we let exceptions propagate from here the580 # caller may see strange errors. This is cleaner.581 return 0582 return 1583 def test_verify_cb_old(self):584 pid = self.start_server(self.args)585 try:586 ctx = SSL.Context()587 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,588 self.verify_cb_old)589 s = SSL.Connection(ctx)590 try:591 s.connect(self.srv_addr)592 except SSL.SSLError, e:593 assert 0, e594 data = self.http_get(s)595 s.close()596 finally:597 self.stop_server(pid)598 self.failIf(string.find(data, 's_server -quiet -www') == -1)599 def test_verify_allow_unknown_old(self):600 pid = self.start_server(self.args)601 try:602 ctx = SSL.Context()603 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,604 SSL.cb.ssl_verify_callback)605 ctx.set_allow_unknown_ca(1)606 s = SSL.Connection(ctx)607 try:608 s.connect(self.srv_addr)609 except SSL.SSLError, e:610 assert 0, e611 data = self.http_get(s)612 s.close()613 finally:614 self.stop_server(pid)615 self.failIf(string.find(data, 's_server -quiet -www') == -1)616 def test_verify_allow_unknown_new(self):617 pid = self.start_server(self.args)618 try:619 ctx = SSL.Context()620 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,621 SSL.cb.ssl_verify_callback_allow_unknown_ca)622 s = SSL.Connection(ctx)623 try:624 s.connect(self.srv_addr)625 except SSL.SSLError, e:626 assert 0, e627 data = self.http_get(s)628 s.close()629 finally:630 self.stop_server(pid)631 self.failIf(string.find(data, 's_server -quiet -www') == -1)632 def test_verify_cert(self):633 pid = self.start_server(self.args)634 try:635 ctx = SSL.Context()636 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)637 ctx.load_verify_locations('tests/ca.pem')638 s = SSL.Connection(ctx)639 try:640 s.connect(self.srv_addr)641 except SSL.SSLError, e:642 assert 0, e643 data = self.http_get(s)644 s.close()645 finally:646 self.stop_server(pid)647 self.failIf(string.find(data, 's_server -quiet -www') == -1)648 def test_verify_cert_fail(self):649 pid = self.start_server(self.args)650 try:651 ctx = SSL.Context()652 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)653 ctx.load_verify_locations('tests/server.pem')654 s = SSL.Connection(ctx)655 self.assertRaises(SSL.SSLError, s.connect, self.srv_addr)656 s.close()657 finally:658 self.stop_server(pid)659 def test_verify_cert_mutual_auth(self):660 self.args.extend(['-Verify', '2', '-CAfile', 'ca.pem']) 661 pid = self.start_server(self.args)662 try:663 ctx = SSL.Context()664 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)665 ctx.load_verify_locations('tests/ca.pem')666 ctx.load_cert('tests/x509.pem')667 s = SSL.Connection(ctx)668 try:669 s.connect(self.srv_addr)670 except SSL.SSLError, e:671 assert 0, e672 data = self.http_get(s)673 s.close()674 finally:675 self.stop_server(pid)676 self.failIf(string.find(data, 's_server -quiet -www') == -1)677 def test_verify_cert_mutual_auth_servernbio(self):678 self.args.extend(['-Verify', '2', '-CAfile', 'ca.pem', '-nbio'])679 pid = self.start_server(self.args)680 try:681 ctx = SSL.Context()682 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)683 ctx.load_verify_locations('tests/ca.pem')684 ctx.load_cert('tests/x509.pem')685 s = SSL.Connection(ctx)686 try:687 s.connect(self.srv_addr)688 except SSL.SSLError, e:689 assert 0, e690 data = self.http_get(s)691 s.close()692 finally:693 self.stop_server(pid)694 self.failIf(string.find(data, 's_server -quiet -www') == -1)695 def test_verify_cert_mutual_auth_fail(self):696 self.args.extend(['-Verify', '2', '-CAfile', 'ca.pem']) 697 pid = self.start_server(self.args)698 try:699 ctx = SSL.Context()700 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)701 ctx.load_verify_locations('tests/ca.pem')702 s = SSL.Connection(ctx)703 self.assertRaises(SSL.SSLError, s.connect, self.srv_addr)704 s.close()705 finally:706 self.stop_server(pid)707 def test_verify_nocert_fail(self):708 self.args.extend(['-nocert']) 709 pid = self.start_server(self.args)710 try:711 ctx = SSL.Context()712 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)713 ctx.load_verify_locations('tests/ca.pem')714 s = SSL.Connection(ctx)715 self.assertRaises(SSL.SSLError, s.connect, self.srv_addr)716 s.close()717 finally:718 self.stop_server(pid)719 def test_blocking0(self):720 pid = self.start_server(self.args)721 try:722 ctx = SSL.Context()723 s = SSL.Connection(ctx)724 s.setblocking(0)725 self.assertRaises(Exception, s.connect, self.srv_addr)726 s.close()727 finally:728 self.stop_server(pid)729 def test_blocking1(self):730 pid = self.start_server(self.args)731 try:732 ctx = SSL.Context()733 s = SSL.Connection(ctx)734 s.setblocking(1)735 try:736 s.connect(self.srv_addr)737 except SSL.SSLError, e:738 assert 0, e739 data = self.http_get(s)740 s.close()741 finally:742 self.stop_server(pid)743 self.failIf(string.find(data, 's_server -quiet -www') == -1)744 def test_makefile(self):745 pid = self.start_server(self.args)746 try:747 ctx = SSL.Context()748 s = SSL.Connection(ctx)749 try:750 s.connect(self.srv_addr)751 except SSL.SSLError, e:752 assert 0, e753 bio = s.makefile('rw')754 #s.close() # XXX bug 6628?755 bio.write('GET / HTTP/1.0\n\n')756 bio.flush()757 data = bio.read()758 bio.close()759 s.close()760 finally:761 self.stop_server(pid)762 self.failIf(string.find(data, 's_server -quiet -www') == -1)763 def test_makefile_err(self):764 pid = self.start_server(self.args)765 try:766 ctx = SSL.Context()767 s = SSL.Connection(ctx)768 try:769 s.connect(self.srv_addr)770 except SSL.SSLError, e:771 assert 0, e772 f = s.makefile()773 data = self.http_get(s)774 s.close()775 del f776 del s777 err_code = Err.peek_error_code()778 assert not err_code, 'Unexpected error: %s' % err_code779 err = Err.get_error()780 assert not err, 'Unexpected error: %s' % err781 finally:782 self.stop_server(pid)783 self.failIf(string.find(data, 's_server -quiet -www') == -1)784 def test_info_callback(self):785 pid = self.start_server(self.args)786 try:787 ctx = SSL.Context()788 ctx.set_info_callback()789 s = SSL.Connection(ctx)790 s.connect(self.srv_addr)791 data = self.http_get(s)792 s.close()793 finally:794 self.stop_server(pid)795 self.failIf(string.find(data, 's_server -quiet -www') == -1)796class UrllibSSLClientTestCase(BaseSSLClientTestCase):797 def test_urllib(self):798 pid = self.start_server(self.args)799 try:800 from M2Crypto import m2urllib801 url = m2urllib.FancyURLopener()802 url.addheader('Connection', 'close')803 u = url.open('https://%s:%s/' % (srv_host, srv_port))804 data = u.read()805 u.close()806 finally:807 self.stop_server(pid)808 self.failIf(string.find(data, 's_server -quiet -www') == -1)809 # XXX Don't actually know how to use m2urllib safely!810 #def test_urllib_secure_context(self):811 #def test_urllib_secure_context_fail(self):812 # XXX Don't actually know how to use m2urllib safely!813 #def test_urllib_safe_context(self):814 #def test_urllib_safe_context_fail(self):815class Urllib2SSLClientTestCase(BaseSSLClientTestCase):816 if sys.version_info >= (2,4):817 def test_urllib2(self):818 pid = self.start_server(self.args)819 try:820 from M2Crypto import m2urllib2821 opener = m2urllib2.build_opener()822 opener.addheaders = [('Connection', 'close')]823 u = opener.open('https://%s:%s/' % (srv_host, srv_port))824 data = u.read()825 u.close()826 finally:827 self.stop_server(pid)828 self.failIf(string.find(data, 's_server -quiet -www') == -1)829 830 def test_urllib2_secure_context(self):831 pid = self.start_server(self.args)832 try:833 ctx = SSL.Context()834 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)835 ctx.load_verify_locations('tests/ca.pem')836 837 from M2Crypto import m2urllib2838 opener = m2urllib2.build_opener(ctx)839 opener.addheaders = [('Connection', 'close')] 840 u = opener.open('https://%s:%s/' % (srv_host, srv_port))841 data = u.read()842 u.close()843 finally:844 self.stop_server(pid)845 self.failIf(string.find(data, 's_server -quiet -www') == -1)846 847 def test_urllib2_secure_context_fail(self):848 pid = self.start_server(self.args)849 try:850 ctx = SSL.Context()851 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)852 ctx.load_verify_locations('tests/server.pem')853 854 from M2Crypto import m2urllib2855 opener = m2urllib2.build_opener(ctx)856 opener.addheaders = [('Connection', 'close')]857 self.assertRaises(SSL.SSLError, opener.open, 'https://%s:%s/' % (srv_host, srv_port))858 finally:859 self.stop_server(pid)860 def test_z_urllib2_opener(self):861 pid = self.start_server(self.args)862 try:863 ctx = SSL.Context()864 from M2Crypto import m2urllib2865 opener = m2urllib2.build_opener(ctx, m2urllib2.HTTPBasicAuthHandler())866 m2urllib2.install_opener(opener)867 req = m2urllib2.Request('https://%s:%s/' % (srv_host, srv_port))868 u = m2urllib2.urlopen(req)869 data = u.read()870 u.close()871 finally:872 self.stop_server(pid)873 self.failIf(string.find(data, 's_server -quiet -www') == -1)874 def test_urllib2_opener_handlers(self):875 ctx = SSL.Context()876 from M2Crypto import m2urllib2877 opener = m2urllib2.build_opener(ctx,878 m2urllib2.HTTPBasicAuthHandler())879 def test_urllib2_leak(self):880 pid = self.start_server(self.args)881 try:882 import gc883 from M2Crypto import m2urllib2884 o = m2urllib2.build_opener()885 r = o.open('https://%s:%s/' % (srv_host, srv_port))886 s = [r.fp._sock.fp]887 r.close()888 self.assertEqual(len(gc.get_referrers(s[0])), 1)889 finally:890 self.stop_server(pid)891class TwistedSSLClientTestCase(BaseSSLClientTestCase):892 def test_twisted_wrapper(self):893 # Test only when twisted and ZopeInterfaces are present894 try:895 from twisted.internet.protocol import ClientFactory896 from twisted.protocols.basic import LineReceiver897 from twisted.internet import reactor898 import M2Crypto.SSL.TwistedProtocolWrapper as wrapper899 except ImportError:900 import warnings901 warnings.warn('Skipping twisted wrapper test because twisted not found')902 return903 904 class EchoClient(LineReceiver):905 def connectionMade(self):906 self.sendLine('GET / HTTP/1.0\n\n')907 def lineReceived(self, line):908 global twisted_data909 twisted_data += line910 class EchoClientFactory(ClientFactory):911 protocol = EchoClient912 913 def clientConnectionFailed(self, connector, reason):914 reactor.stop()915 assert 0, reason916 917 def clientConnectionLost(self, connector, reason):918 reactor.stop()919 920 pid = self.start_server(self.args)921 class ContextFactory:922 def getContext(self):923 return SSL.Context()924 try:925 global twisted_data926 twisted_data = ''927 928 contextFactory = ContextFactory()929 factory = EchoClientFactory()930 wrapper.connectSSL(srv_host, srv_port, factory, contextFactory)931 reactor.run() # This will block until reactor.stop() is called932 finally:933 self.stop_server(pid)934 self.failIf(string.find(twisted_data, 's_server -quiet -www') == -1)...
test_debug_server.py
Source:test_debug_server.py
...20 return killed21def setup_server(start_server, find_free_port, no_eval=False, stdout=False, env=None):22 port = find_free_port()23 args = ["+DEBUG_LOG", "+NO_EVAL"] if no_eval else ["+DEBUG_LOG"]24 s = start_server(port, "test_debug_server", args, stdout=stdout, env=env)25 assert s.poll() is None26 uri = "ws://localhost:{0}".format(port)27 return s, uri28def test_continue_stop(start_server, find_free_port):29 s, uri = setup_server(start_server, find_free_port, True)30 async def test_logic():31 client = hgdb.HGDBClient(uri, None)32 await client.connect()33 await client.continue_()34 await client.stop()35 asyncio.get_event_loop().run_until_complete(test_logic())36 # check if process exit37 killed = is_killed(s)38 if not killed:...
test_client_server.py
Source:test_client_server.py
...36 # Process callbacks scheduled with call_soon by appending a callback37 # to stop the event loop then running it until it hits that callback.38 self.loop.call_soon(self.loop.stop)39 self.loop.run_forever()40 def start_server(self, **kwds):41 server = serve(handler, 'localhost', 8642, **kwds)42 self.server = self.loop.run_until_complete(server)43 def start_client(self, path='', **kwds):44 client = connect('ws://localhost:8642/' + path, **kwds)45 self.client = self.loop.run_until_complete(client)46 def stop_client(self):47 self.loop.run_until_complete(self.client.worker)48 def stop_server(self):49 self.server.close()50 self.loop.run_until_complete(self.server.wait_closed())51 def test_basic(self):52 self.start_server()53 self.start_client()54 self.loop.run_until_complete(self.client.send("Hello!"))55 reply = self.loop.run_until_complete(self.client.recv())56 self.assertEqual(reply, "Hello!")57 self.stop_client()58 self.stop_server()59 def test_server_close_while_client_connected(self):60 self.start_server()61 self.start_client()62 self.stop_server()63 def test_explicit_event_loop(self):64 self.start_server(loop=self.loop)65 self.start_client(loop=self.loop)66 self.loop.run_until_complete(self.client.send("Hello!"))67 reply = self.loop.run_until_complete(self.client.recv())68 self.assertEqual(reply, "Hello!")69 self.stop_client()70 self.stop_server()71 def test_protocol_attributes(self):72 self.start_server()73 self.start_client('attributes')74 expected_attrs = ('localhost', 8642, self.secure)75 client_attrs = (self.client.host, self.client.port, self.client.secure)76 self.assertEqual(client_attrs, expected_attrs)77 server_attrs = self.loop.run_until_complete(self.client.recv())78 self.assertEqual(server_attrs, repr(expected_attrs))79 self.stop_client()80 self.stop_server()81 def test_protocol_headers(self):82 self.start_server()83 self.start_client('headers')84 client_req = self.client.request_headers85 client_resp = self.client.response_headers86 self.assertEqual(client_req['User-Agent'], USER_AGENT)87 self.assertEqual(client_resp['Server'], USER_AGENT)88 server_req = self.loop.run_until_complete(self.client.recv())89 server_resp = self.loop.run_until_complete(self.client.recv())90 self.assertEqual(server_req, str(client_req))91 self.assertEqual(server_resp, str(client_resp))92 self.stop_client()93 self.stop_server()94 def test_protocol_raw_headers(self):95 self.start_server()96 self.start_client('raw_headers')97 client_req = self.client.raw_request_headers98 client_resp = self.client.raw_response_headers99 self.assertEqual(dict(client_req)['User-Agent'], USER_AGENT)100 self.assertEqual(dict(client_resp)['Server'], USER_AGENT)101 server_req = self.loop.run_until_complete(self.client.recv())102 server_resp = self.loop.run_until_complete(self.client.recv())103 self.assertEqual(server_req, repr(client_req))104 self.assertEqual(server_resp, repr(client_resp))105 self.stop_client()106 self.stop_server()107 def test_protocol_custom_request_headers_dict(self):108 self.start_server()109 self.start_client('raw_headers', extra_headers={'X-Spam': 'Eggs'})110 req_headers = self.loop.run_until_complete(self.client.recv())111 self.loop.run_until_complete(self.client.recv())112 self.assertIn("('X-Spam', 'Eggs')", req_headers)113 self.stop_client()114 self.stop_server()115 def test_protocol_custom_request_headers_list(self):116 self.start_server()117 self.start_client('raw_headers', extra_headers=[('X-Spam', 'Eggs')])118 req_headers = self.loop.run_until_complete(self.client.recv())119 self.loop.run_until_complete(self.client.recv())120 self.assertIn("('X-Spam', 'Eggs')", req_headers)121 self.stop_client()122 self.stop_server()123 def test_protocol_custom_response_headers_callable_dict(self):124 self.start_server(extra_headers=lambda p, r: {'X-Spam': 'Eggs'})125 self.start_client('raw_headers')126 self.loop.run_until_complete(self.client.recv())127 resp_headers = self.loop.run_until_complete(self.client.recv())128 self.assertIn("('X-Spam', 'Eggs')", resp_headers)129 self.stop_client()130 self.stop_server()131 def test_protocol_custom_response_headers_callable_list(self):132 self.start_server(extra_headers=lambda p, r: [('X-Spam', 'Eggs')])133 self.start_client('raw_headers')134 self.loop.run_until_complete(self.client.recv())135 resp_headers = self.loop.run_until_complete(self.client.recv())136 self.assertIn("('X-Spam', 'Eggs')", resp_headers)137 self.stop_client()138 self.stop_server()139 def test_protocol_custom_response_headers_dict(self):140 self.start_server(extra_headers={'X-Spam': 'Eggs'})141 self.start_client('raw_headers')142 self.loop.run_until_complete(self.client.recv())143 resp_headers = self.loop.run_until_complete(self.client.recv())144 self.assertIn("('X-Spam', 'Eggs')", resp_headers)145 self.stop_client()146 self.stop_server()147 def test_protocol_custom_response_headers_list(self):148 self.start_server(extra_headers=[('X-Spam', 'Eggs')])149 self.start_client('raw_headers')150 self.loop.run_until_complete(self.client.recv())151 resp_headers = self.loop.run_until_complete(self.client.recv())152 self.assertIn("('X-Spam', 'Eggs')", resp_headers)153 self.stop_client()154 self.stop_server()155 def test_no_subprotocol(self):156 self.start_server()157 self.start_client('subprotocol')158 server_subprotocol = self.loop.run_until_complete(self.client.recv())159 self.assertEqual(server_subprotocol, repr(None))160 self.assertEqual(self.client.subprotocol, None)161 self.stop_client()162 self.stop_server()163 def test_subprotocol_found(self):164 self.start_server(subprotocols=['superchat', 'chat'])165 self.start_client('subprotocol', subprotocols=['otherchat', 'chat'])166 server_subprotocol = self.loop.run_until_complete(self.client.recv())167 self.assertEqual(server_subprotocol, repr('chat'))168 self.assertEqual(self.client.subprotocol, 'chat')169 self.stop_client()170 self.stop_server()171 def test_subprotocol_not_found(self):172 self.start_server(subprotocols=['superchat'])173 self.start_client('subprotocol', subprotocols=['otherchat'])174 server_subprotocol = self.loop.run_until_complete(self.client.recv())175 self.assertEqual(server_subprotocol, repr(None))176 self.assertEqual(self.client.subprotocol, None)177 self.stop_client()178 self.stop_server()179 def test_subprotocol_not_offered(self):180 self.start_server()181 self.start_client('subprotocol', subprotocols=['otherchat', 'chat'])182 server_subprotocol = self.loop.run_until_complete(self.client.recv())183 self.assertEqual(server_subprotocol, repr(None))184 self.assertEqual(self.client.subprotocol, None)185 self.stop_client()186 self.stop_server()187 def test_subprotocol_not_requested(self):188 self.start_server(subprotocols=['superchat', 'chat'])189 self.start_client('subprotocol')190 server_subprotocol = self.loop.run_until_complete(self.client.recv())191 self.assertEqual(server_subprotocol, repr(None))192 self.assertEqual(self.client.subprotocol, None)193 self.stop_client()194 self.stop_server()195 @unittest.mock.patch.object(196 WebSocketServerProtocol, 'select_subprotocol', autospec=True)197 def test_subprotocol_error(self, _select_subprotocol):198 _select_subprotocol.return_value = 'superchat'199 self.start_server(subprotocols=['superchat'])200 with self.assertRaises(InvalidHandshake):201 self.start_client('subprotocol', subprotocols=['otherchat'])202 self.run_loop_once()203 self.stop_server()204 @unittest.mock.patch('websockets.server.read_request')205 def test_server_receives_malformed_request(self, _read_request):206 _read_request.side_effect = ValueError("read_request failed")207 self.start_server()208 with self.assertRaises(InvalidHandshake):209 self.start_client()210 self.stop_server()211 @unittest.mock.patch('websockets.client.read_response')212 def test_client_receives_malformed_response(self, _read_response):213 _read_response.side_effect = ValueError("read_response failed")214 self.start_server()215 with self.assertRaises(InvalidHandshake):216 self.start_client()217 self.run_loop_once()218 self.stop_server()219 @unittest.mock.patch('websockets.client.build_request')220 def test_client_sends_invalid_handshake_request(self, _build_request):221 def wrong_build_request(set_header):222 return '42'223 _build_request.side_effect = wrong_build_request224 self.start_server()225 with self.assertRaises(InvalidHandshake):226 self.start_client()227 self.stop_server()228 @unittest.mock.patch('websockets.server.build_response')229 def test_server_sends_invalid_handshake_response(self, _build_response):230 def wrong_build_response(set_header, key):231 return build_response(set_header, '42')232 _build_response.side_effect = wrong_build_response233 self.start_server()234 with self.assertRaises(InvalidHandshake):235 self.start_client()236 self.stop_server()237 @unittest.mock.patch('websockets.client.read_response')238 def test_server_does_not_switch_protocols(self, _read_response):239 @asyncio.coroutine240 def wrong_read_response(stream):241 code, headers = yield from read_response(stream)242 return 400, headers243 _read_response.side_effect = wrong_read_response244 self.start_server()245 with self.assertRaises(InvalidHandshake):246 self.start_client()247 self.run_loop_once()248 self.stop_server()249 @unittest.mock.patch('websockets.server.WebSocketServerProtocol.send')250 def test_server_handler_crashes(self, send):251 send.side_effect = ValueError("send failed")252 self.start_server()253 self.start_client()254 self.loop.run_until_complete(self.client.send("Hello!"))255 with self.assertRaises(ConnectionClosed):256 self.loop.run_until_complete(self.client.recv())257 self.stop_client()258 self.stop_server()259 # Connection ends with an unexpected error.260 self.assertEqual(self.client.close_code, 1011)261 @unittest.mock.patch('websockets.server.WebSocketServerProtocol.close')262 def test_server_close_crashes(self, close):263 close.side_effect = ValueError("close failed")264 self.start_server()265 self.start_client()266 self.loop.run_until_complete(self.client.send("Hello!"))267 reply = self.loop.run_until_complete(self.client.recv())268 self.assertEqual(reply, "Hello!")269 self.stop_client()270 self.stop_server()271 # Connection ends with an abnormal closure.272 self.assertEqual(self.client.close_code, 1006)273@unittest.skipUnless(os.path.exists(testcert), "test certificate is missing")274class SSLClientServerTests(ClientServerTests):275 secure = True276 @property277 def server_context(self):278 ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)279 ssl_context.load_cert_chain(testcert)280 return ssl_context281 @property282 def client_context(self):283 ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)284 ssl_context.load_verify_locations(testcert)285 ssl_context.verify_mode = ssl.CERT_REQUIRED286 return ssl_context287 def start_server(self, *args, **kwds):288 kwds['ssl'] = self.server_context289 server = serve(handler, 'localhost', 8642, **kwds)290 self.server = self.loop.run_until_complete(server)291 def start_client(self, path='', **kwds):292 kwds['ssl'] = self.client_context293 client = connect('wss://localhost:8642/' + path, **kwds)294 self.client = self.loop.run_until_complete(client)295 def test_ws_uri_is_rejected(self):296 self.start_server()297 client = connect('ws://localhost:8642/', ssl=self.client_context)298 with self.assertRaises(ValueError):299 self.loop.run_until_complete(client)300 self.stop_server()301class ClientServerOriginTests(unittest.TestCase):302 def setUp(self):303 self.loop = asyncio.new_event_loop()304 asyncio.set_event_loop(self.loop)305 def tearDown(self):306 self.loop.close()307 def test_checking_origin_succeeds(self):308 server = self.loop.run_until_complete(309 serve(handler, 'localhost', 8642, origins=['http://localhost']))310 client = self.loop.run_until_complete(...
LambdaTest’s Playwright tutorial will give you a broader idea about the Playwright automation framework, its unique features, and use cases with examples to exceed your understanding of Playwright testing. This tutorial will give A to Z guidance, from installing the Playwright framework to some best practices and advanced concepts.
Get 100 minutes of automation test minutes FREE!!