Best Python code snippet using splinter
test_ssl.py
Source:test_ssl.py
...58 pass59 return False60class BaseSSLClientTestCase(unittest.TestCase):61 openssl_in_path = find_openssl()62 def start_server(self, args):63 if not self.openssl_in_path:64 raise Exception('openssl command not in PATH')65 66 pid = os.fork()67 if pid == 0:68 # openssl must be started in the tests directory for it69 # to find the .pem files70 os.chdir('tests')71 try:72 os.execvp('openssl', args)73 finally:74 os.chdir('..')75 76 else:77 time.sleep(sleepTime)78 return pid79 def stop_server(self, pid):80 os.kill(pid, 1)81 os.waitpid(pid, 0)82 def http_get(self, s):83 s.send('GET / HTTP/1.0\n\n') 84 resp = ''85 while 1:86 try:87 r = s.recv(4096)88 if not r:89 break90 except SSL.SSLError: # s_server throws an 'unexpected eof'...91 break92 resp = resp + r 93 return resp94 def setUp(self):95 self.srv_host = srv_host96 self.srv_port = srv_port97 self.srv_addr = (srv_host, srv_port)98 self.srv_url = 'https://%s:%s/' % (srv_host, srv_port)99 self.args = ['s_server', '-quiet', '-www',100 #'-cert', 'server.pem', Implicitly using this101 '-accept', str(self.srv_port)]102 def tearDown(self):103 global srv_port104 srv_port = srv_port - 1105class PassSSLClientTestCase(BaseSSLClientTestCase):106 107 def test_pass(self):108 pass109class HttpslibSSLClientTestCase(BaseSSLClientTestCase):110 def test_HTTPSConnection(self):111 pid = self.start_server(self.args)112 try:113 from M2Crypto import httpslib114 c = httpslib.HTTPSConnection(srv_host, srv_port)115 c.request('GET', '/')116 data = c.getresponse().read()117 c.close()118 finally:119 self.stop_server(pid)120 self.failIf(string.find(data, 's_server -quiet -www') == -1)121 def test_HTTPSConnection_resume_session(self):122 pid = self.start_server(self.args)123 try:124 from M2Crypto import httpslib125 ctx = SSL.Context()126 ctx.load_verify_locations(cafile='tests/ca.pem')127 ctx.load_cert('tests/x509.pem')128 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 1)129 ctx.set_session_cache_mode(m2.SSL_SESS_CACHE_CLIENT)130 c = httpslib.HTTPSConnection(srv_host, srv_port, ssl_context=ctx)131 c.request('GET', '/')132 ses = c.get_session()133 t = ses.as_text()134 data = c.getresponse().read()135 # Appearently closing connection here screws session; Ali Polatel?136 # c.close()137 138 ctx2 = SSL.Context()139 ctx2.load_verify_locations(cafile='tests/ca.pem')140 ctx2.load_cert('tests/x509.pem')141 ctx2.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 1)142 ctx2.set_session_cache_mode(m2.SSL_SESS_CACHE_CLIENT)143 c2 = httpslib.HTTPSConnection(srv_host, srv_port, ssl_context=ctx2)144 c2.set_session(ses)145 c2.request('GET', '/')146 ses2 = c2.get_session()147 t2 = ses2.as_text()148 data = c2.getresponse().read()149 c.close()150 c2.close()151 assert t == t2, "Sessions did not match"152 finally:153 self.stop_server(pid)154 self.failIf(string.find(data, 's_server -quiet -www') == -1)155 def test_HTTPSConnection_secure_context(self):156 pid = self.start_server(self.args)157 try:158 from M2Crypto import httpslib159 ctx = SSL.Context()160 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)161 ctx.load_verify_locations('tests/ca.pem')162 c = httpslib.HTTPSConnection(srv_host, srv_port, ssl_context=ctx)163 c.request('GET', '/')164 data = c.getresponse().read()165 c.close()166 finally:167 self.stop_server(pid)168 self.failIf(string.find(data, 's_server -quiet -www') == -1)169 def test_HTTPSConnection_secure_context_fail(self):170 pid = self.start_server(self.args)171 try:172 from M2Crypto import httpslib173 ctx = SSL.Context()174 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)175 ctx.load_verify_locations('tests/server.pem')176 c = httpslib.HTTPSConnection(srv_host, srv_port, ssl_context=ctx)177 self.assertRaises(SSL.SSLError, c.request, 'GET', '/')178 c.close()179 finally:180 self.stop_server(pid)181 def test_HTTPS(self):182 pid = self.start_server(self.args)183 try:184 from M2Crypto import httpslib185 c = httpslib.HTTPS(srv_host, srv_port)186 c.putrequest('GET', '/')187 c.putheader('Accept', 'text/html')188 c.putheader('Accept', 'text/plain')189 c.endheaders()190 err, msg, headers = c.getreply()191 assert err == 200, err192 f = c.getfile()193 data = f.read()194 c.close()195 finally:196 self.stop_server(pid)197 self.failIf(string.find(data, 's_server -quiet -www') == -1)198 def test_HTTPS_secure_context(self):199 pid = self.start_server(self.args)200 try:201 from M2Crypto import httpslib202 ctx = SSL.Context()203 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)204 ctx.load_verify_locations('tests/ca.pem')205 c = httpslib.HTTPS(srv_host, srv_port, ssl_context=ctx)206 c.putrequest('GET', '/')207 c.putheader('Accept', 'text/html')208 c.putheader('Accept', 'text/plain')209 c.endheaders()210 err, msg, headers = c.getreply()211 assert err == 200, err212 f = c.getfile()213 data = f.read()214 c.close()215 finally:216 self.stop_server(pid)217 self.failIf(string.find(data, 's_server -quiet -www') == -1)218 def test_HTTPS_secure_context_fail(self):219 pid = self.start_server(self.args)220 try:221 from M2Crypto import httpslib222 ctx = SSL.Context()223 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)224 ctx.load_verify_locations('tests/server.pem')225 c = httpslib.HTTPS(srv_host, srv_port, ssl_context=ctx)226 c.putrequest('GET', '/')227 c.putheader('Accept', 'text/html')228 c.putheader('Accept', 'text/plain')229 self.assertRaises(SSL.SSLError, c.endheaders)230 c.close()231 finally:232 self.stop_server(pid)233 234 def test_HTTPSConnection_illegalkeywordarg(self):235 from M2Crypto import httpslib236 self.assertRaises(ValueError, httpslib.HTTPSConnection, 'example.org',237 badKeyword=True)238class MiscSSLClientTestCase(BaseSSLClientTestCase):239 def test_no_connection(self):240 ctx = SSL.Context()241 s = SSL.Connection(ctx)242 243 def test_server_simple(self):244 pid = self.start_server(self.args)245 try:246 self.assertRaises(ValueError, SSL.Context, 'tlsv5')247 ctx = SSL.Context()248 s = SSL.Connection(ctx)249 s.connect(self.srv_addr)250 self.assertRaises(ValueError, s.read, 0)251 data = self.http_get(s)252 s.close()253 finally:254 self.stop_server(pid)255 self.failIf(string.find(data, 's_server -quiet -www') == -1)256 def test_server_simple_secure_context(self):257 pid = self.start_server(self.args)258 try:259 ctx = SSL.Context()260 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)261 ctx.load_verify_locations('tests/ca.pem')262 s = SSL.Connection(ctx)263 s.connect(self.srv_addr)264 data = self.http_get(s)265 s.close()266 finally:267 self.stop_server(pid)268 self.failIf(string.find(data, 's_server -quiet -www') == -1)269 def test_server_simple_secure_context_fail(self):270 pid = self.start_server(self.args)271 try:272 ctx = SSL.Context()273 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)274 ctx.load_verify_locations('tests/server.pem')275 s = SSL.Connection(ctx)276 self.assertRaises(SSL.SSLError, s.connect, self.srv_addr)277 s.close()278 finally:279 self.stop_server(pid)280 def test_server_simple_timeouts(self):281 pid = self.start_server(self.args)282 try:283 self.assertRaises(ValueError, SSL.Context, 'tlsv5')284 ctx = SSL.Context()285 s = SSL.Connection(ctx)286 287 r = s.get_socket_read_timeout()288 w = s.get_socket_write_timeout()289 assert r.sec == 0, r.sec290 assert r.microsec == 0, r.microsec291 assert w.sec == 0, w.sec292 assert w.microsec == 0, w.microsec293 s.set_socket_read_timeout(SSL.timeout())294 s.set_socket_write_timeout(SSL.timeout(909,9))295 r = s.get_socket_read_timeout()296 w = s.get_socket_write_timeout()297 assert r.sec == 600, r.sec298 assert r.microsec == 0, r.microsec299 assert w.sec == 909, w.sec300 #assert w.microsec == 9, w.microsec XXX 4000301 302 s.connect(self.srv_addr)303 data = self.http_get(s)304 s.close()305 finally:306 self.stop_server(pid)307 self.failIf(string.find(data, 's_server -quiet -www') == -1)308 def test_tls1_nok(self):309 if fips_mode: # TLS is required in FIPS mode310 return311 self.args.append('-no_tls1')312 pid = self.start_server(self.args)313 try:314 ctx = SSL.Context('tlsv1')315 s = SSL.Connection(ctx)316 try:317 s.connect(self.srv_addr)318 except SSL.SSLError, e:319 self.failUnlessEqual(e[0], 'wrong version number')320 s.close()321 finally:322 self.stop_server(pid)323 def test_tls1_ok(self):324 self.args.append('-tls1')325 pid = self.start_server(self.args)326 try:327 ctx = SSL.Context('tlsv1')328 s = SSL.Connection(ctx)329 s.connect(self.srv_addr)330 data = self.http_get(s)331 s.close()332 finally:333 self.stop_server(pid)334 self.failIf(string.find(data, 's_server -quiet -www') == -1)335 def test_sslv23_no_v2(self):336 if fips_mode: # TLS is required in FIPS mode337 return338 self.args.append('-no_tls1')339 pid = self.start_server(self.args)340 try:341 ctx = SSL.Context('sslv23')342 s = SSL.Connection(ctx)343 s.connect(self.srv_addr)344 self.failUnlessEqual(s.get_version(), 'SSLv3')345 s.close()346 finally:347 self.stop_server(pid)348 def test_sslv23_no_v2_no_service(self):349 if fips_mode: # TLS is required in FIPS mode350 return351 self.args = self.args + ['-no_tls1', '-no_ssl3']352 pid = self.start_server(self.args)353 try:354 ctx = SSL.Context('sslv23')355 s = SSL.Connection(ctx)356 self.assertRaises(SSL.SSLError, s.connect, self.srv_addr)357 s.close()358 finally:359 self.stop_server(pid)360 def test_sslv23_weak_crypto(self):361 if fips_mode: # TLS is required in FIPS mode362 return363 self.args = self.args + ['-no_tls1', '-no_ssl3']364 pid = self.start_server(self.args)365 try:366 ctx = SSL.Context('sslv23', weak_crypto=1)367 s = SSL.Connection(ctx)368 if m2.OPENSSL_VERSION_NUMBER < 0x10000000: # SSLv2 ciphers disabled by default in newer OpenSSL369 s.connect(self.srv_addr)370 self.failUnlessEqual(s.get_version(), 'SSLv2')371 else:372 self.assertRaises(SSL.SSLError, s.connect, self.srv_addr)373 s.close()374 finally:375 self.stop_server(pid)376 def test_cipher_mismatch(self):377 self.args = self.args + ['-cipher', 'AES256-SHA']378 pid = self.start_server(self.args)379 try:380 ctx = SSL.Context()381 s = SSL.Connection(ctx)382 s.set_cipher_list('AES128-SHA')383 try:384 s.connect(self.srv_addr)385 except SSL.SSLError, e:386 self.failUnlessEqual(e[0], 'sslv3 alert handshake failure')387 s.close()388 finally:389 self.stop_server(pid)390 391 def test_no_such_cipher(self):392 self.args = self.args + ['-cipher', 'AES128-SHA']393 pid = self.start_server(self.args)394 try:395 ctx = SSL.Context()396 s = SSL.Connection(ctx)397 s.set_cipher_list('EXP-RC2-MD5')398 try:399 s.connect(self.srv_addr)400 except SSL.SSLError, e:401 self.failUnlessEqual(e[0], 'no ciphers available')402 s.close()403 finally:404 self.stop_server(pid)405 406 def test_no_weak_cipher(self):407 if fips_mode: # Weak ciphers are prohibited408 return409 self.args = self.args + ['-cipher', 'EXP']410 pid = self.start_server(self.args)411 try:412 ctx = SSL.Context()413 s = SSL.Connection(ctx)414 try:415 s.connect(self.srv_addr)416 except SSL.SSLError, e:417 self.failUnlessEqual(e[0], 'sslv3 alert handshake failure')418 s.close()419 finally:420 self.stop_server(pid)421 422 def test_use_weak_cipher(self):423 if fips_mode: # Weak ciphers are prohibited424 return425 self.args = self.args + ['-cipher', 'EXP']426 pid = self.start_server(self.args)427 try:428 ctx = SSL.Context(weak_crypto=1)429 s = SSL.Connection(ctx)430 s.connect(self.srv_addr)431 data = self.http_get(s)432 s.close()433 finally:434 self.stop_server(pid)435 self.failIf(string.find(data, 's_server -quiet -www') == -1)436 437 def test_cipher_ok(self):438 self.args = self.args + ['-cipher', 'AES128-SHA']439 pid = self.start_server(self.args)440 try:441 ctx = SSL.Context()442 s = SSL.Connection(ctx)443 s.set_cipher_list('AES128-SHA')444 s.connect(self.srv_addr)445 data = self.http_get(s)446 447 assert s.get_cipher().name() == 'AES128-SHA', s.get_cipher().name()448 449 cipher_stack = s.get_ciphers()450 assert cipher_stack[0].name() == 'AES128-SHA', cipher_stack[0].name()451 self.assertRaises(IndexError, cipher_stack.__getitem__, 2)452 # For some reason there are 2 entries in the stack453 #assert len(cipher_stack) == 1, len(cipher_stack)454 assert s.get_cipher_list() == 'AES128-SHA', s.get_cipher_list()455 456 # Test Cipher_Stack iterator457 i = 0458 for cipher in cipher_stack:459 i += 1460 assert cipher.name() == 'AES128-SHA', '"%s"' % cipher.name()461 self.assertEqual('AES128-SHA-128', str(cipher))462 # For some reason there are 2 entries in the stack463 #assert i == 1, i464 self.assertEqual(i, len(cipher_stack))465 466 s.close()467 finally:468 self.stop_server(pid)469 self.failIf(string.find(data, 's_server -quiet -www') == -1)470 471 def verify_cb_new(self, ok, store):472 return verify_cb_new_function(ok, store)473 def test_verify_cb_new(self):474 pid = self.start_server(self.args)475 try:476 ctx = SSL.Context()477 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,478 self.verify_cb_new)479 s = SSL.Connection(ctx)480 try:481 s.connect(self.srv_addr)482 except SSL.SSLError, e:483 assert 0, e484 data = self.http_get(s)485 s.close()486 finally:487 self.stop_server(pid)488 self.failIf(string.find(data, 's_server -quiet -www') == -1)489 def test_verify_cb_new_class(self):490 pid = self.start_server(self.args)491 try:492 ctx = SSL.Context()493 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,494 VerifyCB())495 s = SSL.Connection(ctx)496 try:497 s.connect(self.srv_addr)498 except SSL.SSLError, e:499 assert 0, e500 data = self.http_get(s)501 s.close()502 finally:503 self.stop_server(pid)504 self.failIf(string.find(data, 's_server -quiet -www') == -1)505 def test_verify_cb_new_function(self):506 pid = self.start_server(self.args)507 try:508 ctx = SSL.Context()509 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,510 verify_cb_new_function)511 s = SSL.Connection(ctx)512 try:513 s.connect(self.srv_addr)514 except SSL.SSLError, e:515 assert 0, e516 data = self.http_get(s)517 s.close()518 finally:519 self.stop_server(pid)520 self.failIf(string.find(data, 's_server -quiet -www') == -1)521 def test_verify_cb_lambda(self):522 pid = self.start_server(self.args)523 try:524 ctx = SSL.Context()525 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,526 lambda ok, store: 1)527 s = SSL.Connection(ctx)528 try:529 s.connect(self.srv_addr)530 except SSL.SSLError, e:531 assert 0, e532 data = self.http_get(s)533 s.close()534 finally:535 self.stop_server(pid)536 self.failIf(string.find(data, 's_server -quiet -www') == -1)537 def verify_cb_exception(self, ok, store):538 raise Exception, 'We should fail verification'539 def test_verify_cb_exception(self):540 pid = self.start_server(self.args)541 try:542 ctx = SSL.Context()543 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,544 self.verify_cb_exception)545 s = SSL.Connection(ctx)546 self.assertRaises(SSL.SSLError, s.connect, self.srv_addr)547 s.close()548 finally:549 self.stop_server(pid)550 def test_verify_cb_not_callable(self):551 ctx = SSL.Context()552 self.assertRaises(TypeError,553 ctx.set_verify,554 SSL.verify_peer | SSL.verify_fail_if_no_peer_cert,555 9,556 1)557 def test_verify_cb_wrong_callable(self):558 pid = self.start_server(self.args)559 try:560 ctx = SSL.Context()561 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,562 lambda _: '')563 s = SSL.Connection(ctx)564 self.assertRaises(SSL.SSLError, s.connect, self.srv_addr)565 s.close()566 finally:567 self.stop_server(pid)568 def verify_cb_old(self, ctx_ptr, x509_ptr, err, depth, ok):569 try:570 from M2Crypto import X509571 assert not ok572 assert err == m2.X509_V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT or \573 err == m2.X509_V_ERR_UNABLE_TO_GET_ISSUER_CERT_LOCALLY or \574 err == m2.X509_V_ERR_CERT_UNTRUSTED or \575 err == m2.X509_V_ERR_UNABLE_TO_VERIFY_LEAF_SIGNATURE576 assert m2.ssl_ctx_get_cert_store(ctx_ptr)577 assert X509.X509(x509_ptr).as_pem()578 except AssertionError:579 # If we let exceptions propagate from here the580 # caller may see strange errors. This is cleaner.581 return 0582 return 1583 def test_verify_cb_old(self):584 pid = self.start_server(self.args)585 try:586 ctx = SSL.Context()587 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,588 self.verify_cb_old)589 s = SSL.Connection(ctx)590 try:591 s.connect(self.srv_addr)592 except SSL.SSLError, e:593 assert 0, e594 data = self.http_get(s)595 s.close()596 finally:597 self.stop_server(pid)598 self.failIf(string.find(data, 's_server -quiet -www') == -1)599 def test_verify_allow_unknown_old(self):600 pid = self.start_server(self.args)601 try:602 ctx = SSL.Context()603 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,604 SSL.cb.ssl_verify_callback)605 ctx.set_allow_unknown_ca(1)606 s = SSL.Connection(ctx)607 try:608 s.connect(self.srv_addr)609 except SSL.SSLError, e:610 assert 0, e611 data = self.http_get(s)612 s.close()613 finally:614 self.stop_server(pid)615 self.failIf(string.find(data, 's_server -quiet -www') == -1)616 def test_verify_allow_unknown_new(self):617 pid = self.start_server(self.args)618 try:619 ctx = SSL.Context()620 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,621 SSL.cb.ssl_verify_callback_allow_unknown_ca)622 s = SSL.Connection(ctx)623 try:624 s.connect(self.srv_addr)625 except SSL.SSLError, e:626 assert 0, e627 data = self.http_get(s)628 s.close()629 finally:630 self.stop_server(pid)631 self.failIf(string.find(data, 's_server -quiet -www') == -1)632 def test_verify_cert(self):633 pid = self.start_server(self.args)634 try:635 ctx = SSL.Context()636 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)637 ctx.load_verify_locations('tests/ca.pem')638 s = SSL.Connection(ctx)639 try:640 s.connect(self.srv_addr)641 except SSL.SSLError, e:642 assert 0, e643 data = self.http_get(s)644 s.close()645 finally:646 self.stop_server(pid)647 self.failIf(string.find(data, 's_server -quiet -www') == -1)648 def test_verify_cert_fail(self):649 pid = self.start_server(self.args)650 try:651 ctx = SSL.Context()652 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)653 ctx.load_verify_locations('tests/server.pem')654 s = SSL.Connection(ctx)655 self.assertRaises(SSL.SSLError, s.connect, self.srv_addr)656 s.close()657 finally:658 self.stop_server(pid)659 def test_verify_cert_mutual_auth(self):660 self.args.extend(['-Verify', '2', '-CAfile', 'ca.pem']) 661 pid = self.start_server(self.args)662 try:663 ctx = SSL.Context()664 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)665 ctx.load_verify_locations('tests/ca.pem')666 ctx.load_cert('tests/x509.pem')667 s = SSL.Connection(ctx)668 try:669 s.connect(self.srv_addr)670 except SSL.SSLError, e:671 assert 0, e672 data = self.http_get(s)673 s.close()674 finally:675 self.stop_server(pid)676 self.failIf(string.find(data, 's_server -quiet -www') == -1)677 def test_verify_cert_mutual_auth_servernbio(self):678 self.args.extend(['-Verify', '2', '-CAfile', 'ca.pem', '-nbio'])679 pid = self.start_server(self.args)680 try:681 ctx = SSL.Context()682 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)683 ctx.load_verify_locations('tests/ca.pem')684 ctx.load_cert('tests/x509.pem')685 s = SSL.Connection(ctx)686 try:687 s.connect(self.srv_addr)688 except SSL.SSLError, e:689 assert 0, e690 data = self.http_get(s)691 s.close()692 finally:693 self.stop_server(pid)694 self.failIf(string.find(data, 's_server -quiet -www') == -1)695 def test_verify_cert_mutual_auth_fail(self):696 self.args.extend(['-Verify', '2', '-CAfile', 'ca.pem']) 697 pid = self.start_server(self.args)698 try:699 ctx = SSL.Context()700 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)701 ctx.load_verify_locations('tests/ca.pem')702 s = SSL.Connection(ctx)703 self.assertRaises(SSL.SSLError, s.connect, self.srv_addr)704 s.close()705 finally:706 self.stop_server(pid)707 def test_verify_nocert_fail(self):708 self.args.extend(['-nocert']) 709 pid = self.start_server(self.args)710 try:711 ctx = SSL.Context()712 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)713 ctx.load_verify_locations('tests/ca.pem')714 s = SSL.Connection(ctx)715 self.assertRaises(SSL.SSLError, s.connect, self.srv_addr)716 s.close()717 finally:718 self.stop_server(pid)719 def test_blocking0(self):720 pid = self.start_server(self.args)721 try:722 ctx = SSL.Context()723 s = SSL.Connection(ctx)724 s.setblocking(0)725 self.assertRaises(Exception, s.connect, self.srv_addr)726 s.close()727 finally:728 self.stop_server(pid)729 def test_blocking1(self):730 pid = self.start_server(self.args)731 try:732 ctx = SSL.Context()733 s = SSL.Connection(ctx)734 s.setblocking(1)735 try:736 s.connect(self.srv_addr)737 except SSL.SSLError, e:738 assert 0, e739 data = self.http_get(s)740 s.close()741 finally:742 self.stop_server(pid)743 self.failIf(string.find(data, 's_server -quiet -www') == -1)744 def test_makefile(self):745 pid = self.start_server(self.args)746 try:747 ctx = SSL.Context()748 s = SSL.Connection(ctx)749 try:750 s.connect(self.srv_addr)751 except SSL.SSLError, e:752 assert 0, e753 bio = s.makefile('rw')754 #s.close() # XXX bug 6628?755 bio.write('GET / HTTP/1.0\n\n')756 bio.flush()757 data = bio.read()758 bio.close()759 s.close()760 finally:761 self.stop_server(pid)762 self.failIf(string.find(data, 's_server -quiet -www') == -1)763 def test_makefile_err(self):764 pid = self.start_server(self.args)765 try:766 ctx = SSL.Context()767 s = SSL.Connection(ctx)768 try:769 s.connect(self.srv_addr)770 except SSL.SSLError, e:771 assert 0, e772 f = s.makefile()773 data = self.http_get(s)774 s.close()775 del f776 del s777 err_code = Err.peek_error_code()778 assert not err_code, 'Unexpected error: %s' % err_code779 err = Err.get_error()780 assert not err, 'Unexpected error: %s' % err781 finally:782 self.stop_server(pid)783 self.failIf(string.find(data, 's_server -quiet -www') == -1)784 def test_info_callback(self):785 pid = self.start_server(self.args)786 try:787 ctx = SSL.Context()788 ctx.set_info_callback()789 s = SSL.Connection(ctx)790 s.connect(self.srv_addr)791 data = self.http_get(s)792 s.close()793 finally:794 self.stop_server(pid)795 self.failIf(string.find(data, 's_server -quiet -www') == -1)796class UrllibSSLClientTestCase(BaseSSLClientTestCase):797 def test_urllib(self):798 pid = self.start_server(self.args)799 try:800 from M2Crypto import m2urllib801 url = m2urllib.FancyURLopener()802 url.addheader('Connection', 'close')803 u = url.open('https://%s:%s/' % (srv_host, srv_port))804 data = u.read()805 u.close()806 finally:807 self.stop_server(pid)808 self.failIf(string.find(data, 's_server -quiet -www') == -1)809 # XXX Don't actually know how to use m2urllib safely!810 #def test_urllib_secure_context(self):811 #def test_urllib_secure_context_fail(self):812 # XXX Don't actually know how to use m2urllib safely!813 #def test_urllib_safe_context(self):814 #def test_urllib_safe_context_fail(self):815class Urllib2SSLClientTestCase(BaseSSLClientTestCase):816 if sys.version_info >= (2,4):817 def test_urllib2(self):818 pid = self.start_server(self.args)819 try:820 from M2Crypto import m2urllib2821 opener = m2urllib2.build_opener()822 opener.addheaders = [('Connection', 'close')]823 u = opener.open('https://%s:%s/' % (srv_host, srv_port))824 data = u.read()825 u.close()826 finally:827 self.stop_server(pid)828 self.failIf(string.find(data, 's_server -quiet -www') == -1)829 830 def test_urllib2_secure_context(self):831 pid = self.start_server(self.args)832 try:833 ctx = SSL.Context()834 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)835 ctx.load_verify_locations('tests/ca.pem')836 837 from M2Crypto import m2urllib2838 opener = m2urllib2.build_opener(ctx)839 opener.addheaders = [('Connection', 'close')] 840 u = opener.open('https://%s:%s/' % (srv_host, srv_port))841 data = u.read()842 u.close()843 finally:844 self.stop_server(pid)845 self.failIf(string.find(data, 's_server -quiet -www') == -1)846 847 def test_urllib2_secure_context_fail(self):848 pid = self.start_server(self.args)849 try:850 ctx = SSL.Context()851 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)852 ctx.load_verify_locations('tests/server.pem')853 854 from M2Crypto import m2urllib2855 opener = m2urllib2.build_opener(ctx)856 opener.addheaders = [('Connection', 'close')]857 self.assertRaises(SSL.SSLError, opener.open, 'https://%s:%s/' % (srv_host, srv_port))858 finally:859 self.stop_server(pid)860 def test_z_urllib2_opener(self):861 pid = self.start_server(self.args)862 try:863 ctx = SSL.Context()864 from M2Crypto import m2urllib2865 opener = m2urllib2.build_opener(ctx, m2urllib2.HTTPBasicAuthHandler())866 m2urllib2.install_opener(opener)867 req = m2urllib2.Request('https://%s:%s/' % (srv_host, srv_port))868 u = m2urllib2.urlopen(req)869 data = u.read()870 u.close()871 finally:872 self.stop_server(pid)873 self.failIf(string.find(data, 's_server -quiet -www') == -1)874 def test_urllib2_opener_handlers(self):875 ctx = SSL.Context()876 from M2Crypto import m2urllib2877 opener = m2urllib2.build_opener(ctx,878 m2urllib2.HTTPBasicAuthHandler())879 def test_urllib2_leak(self):880 pid = self.start_server(self.args)881 try:882 import gc883 from M2Crypto import m2urllib2884 o = m2urllib2.build_opener()885 r = o.open('https://%s:%s/' % (srv_host, srv_port))886 s = [r.fp._sock.fp]887 r.close()888 self.assertEqual(len(gc.get_referrers(s[0])), 1)889 finally:890 self.stop_server(pid)891class TwistedSSLClientTestCase(BaseSSLClientTestCase):892 def test_twisted_wrapper(self):893 # Test only when twisted and ZopeInterfaces are present894 try:895 from twisted.internet.protocol import ClientFactory896 from twisted.protocols.basic import LineReceiver897 from twisted.internet import reactor898 import M2Crypto.SSL.TwistedProtocolWrapper as wrapper899 except ImportError:900 import warnings901 warnings.warn('Skipping twisted wrapper test because twisted not found')902 return903 904 class EchoClient(LineReceiver):905 def connectionMade(self):906 self.sendLine('GET / HTTP/1.0\n\n')907 def lineReceived(self, line):908 global twisted_data909 twisted_data += line910 class EchoClientFactory(ClientFactory):911 protocol = EchoClient912 913 def clientConnectionFailed(self, connector, reason):914 reactor.stop()915 assert 0, reason916 917 def clientConnectionLost(self, connector, reason):918 reactor.stop()919 920 pid = self.start_server(self.args)921 class ContextFactory:922 def getContext(self):923 return SSL.Context()924 try:925 global twisted_data926 twisted_data = ''927 928 contextFactory = ContextFactory()929 factory = EchoClientFactory()930 wrapper.connectSSL(srv_host, srv_port, factory, contextFactory)931 reactor.run() # This will block until reactor.stop() is called932 finally:933 self.stop_server(pid)934 self.failIf(string.find(twisted_data, 's_server -quiet -www') == -1)...
test_ssl.py.svn-base
Source:test_ssl.py.svn-base
...58 pass59 return False60class BaseSSLClientTestCase(unittest.TestCase):61 openssl_in_path = find_openssl()62 def start_server(self, args):63 if not self.openssl_in_path:64 raise Exception('openssl command not in PATH')65 66 pid = os.fork()67 if pid == 0:68 # openssl must be started in the tests directory for it69 # to find the .pem files70 os.chdir('tests')71 try:72 os.execvp('openssl', args)73 finally:74 os.chdir('..')75 76 else:77 time.sleep(sleepTime)78 return pid79 def stop_server(self, pid):80 os.kill(pid, 1)81 os.waitpid(pid, 0)82 def http_get(self, s):83 s.send('GET / HTTP/1.0\n\n') 84 resp = ''85 while 1:86 try:87 r = s.recv(4096)88 if not r:89 break90 except SSL.SSLError: # s_server throws an 'unexpected eof'...91 break92 resp = resp + r 93 return resp94 def setUp(self):95 self.srv_host = srv_host96 self.srv_port = srv_port97 self.srv_addr = (srv_host, srv_port)98 self.srv_url = 'https://%s:%s/' % (srv_host, srv_port)99 self.args = ['s_server', '-quiet', '-www',100 #'-cert', 'server.pem', Implicitly using this101 '-accept', str(self.srv_port)]102 def tearDown(self):103 global srv_port104 srv_port = srv_port - 1105class PassSSLClientTestCase(BaseSSLClientTestCase):106 107 def test_pass(self):108 pass109class HttpslibSSLClientTestCase(BaseSSLClientTestCase):110 def test_HTTPSConnection(self):111 pid = self.start_server(self.args)112 try:113 from M2Crypto import httpslib114 c = httpslib.HTTPSConnection(srv_host, srv_port)115 c.request('GET', '/')116 data = c.getresponse().read()117 c.close()118 finally:119 self.stop_server(pid)120 self.failIf(string.find(data, 's_server -quiet -www') == -1)121 def test_HTTPSConnection_resume_session(self):122 pid = self.start_server(self.args)123 try:124 from M2Crypto import httpslib125 ctx = SSL.Context()126 ctx.load_verify_locations(cafile='tests/ca.pem')127 ctx.load_cert('tests/x509.pem')128 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 1)129 ctx.set_session_cache_mode(m2.SSL_SESS_CACHE_CLIENT)130 c = httpslib.HTTPSConnection(srv_host, srv_port, ssl_context=ctx)131 c.request('GET', '/')132 ses = c.get_session()133 t = ses.as_text()134 data = c.getresponse().read()135 # Appearently closing connection here screws session; Ali Polatel?136 # c.close()137 138 ctx2 = SSL.Context()139 ctx2.load_verify_locations(cafile='tests/ca.pem')140 ctx2.load_cert('tests/x509.pem')141 ctx2.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 1)142 ctx2.set_session_cache_mode(m2.SSL_SESS_CACHE_CLIENT)143 c2 = httpslib.HTTPSConnection(srv_host, srv_port, ssl_context=ctx2)144 c2.set_session(ses)145 c2.request('GET', '/')146 ses2 = c2.get_session()147 t2 = ses2.as_text()148 data = c2.getresponse().read()149 c.close()150 c2.close()151 assert t == t2, "Sessions did not match"152 finally:153 self.stop_server(pid)154 self.failIf(string.find(data, 's_server -quiet -www') == -1)155 def test_HTTPSConnection_secure_context(self):156 pid = self.start_server(self.args)157 try:158 from M2Crypto import httpslib159 ctx = SSL.Context()160 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)161 ctx.load_verify_locations('tests/ca.pem')162 c = httpslib.HTTPSConnection(srv_host, srv_port, ssl_context=ctx)163 c.request('GET', '/')164 data = c.getresponse().read()165 c.close()166 finally:167 self.stop_server(pid)168 self.failIf(string.find(data, 's_server -quiet -www') == -1)169 def test_HTTPSConnection_secure_context_fail(self):170 pid = self.start_server(self.args)171 try:172 from M2Crypto import httpslib173 ctx = SSL.Context()174 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)175 ctx.load_verify_locations('tests/server.pem')176 c = httpslib.HTTPSConnection(srv_host, srv_port, ssl_context=ctx)177 self.assertRaises(SSL.SSLError, c.request, 'GET', '/')178 c.close()179 finally:180 self.stop_server(pid)181 def test_HTTPS(self):182 pid = self.start_server(self.args)183 try:184 from M2Crypto import httpslib185 c = httpslib.HTTPS(srv_host, srv_port)186 c.putrequest('GET', '/')187 c.putheader('Accept', 'text/html')188 c.putheader('Accept', 'text/plain')189 c.endheaders()190 err, msg, headers = c.getreply()191 assert err == 200, err192 f = c.getfile()193 data = f.read()194 c.close()195 finally:196 self.stop_server(pid)197 self.failIf(string.find(data, 's_server -quiet -www') == -1)198 def test_HTTPS_secure_context(self):199 pid = self.start_server(self.args)200 try:201 from M2Crypto import httpslib202 ctx = SSL.Context()203 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)204 ctx.load_verify_locations('tests/ca.pem')205 c = httpslib.HTTPS(srv_host, srv_port, ssl_context=ctx)206 c.putrequest('GET', '/')207 c.putheader('Accept', 'text/html')208 c.putheader('Accept', 'text/plain')209 c.endheaders()210 err, msg, headers = c.getreply()211 assert err == 200, err212 f = c.getfile()213 data = f.read()214 c.close()215 finally:216 self.stop_server(pid)217 self.failIf(string.find(data, 's_server -quiet -www') == -1)218 def test_HTTPS_secure_context_fail(self):219 pid = self.start_server(self.args)220 try:221 from M2Crypto import httpslib222 ctx = SSL.Context()223 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)224 ctx.load_verify_locations('tests/server.pem')225 c = httpslib.HTTPS(srv_host, srv_port, ssl_context=ctx)226 c.putrequest('GET', '/')227 c.putheader('Accept', 'text/html')228 c.putheader('Accept', 'text/plain')229 self.assertRaises(SSL.SSLError, c.endheaders)230 c.close()231 finally:232 self.stop_server(pid)233 234 def test_HTTPSConnection_illegalkeywordarg(self):235 from M2Crypto import httpslib236 self.assertRaises(ValueError, httpslib.HTTPSConnection, 'example.org',237 badKeyword=True)238class MiscSSLClientTestCase(BaseSSLClientTestCase):239 def test_no_connection(self):240 ctx = SSL.Context()241 s = SSL.Connection(ctx)242 243 def test_server_simple(self):244 pid = self.start_server(self.args)245 try:246 self.assertRaises(ValueError, SSL.Context, 'tlsv5')247 ctx = SSL.Context()248 s = SSL.Connection(ctx)249 s.connect(self.srv_addr)250 self.assertRaises(ValueError, s.read, 0)251 data = self.http_get(s)252 s.close()253 finally:254 self.stop_server(pid)255 self.failIf(string.find(data, 's_server -quiet -www') == -1)256 def test_server_simple_secure_context(self):257 pid = self.start_server(self.args)258 try:259 ctx = SSL.Context()260 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)261 ctx.load_verify_locations('tests/ca.pem')262 s = SSL.Connection(ctx)263 s.connect(self.srv_addr)264 data = self.http_get(s)265 s.close()266 finally:267 self.stop_server(pid)268 self.failIf(string.find(data, 's_server -quiet -www') == -1)269 def test_server_simple_secure_context_fail(self):270 pid = self.start_server(self.args)271 try:272 ctx = SSL.Context()273 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)274 ctx.load_verify_locations('tests/server.pem')275 s = SSL.Connection(ctx)276 self.assertRaises(SSL.SSLError, s.connect, self.srv_addr)277 s.close()278 finally:279 self.stop_server(pid)280 def test_server_simple_timeouts(self):281 pid = self.start_server(self.args)282 try:283 self.assertRaises(ValueError, SSL.Context, 'tlsv5')284 ctx = SSL.Context()285 s = SSL.Connection(ctx)286 287 r = s.get_socket_read_timeout()288 w = s.get_socket_write_timeout()289 assert r.sec == 0, r.sec290 assert r.microsec == 0, r.microsec291 assert w.sec == 0, w.sec292 assert w.microsec == 0, w.microsec293 s.set_socket_read_timeout(SSL.timeout())294 s.set_socket_write_timeout(SSL.timeout(909,9))295 r = s.get_socket_read_timeout()296 w = s.get_socket_write_timeout()297 assert r.sec == 600, r.sec298 assert r.microsec == 0, r.microsec299 assert w.sec == 909, w.sec300 #assert w.microsec == 9, w.microsec XXX 4000301 302 s.connect(self.srv_addr)303 data = self.http_get(s)304 s.close()305 finally:306 self.stop_server(pid)307 self.failIf(string.find(data, 's_server -quiet -www') == -1)308 def test_tls1_nok(self):309 if fips_mode: # TLS is required in FIPS mode310 return311 self.args.append('-no_tls1')312 pid = self.start_server(self.args)313 try:314 ctx = SSL.Context('tlsv1')315 s = SSL.Connection(ctx)316 try:317 s.connect(self.srv_addr)318 except SSL.SSLError, e:319 self.failUnlessEqual(e[0], 'wrong version number')320 s.close()321 finally:322 self.stop_server(pid)323 def test_tls1_ok(self):324 self.args.append('-tls1')325 pid = self.start_server(self.args)326 try:327 ctx = SSL.Context('tlsv1')328 s = SSL.Connection(ctx)329 s.connect(self.srv_addr)330 data = self.http_get(s)331 s.close()332 finally:333 self.stop_server(pid)334 self.failIf(string.find(data, 's_server -quiet -www') == -1)335 def test_sslv23_no_v2(self):336 if fips_mode: # TLS is required in FIPS mode337 return338 self.args.append('-no_tls1')339 pid = self.start_server(self.args)340 try:341 ctx = SSL.Context('sslv23')342 s = SSL.Connection(ctx)343 s.connect(self.srv_addr)344 self.failUnlessEqual(s.get_version(), 'SSLv3')345 s.close()346 finally:347 self.stop_server(pid)348 def test_sslv23_no_v2_no_service(self):349 if fips_mode: # TLS is required in FIPS mode350 return351 self.args = self.args + ['-no_tls1', '-no_ssl3']352 pid = self.start_server(self.args)353 try:354 ctx = SSL.Context('sslv23')355 s = SSL.Connection(ctx)356 self.assertRaises(SSL.SSLError, s.connect, self.srv_addr)357 s.close()358 finally:359 self.stop_server(pid)360 def test_sslv23_weak_crypto(self):361 if fips_mode: # TLS is required in FIPS mode362 return363 self.args = self.args + ['-no_tls1', '-no_ssl3']364 pid = self.start_server(self.args)365 try:366 ctx = SSL.Context('sslv23', weak_crypto=1)367 s = SSL.Connection(ctx)368 if m2.OPENSSL_VERSION_NUMBER < 0x10000000: # SSLv2 ciphers disabled by default in newer OpenSSL369 s.connect(self.srv_addr)370 self.failUnlessEqual(s.get_version(), 'SSLv2')371 else:372 self.assertRaises(SSL.SSLError, s.connect, self.srv_addr)373 s.close()374 finally:375 self.stop_server(pid)376 def test_cipher_mismatch(self):377 self.args = self.args + ['-cipher', 'AES256-SHA']378 pid = self.start_server(self.args)379 try:380 ctx = SSL.Context()381 s = SSL.Connection(ctx)382 s.set_cipher_list('AES128-SHA')383 try:384 s.connect(self.srv_addr)385 except SSL.SSLError, e:386 self.failUnlessEqual(e[0], 'sslv3 alert handshake failure')387 s.close()388 finally:389 self.stop_server(pid)390 391 def test_no_such_cipher(self):392 self.args = self.args + ['-cipher', 'AES128-SHA']393 pid = self.start_server(self.args)394 try:395 ctx = SSL.Context()396 s = SSL.Connection(ctx)397 s.set_cipher_list('EXP-RC2-MD5')398 try:399 s.connect(self.srv_addr)400 except SSL.SSLError, e:401 self.failUnlessEqual(e[0], 'no ciphers available')402 s.close()403 finally:404 self.stop_server(pid)405 406 def test_no_weak_cipher(self):407 if fips_mode: # Weak ciphers are prohibited408 return409 self.args = self.args + ['-cipher', 'EXP']410 pid = self.start_server(self.args)411 try:412 ctx = SSL.Context()413 s = SSL.Connection(ctx)414 try:415 s.connect(self.srv_addr)416 except SSL.SSLError, e:417 self.failUnlessEqual(e[0], 'sslv3 alert handshake failure')418 s.close()419 finally:420 self.stop_server(pid)421 422 def test_use_weak_cipher(self):423 if fips_mode: # Weak ciphers are prohibited424 return425 self.args = self.args + ['-cipher', 'EXP']426 pid = self.start_server(self.args)427 try:428 ctx = SSL.Context(weak_crypto=1)429 s = SSL.Connection(ctx)430 s.connect(self.srv_addr)431 data = self.http_get(s)432 s.close()433 finally:434 self.stop_server(pid)435 self.failIf(string.find(data, 's_server -quiet -www') == -1)436 437 def test_cipher_ok(self):438 self.args = self.args + ['-cipher', 'AES128-SHA']439 pid = self.start_server(self.args)440 try:441 ctx = SSL.Context()442 s = SSL.Connection(ctx)443 s.set_cipher_list('AES128-SHA')444 s.connect(self.srv_addr)445 data = self.http_get(s)446 447 assert s.get_cipher().name() == 'AES128-SHA', s.get_cipher().name()448 449 cipher_stack = s.get_ciphers()450 assert cipher_stack[0].name() == 'AES128-SHA', cipher_stack[0].name()451 self.assertRaises(IndexError, cipher_stack.__getitem__, 2)452 # For some reason there are 2 entries in the stack453 #assert len(cipher_stack) == 1, len(cipher_stack)454 assert s.get_cipher_list() == 'AES128-SHA', s.get_cipher_list()455 456 # Test Cipher_Stack iterator457 i = 0458 for cipher in cipher_stack:459 i += 1460 assert cipher.name() == 'AES128-SHA', '"%s"' % cipher.name()461 self.assertEqual('AES128-SHA-128', str(cipher))462 # For some reason there are 2 entries in the stack463 #assert i == 1, i464 self.assertEqual(i, len(cipher_stack))465 466 s.close()467 finally:468 self.stop_server(pid)469 self.failIf(string.find(data, 's_server -quiet -www') == -1)470 471 def verify_cb_new(self, ok, store):472 return verify_cb_new_function(ok, store)473 def test_verify_cb_new(self):474 pid = self.start_server(self.args)475 try:476 ctx = SSL.Context()477 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,478 self.verify_cb_new)479 s = SSL.Connection(ctx)480 try:481 s.connect(self.srv_addr)482 except SSL.SSLError, e:483 assert 0, e484 data = self.http_get(s)485 s.close()486 finally:487 self.stop_server(pid)488 self.failIf(string.find(data, 's_server -quiet -www') == -1)489 def test_verify_cb_new_class(self):490 pid = self.start_server(self.args)491 try:492 ctx = SSL.Context()493 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,494 VerifyCB())495 s = SSL.Connection(ctx)496 try:497 s.connect(self.srv_addr)498 except SSL.SSLError, e:499 assert 0, e500 data = self.http_get(s)501 s.close()502 finally:503 self.stop_server(pid)504 self.failIf(string.find(data, 's_server -quiet -www') == -1)505 def test_verify_cb_new_function(self):506 pid = self.start_server(self.args)507 try:508 ctx = SSL.Context()509 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,510 verify_cb_new_function)511 s = SSL.Connection(ctx)512 try:513 s.connect(self.srv_addr)514 except SSL.SSLError, e:515 assert 0, e516 data = self.http_get(s)517 s.close()518 finally:519 self.stop_server(pid)520 self.failIf(string.find(data, 's_server -quiet -www') == -1)521 def test_verify_cb_lambda(self):522 pid = self.start_server(self.args)523 try:524 ctx = SSL.Context()525 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,526 lambda ok, store: 1)527 s = SSL.Connection(ctx)528 try:529 s.connect(self.srv_addr)530 except SSL.SSLError, e:531 assert 0, e532 data = self.http_get(s)533 s.close()534 finally:535 self.stop_server(pid)536 self.failIf(string.find(data, 's_server -quiet -www') == -1)537 def verify_cb_exception(self, ok, store):538 raise Exception, 'We should fail verification'539 def test_verify_cb_exception(self):540 pid = self.start_server(self.args)541 try:542 ctx = SSL.Context()543 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,544 self.verify_cb_exception)545 s = SSL.Connection(ctx)546 self.assertRaises(SSL.SSLError, s.connect, self.srv_addr)547 s.close()548 finally:549 self.stop_server(pid)550 def test_verify_cb_not_callable(self):551 ctx = SSL.Context()552 self.assertRaises(TypeError,553 ctx.set_verify,554 SSL.verify_peer | SSL.verify_fail_if_no_peer_cert,555 9,556 1)557 def test_verify_cb_wrong_callable(self):558 pid = self.start_server(self.args)559 try:560 ctx = SSL.Context()561 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,562 lambda _: '')563 s = SSL.Connection(ctx)564 self.assertRaises(SSL.SSLError, s.connect, self.srv_addr)565 s.close()566 finally:567 self.stop_server(pid)568 def verify_cb_old(self, ctx_ptr, x509_ptr, err, depth, ok):569 try:570 from M2Crypto import X509571 assert not ok572 assert err == m2.X509_V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT or \573 err == m2.X509_V_ERR_UNABLE_TO_GET_ISSUER_CERT_LOCALLY or \574 err == m2.X509_V_ERR_CERT_UNTRUSTED or \575 err == m2.X509_V_ERR_UNABLE_TO_VERIFY_LEAF_SIGNATURE576 assert m2.ssl_ctx_get_cert_store(ctx_ptr)577 assert X509.X509(x509_ptr).as_pem()578 except AssertionError:579 # If we let exceptions propagate from here the580 # caller may see strange errors. This is cleaner.581 return 0582 return 1583 def test_verify_cb_old(self):584 pid = self.start_server(self.args)585 try:586 ctx = SSL.Context()587 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,588 self.verify_cb_old)589 s = SSL.Connection(ctx)590 try:591 s.connect(self.srv_addr)592 except SSL.SSLError, e:593 assert 0, e594 data = self.http_get(s)595 s.close()596 finally:597 self.stop_server(pid)598 self.failIf(string.find(data, 's_server -quiet -www') == -1)599 def test_verify_allow_unknown_old(self):600 pid = self.start_server(self.args)601 try:602 ctx = SSL.Context()603 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,604 SSL.cb.ssl_verify_callback)605 ctx.set_allow_unknown_ca(1)606 s = SSL.Connection(ctx)607 try:608 s.connect(self.srv_addr)609 except SSL.SSLError, e:610 assert 0, e611 data = self.http_get(s)612 s.close()613 finally:614 self.stop_server(pid)615 self.failIf(string.find(data, 's_server -quiet -www') == -1)616 def test_verify_allow_unknown_new(self):617 pid = self.start_server(self.args)618 try:619 ctx = SSL.Context()620 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9,621 SSL.cb.ssl_verify_callback_allow_unknown_ca)622 s = SSL.Connection(ctx)623 try:624 s.connect(self.srv_addr)625 except SSL.SSLError, e:626 assert 0, e627 data = self.http_get(s)628 s.close()629 finally:630 self.stop_server(pid)631 self.failIf(string.find(data, 's_server -quiet -www') == -1)632 def test_verify_cert(self):633 pid = self.start_server(self.args)634 try:635 ctx = SSL.Context()636 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)637 ctx.load_verify_locations('tests/ca.pem')638 s = SSL.Connection(ctx)639 try:640 s.connect(self.srv_addr)641 except SSL.SSLError, e:642 assert 0, e643 data = self.http_get(s)644 s.close()645 finally:646 self.stop_server(pid)647 self.failIf(string.find(data, 's_server -quiet -www') == -1)648 def test_verify_cert_fail(self):649 pid = self.start_server(self.args)650 try:651 ctx = SSL.Context()652 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)653 ctx.load_verify_locations('tests/server.pem')654 s = SSL.Connection(ctx)655 self.assertRaises(SSL.SSLError, s.connect, self.srv_addr)656 s.close()657 finally:658 self.stop_server(pid)659 def test_verify_cert_mutual_auth(self):660 self.args.extend(['-Verify', '2', '-CAfile', 'ca.pem']) 661 pid = self.start_server(self.args)662 try:663 ctx = SSL.Context()664 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)665 ctx.load_verify_locations('tests/ca.pem')666 ctx.load_cert('tests/x509.pem')667 s = SSL.Connection(ctx)668 try:669 s.connect(self.srv_addr)670 except SSL.SSLError, e:671 assert 0, e672 data = self.http_get(s)673 s.close()674 finally:675 self.stop_server(pid)676 self.failIf(string.find(data, 's_server -quiet -www') == -1)677 def test_verify_cert_mutual_auth_servernbio(self):678 self.args.extend(['-Verify', '2', '-CAfile', 'ca.pem', '-nbio'])679 pid = self.start_server(self.args)680 try:681 ctx = SSL.Context()682 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)683 ctx.load_verify_locations('tests/ca.pem')684 ctx.load_cert('tests/x509.pem')685 s = SSL.Connection(ctx)686 try:687 s.connect(self.srv_addr)688 except SSL.SSLError, e:689 assert 0, e690 data = self.http_get(s)691 s.close()692 finally:693 self.stop_server(pid)694 self.failIf(string.find(data, 's_server -quiet -www') == -1)695 def test_verify_cert_mutual_auth_fail(self):696 self.args.extend(['-Verify', '2', '-CAfile', 'ca.pem']) 697 pid = self.start_server(self.args)698 try:699 ctx = SSL.Context()700 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)701 ctx.load_verify_locations('tests/ca.pem')702 s = SSL.Connection(ctx)703 self.assertRaises(SSL.SSLError, s.connect, self.srv_addr)704 s.close()705 finally:706 self.stop_server(pid)707 def test_verify_nocert_fail(self):708 self.args.extend(['-nocert']) 709 pid = self.start_server(self.args)710 try:711 ctx = SSL.Context()712 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)713 ctx.load_verify_locations('tests/ca.pem')714 s = SSL.Connection(ctx)715 self.assertRaises(SSL.SSLError, s.connect, self.srv_addr)716 s.close()717 finally:718 self.stop_server(pid)719 def test_blocking0(self):720 pid = self.start_server(self.args)721 try:722 ctx = SSL.Context()723 s = SSL.Connection(ctx)724 s.setblocking(0)725 self.assertRaises(Exception, s.connect, self.srv_addr)726 s.close()727 finally:728 self.stop_server(pid)729 def test_blocking1(self):730 pid = self.start_server(self.args)731 try:732 ctx = SSL.Context()733 s = SSL.Connection(ctx)734 s.setblocking(1)735 try:736 s.connect(self.srv_addr)737 except SSL.SSLError, e:738 assert 0, e739 data = self.http_get(s)740 s.close()741 finally:742 self.stop_server(pid)743 self.failIf(string.find(data, 's_server -quiet -www') == -1)744 def test_makefile(self):745 pid = self.start_server(self.args)746 try:747 ctx = SSL.Context()748 s = SSL.Connection(ctx)749 try:750 s.connect(self.srv_addr)751 except SSL.SSLError, e:752 assert 0, e753 bio = s.makefile('rw')754 #s.close() # XXX bug 6628?755 bio.write('GET / HTTP/1.0\n\n')756 bio.flush()757 data = bio.read()758 bio.close()759 s.close()760 finally:761 self.stop_server(pid)762 self.failIf(string.find(data, 's_server -quiet -www') == -1)763 def test_makefile_err(self):764 pid = self.start_server(self.args)765 try:766 ctx = SSL.Context()767 s = SSL.Connection(ctx)768 try:769 s.connect(self.srv_addr)770 except SSL.SSLError, e:771 assert 0, e772 f = s.makefile()773 data = self.http_get(s)774 s.close()775 del f776 del s777 err_code = Err.peek_error_code()778 assert not err_code, 'Unexpected error: %s' % err_code779 err = Err.get_error()780 assert not err, 'Unexpected error: %s' % err781 finally:782 self.stop_server(pid)783 self.failIf(string.find(data, 's_server -quiet -www') == -1)784 def test_info_callback(self):785 pid = self.start_server(self.args)786 try:787 ctx = SSL.Context()788 ctx.set_info_callback()789 s = SSL.Connection(ctx)790 s.connect(self.srv_addr)791 data = self.http_get(s)792 s.close()793 finally:794 self.stop_server(pid)795 self.failIf(string.find(data, 's_server -quiet -www') == -1)796class UrllibSSLClientTestCase(BaseSSLClientTestCase):797 def test_urllib(self):798 pid = self.start_server(self.args)799 try:800 from M2Crypto import m2urllib801 url = m2urllib.FancyURLopener()802 url.addheader('Connection', 'close')803 u = url.open('https://%s:%s/' % (srv_host, srv_port))804 data = u.read()805 u.close()806 finally:807 self.stop_server(pid)808 self.failIf(string.find(data, 's_server -quiet -www') == -1)809 # XXX Don't actually know how to use m2urllib safely!810 #def test_urllib_secure_context(self):811 #def test_urllib_secure_context_fail(self):812 # XXX Don't actually know how to use m2urllib safely!813 #def test_urllib_safe_context(self):814 #def test_urllib_safe_context_fail(self):815class Urllib2SSLClientTestCase(BaseSSLClientTestCase):816 if sys.version_info >= (2,4):817 def test_urllib2(self):818 pid = self.start_server(self.args)819 try:820 from M2Crypto import m2urllib2821 opener = m2urllib2.build_opener()822 opener.addheaders = [('Connection', 'close')]823 u = opener.open('https://%s:%s/' % (srv_host, srv_port))824 data = u.read()825 u.close()826 finally:827 self.stop_server(pid)828 self.failIf(string.find(data, 's_server -quiet -www') == -1)829 830 def test_urllib2_secure_context(self):831 pid = self.start_server(self.args)832 try:833 ctx = SSL.Context()834 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)835 ctx.load_verify_locations('tests/ca.pem')836 837 from M2Crypto import m2urllib2838 opener = m2urllib2.build_opener(ctx)839 opener.addheaders = [('Connection', 'close')] 840 u = opener.open('https://%s:%s/' % (srv_host, srv_port))841 data = u.read()842 u.close()843 finally:844 self.stop_server(pid)845 self.failIf(string.find(data, 's_server -quiet -www') == -1)846 847 def test_urllib2_secure_context_fail(self):848 pid = self.start_server(self.args)849 try:850 ctx = SSL.Context()851 ctx.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 9)852 ctx.load_verify_locations('tests/server.pem')853 854 from M2Crypto import m2urllib2855 opener = m2urllib2.build_opener(ctx)856 opener.addheaders = [('Connection', 'close')]857 self.assertRaises(SSL.SSLError, opener.open, 'https://%s:%s/' % (srv_host, srv_port))858 finally:859 self.stop_server(pid)860 def test_z_urllib2_opener(self):861 pid = self.start_server(self.args)862 try:863 ctx = SSL.Context()864 from M2Crypto import m2urllib2865 opener = m2urllib2.build_opener(ctx, m2urllib2.HTTPBasicAuthHandler())866 m2urllib2.install_opener(opener)867 req = m2urllib2.Request('https://%s:%s/' % (srv_host, srv_port))868 u = m2urllib2.urlopen(req)869 data = u.read()870 u.close()871 finally:872 self.stop_server(pid)873 self.failIf(string.find(data, 's_server -quiet -www') == -1)874 def test_urllib2_opener_handlers(self):875 ctx = SSL.Context()876 from M2Crypto import m2urllib2877 opener = m2urllib2.build_opener(ctx,878 m2urllib2.HTTPBasicAuthHandler())879 def test_urllib2_leak(self):880 pid = self.start_server(self.args)881 try:882 import gc883 from M2Crypto import m2urllib2884 o = m2urllib2.build_opener()885 r = o.open('https://%s:%s/' % (srv_host, srv_port))886 s = [r.fp._sock.fp]887 r.close()888 self.assertEqual(len(gc.get_referrers(s[0])), 1)889 finally:890 self.stop_server(pid)891class TwistedSSLClientTestCase(BaseSSLClientTestCase):892 def test_twisted_wrapper(self):893 # Test only when twisted and ZopeInterfaces are present894 try:895 from twisted.internet.protocol import ClientFactory896 from twisted.protocols.basic import LineReceiver897 from twisted.internet import reactor898 import M2Crypto.SSL.TwistedProtocolWrapper as wrapper899 except ImportError:900 import warnings901 warnings.warn('Skipping twisted wrapper test because twisted not found')902 return903 904 class EchoClient(LineReceiver):905 def connectionMade(self):906 self.sendLine('GET / HTTP/1.0\n\n')907 def lineReceived(self, line):908 global twisted_data909 twisted_data += line910 class EchoClientFactory(ClientFactory):911 protocol = EchoClient912 913 def clientConnectionFailed(self, connector, reason):914 reactor.stop()915 assert 0, reason916 917 def clientConnectionLost(self, connector, reason):918 reactor.stop()919 920 pid = self.start_server(self.args)921 class ContextFactory:922 def getContext(self):923 return SSL.Context()924 try:925 global twisted_data926 twisted_data = ''927 928 contextFactory = ContextFactory()929 factory = EchoClientFactory()930 wrapper.connectSSL(srv_host, srv_port, factory, contextFactory)931 reactor.run() # This will block until reactor.stop() is called932 finally:933 self.stop_server(pid)934 self.failIf(string.find(twisted_data, 's_server -quiet -www') == -1)...
test_debug_server.py
Source:test_debug_server.py
...20 return killed21def setup_server(start_server, find_free_port, no_eval=False, stdout=False, env=None):22 port = find_free_port()23 args = ["+DEBUG_LOG", "+NO_EVAL"] if no_eval else ["+DEBUG_LOG"]24 s = start_server(port, "test_debug_server", args, stdout=stdout, env=env)25 assert s.poll() is None26 uri = "ws://localhost:{0}".format(port)27 return s, uri28def test_continue_stop(start_server, find_free_port):29 s, uri = setup_server(start_server, find_free_port, True)30 async def test_logic():31 client = hgdb.HGDBClient(uri, None)32 await client.connect()33 await client.continue_()34 await client.stop()35 asyncio.get_event_loop().run_until_complete(test_logic())36 # check if process exit37 killed = is_killed(s)38 if not killed:...
test_client_server.py
Source:test_client_server.py
...36 # Process callbacks scheduled with call_soon by appending a callback37 # to stop the event loop then running it until it hits that callback.38 self.loop.call_soon(self.loop.stop)39 self.loop.run_forever()40 def start_server(self, **kwds):41 server = serve(handler, 'localhost', 8642, **kwds)42 self.server = self.loop.run_until_complete(server)43 def start_client(self, path='', **kwds):44 client = connect('ws://localhost:8642/' + path, **kwds)45 self.client = self.loop.run_until_complete(client)46 def stop_client(self):47 self.loop.run_until_complete(self.client.worker)48 def stop_server(self):49 self.server.close()50 self.loop.run_until_complete(self.server.wait_closed())51 def test_basic(self):52 self.start_server()53 self.start_client()54 self.loop.run_until_complete(self.client.send("Hello!"))55 reply = self.loop.run_until_complete(self.client.recv())56 self.assertEqual(reply, "Hello!")57 self.stop_client()58 self.stop_server()59 def test_server_close_while_client_connected(self):60 self.start_server()61 self.start_client()62 self.stop_server()63 def test_explicit_event_loop(self):64 self.start_server(loop=self.loop)65 self.start_client(loop=self.loop)66 self.loop.run_until_complete(self.client.send("Hello!"))67 reply = self.loop.run_until_complete(self.client.recv())68 self.assertEqual(reply, "Hello!")69 self.stop_client()70 self.stop_server()71 def test_protocol_attributes(self):72 self.start_server()73 self.start_client('attributes')74 expected_attrs = ('localhost', 8642, self.secure)75 client_attrs = (self.client.host, self.client.port, self.client.secure)76 self.assertEqual(client_attrs, expected_attrs)77 server_attrs = self.loop.run_until_complete(self.client.recv())78 self.assertEqual(server_attrs, repr(expected_attrs))79 self.stop_client()80 self.stop_server()81 def test_protocol_headers(self):82 self.start_server()83 self.start_client('headers')84 client_req = self.client.request_headers85 client_resp = self.client.response_headers86 self.assertEqual(client_req['User-Agent'], USER_AGENT)87 self.assertEqual(client_resp['Server'], USER_AGENT)88 server_req = self.loop.run_until_complete(self.client.recv())89 server_resp = self.loop.run_until_complete(self.client.recv())90 self.assertEqual(server_req, str(client_req))91 self.assertEqual(server_resp, str(client_resp))92 self.stop_client()93 self.stop_server()94 def test_protocol_raw_headers(self):95 self.start_server()96 self.start_client('raw_headers')97 client_req = self.client.raw_request_headers98 client_resp = self.client.raw_response_headers99 self.assertEqual(dict(client_req)['User-Agent'], USER_AGENT)100 self.assertEqual(dict(client_resp)['Server'], USER_AGENT)101 server_req = self.loop.run_until_complete(self.client.recv())102 server_resp = self.loop.run_until_complete(self.client.recv())103 self.assertEqual(server_req, repr(client_req))104 self.assertEqual(server_resp, repr(client_resp))105 self.stop_client()106 self.stop_server()107 def test_protocol_custom_request_headers_dict(self):108 self.start_server()109 self.start_client('raw_headers', extra_headers={'X-Spam': 'Eggs'})110 req_headers = self.loop.run_until_complete(self.client.recv())111 self.loop.run_until_complete(self.client.recv())112 self.assertIn("('X-Spam', 'Eggs')", req_headers)113 self.stop_client()114 self.stop_server()115 def test_protocol_custom_request_headers_list(self):116 self.start_server()117 self.start_client('raw_headers', extra_headers=[('X-Spam', 'Eggs')])118 req_headers = self.loop.run_until_complete(self.client.recv())119 self.loop.run_until_complete(self.client.recv())120 self.assertIn("('X-Spam', 'Eggs')", req_headers)121 self.stop_client()122 self.stop_server()123 def test_protocol_custom_response_headers_callable_dict(self):124 self.start_server(extra_headers=lambda p, r: {'X-Spam': 'Eggs'})125 self.start_client('raw_headers')126 self.loop.run_until_complete(self.client.recv())127 resp_headers = self.loop.run_until_complete(self.client.recv())128 self.assertIn("('X-Spam', 'Eggs')", resp_headers)129 self.stop_client()130 self.stop_server()131 def test_protocol_custom_response_headers_callable_list(self):132 self.start_server(extra_headers=lambda p, r: [('X-Spam', 'Eggs')])133 self.start_client('raw_headers')134 self.loop.run_until_complete(self.client.recv())135 resp_headers = self.loop.run_until_complete(self.client.recv())136 self.assertIn("('X-Spam', 'Eggs')", resp_headers)137 self.stop_client()138 self.stop_server()139 def test_protocol_custom_response_headers_dict(self):140 self.start_server(extra_headers={'X-Spam': 'Eggs'})141 self.start_client('raw_headers')142 self.loop.run_until_complete(self.client.recv())143 resp_headers = self.loop.run_until_complete(self.client.recv())144 self.assertIn("('X-Spam', 'Eggs')", resp_headers)145 self.stop_client()146 self.stop_server()147 def test_protocol_custom_response_headers_list(self):148 self.start_server(extra_headers=[('X-Spam', 'Eggs')])149 self.start_client('raw_headers')150 self.loop.run_until_complete(self.client.recv())151 resp_headers = self.loop.run_until_complete(self.client.recv())152 self.assertIn("('X-Spam', 'Eggs')", resp_headers)153 self.stop_client()154 self.stop_server()155 def test_no_subprotocol(self):156 self.start_server()157 self.start_client('subprotocol')158 server_subprotocol = self.loop.run_until_complete(self.client.recv())159 self.assertEqual(server_subprotocol, repr(None))160 self.assertEqual(self.client.subprotocol, None)161 self.stop_client()162 self.stop_server()163 def test_subprotocol_found(self):164 self.start_server(subprotocols=['superchat', 'chat'])165 self.start_client('subprotocol', subprotocols=['otherchat', 'chat'])166 server_subprotocol = self.loop.run_until_complete(self.client.recv())167 self.assertEqual(server_subprotocol, repr('chat'))168 self.assertEqual(self.client.subprotocol, 'chat')169 self.stop_client()170 self.stop_server()171 def test_subprotocol_not_found(self):172 self.start_server(subprotocols=['superchat'])173 self.start_client('subprotocol', subprotocols=['otherchat'])174 server_subprotocol = self.loop.run_until_complete(self.client.recv())175 self.assertEqual(server_subprotocol, repr(None))176 self.assertEqual(self.client.subprotocol, None)177 self.stop_client()178 self.stop_server()179 def test_subprotocol_not_offered(self):180 self.start_server()181 self.start_client('subprotocol', subprotocols=['otherchat', 'chat'])182 server_subprotocol = self.loop.run_until_complete(self.client.recv())183 self.assertEqual(server_subprotocol, repr(None))184 self.assertEqual(self.client.subprotocol, None)185 self.stop_client()186 self.stop_server()187 def test_subprotocol_not_requested(self):188 self.start_server(subprotocols=['superchat', 'chat'])189 self.start_client('subprotocol')190 server_subprotocol = self.loop.run_until_complete(self.client.recv())191 self.assertEqual(server_subprotocol, repr(None))192 self.assertEqual(self.client.subprotocol, None)193 self.stop_client()194 self.stop_server()195 @unittest.mock.patch.object(196 WebSocketServerProtocol, 'select_subprotocol', autospec=True)197 def test_subprotocol_error(self, _select_subprotocol):198 _select_subprotocol.return_value = 'superchat'199 self.start_server(subprotocols=['superchat'])200 with self.assertRaises(InvalidHandshake):201 self.start_client('subprotocol', subprotocols=['otherchat'])202 self.run_loop_once()203 self.stop_server()204 @unittest.mock.patch('websockets.server.read_request')205 def test_server_receives_malformed_request(self, _read_request):206 _read_request.side_effect = ValueError("read_request failed")207 self.start_server()208 with self.assertRaises(InvalidHandshake):209 self.start_client()210 self.stop_server()211 @unittest.mock.patch('websockets.client.read_response')212 def test_client_receives_malformed_response(self, _read_response):213 _read_response.side_effect = ValueError("read_response failed")214 self.start_server()215 with self.assertRaises(InvalidHandshake):216 self.start_client()217 self.run_loop_once()218 self.stop_server()219 @unittest.mock.patch('websockets.client.build_request')220 def test_client_sends_invalid_handshake_request(self, _build_request):221 def wrong_build_request(set_header):222 return '42'223 _build_request.side_effect = wrong_build_request224 self.start_server()225 with self.assertRaises(InvalidHandshake):226 self.start_client()227 self.stop_server()228 @unittest.mock.patch('websockets.server.build_response')229 def test_server_sends_invalid_handshake_response(self, _build_response):230 def wrong_build_response(set_header, key):231 return build_response(set_header, '42')232 _build_response.side_effect = wrong_build_response233 self.start_server()234 with self.assertRaises(InvalidHandshake):235 self.start_client()236 self.stop_server()237 @unittest.mock.patch('websockets.client.read_response')238 def test_server_does_not_switch_protocols(self, _read_response):239 @asyncio.coroutine240 def wrong_read_response(stream):241 code, headers = yield from read_response(stream)242 return 400, headers243 _read_response.side_effect = wrong_read_response244 self.start_server()245 with self.assertRaises(InvalidHandshake):246 self.start_client()247 self.run_loop_once()248 self.stop_server()249 @unittest.mock.patch('websockets.server.WebSocketServerProtocol.send')250 def test_server_handler_crashes(self, send):251 send.side_effect = ValueError("send failed")252 self.start_server()253 self.start_client()254 self.loop.run_until_complete(self.client.send("Hello!"))255 with self.assertRaises(ConnectionClosed):256 self.loop.run_until_complete(self.client.recv())257 self.stop_client()258 self.stop_server()259 # Connection ends with an unexpected error.260 self.assertEqual(self.client.close_code, 1011)261 @unittest.mock.patch('websockets.server.WebSocketServerProtocol.close')262 def test_server_close_crashes(self, close):263 close.side_effect = ValueError("close failed")264 self.start_server()265 self.start_client()266 self.loop.run_until_complete(self.client.send("Hello!"))267 reply = self.loop.run_until_complete(self.client.recv())268 self.assertEqual(reply, "Hello!")269 self.stop_client()270 self.stop_server()271 # Connection ends with an abnormal closure.272 self.assertEqual(self.client.close_code, 1006)273@unittest.skipUnless(os.path.exists(testcert), "test certificate is missing")274class SSLClientServerTests(ClientServerTests):275 secure = True276 @property277 def server_context(self):278 ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)279 ssl_context.load_cert_chain(testcert)280 return ssl_context281 @property282 def client_context(self):283 ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)284 ssl_context.load_verify_locations(testcert)285 ssl_context.verify_mode = ssl.CERT_REQUIRED286 return ssl_context287 def start_server(self, *args, **kwds):288 kwds['ssl'] = self.server_context289 server = serve(handler, 'localhost', 8642, **kwds)290 self.server = self.loop.run_until_complete(server)291 def start_client(self, path='', **kwds):292 kwds['ssl'] = self.client_context293 client = connect('wss://localhost:8642/' + path, **kwds)294 self.client = self.loop.run_until_complete(client)295 def test_ws_uri_is_rejected(self):296 self.start_server()297 client = connect('ws://localhost:8642/', ssl=self.client_context)298 with self.assertRaises(ValueError):299 self.loop.run_until_complete(client)300 self.stop_server()301class ClientServerOriginTests(unittest.TestCase):302 def setUp(self):303 self.loop = asyncio.new_event_loop()304 asyncio.set_event_loop(self.loop)305 def tearDown(self):306 self.loop.close()307 def test_checking_origin_succeeds(self):308 server = self.loop.run_until_complete(309 serve(handler, 'localhost', 8642, origins=['http://localhost']))310 client = self.loop.run_until_complete(...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!