Best Python code snippet using green
tlstest.py
Source:tlstest.py
#!/usr/bin/env python

# Authors:
#   Trevor Perrin
#   Kees Bos - Added tests for XML-RPC
#   Dimitris Moraitis - Anon ciphersuites
#   Marcelo Fernandez - Added test for NPN
#   Martin von Loewis - python 3 port
#   Hubert Kario - several improvements
#   Google - FALLBACK_SCSV test
#   Efthimis Iosifidis - improvements of time measurement in Throughput Test
#
#
# See the LICENSE file for legal information regarding use of this file.
from __future__ import print_function
import sys
import os
import os.path
import socket
import time
import timeit
import getopt
from tempfile import mkstemp
try:
    from BaseHTTPServer import HTTPServer
    from SimpleHTTPServer import SimpleHTTPRequestHandler
except ImportError:
    from http.server import HTTPServer, SimpleHTTPRequestHandler

from tlslite import TLSConnection, Fault, HandshakeSettings, \
    X509, X509CertChain, IMAP4_TLS, VerifierDB, Session, SessionCache, \
    parsePEMKey, constants, \
    AlertDescription, HTTPTLSConnection, TLSSocketServerMixIn, \
    POP3_TLS, m2cryptoLoaded, pycryptoLoaded, gmpyLoaded, tackpyLoaded, \
    Checker, __version__
from tlslite.handshakesettings import VirtualHost, Keypair
from tlslite.errors import *
from tlslite.utils.cryptomath import prngName, getRandomBytes

try:
    import xmlrpclib
except ImportError:
    # Python 3
    from xmlrpc import client as xmlrpclib

import ssl
from tlslite import *
from tlslite.constants import KeyUpdateMessageType

try:
    from tack.structures.Tack import Tack

except ImportError:
    pass


def printUsage(s=None):
    """Print the command-line usage text and terminate the process.

    :param s: optional error message; when truthy it is printed on an
        ``ERROR:`` line before the usage text
    :raises SystemExit: always, with exit status -1
    """
    # Report which crypto backend is in use, based on the flag imported
    # from tlslite.
    if m2cryptoLoaded:
        crypto = "M2Crypto/OpenSSL"
    else:
        crypto = "Python crypto"

    if s:
        print("ERROR: %s" % s)

    print("""\ntls.py version %s (using %s)
Commands:
  server HOST:PORT DIRECTORY
  client HOST:PORT DIRECTORY
""" % (__version__, crypto))
    sys.exit(-1)
conn.write(b100)72 conn.write(b1000)73 r1 = conn.read(min=1, max=1)74 assert len(r1) == 175 assert r1 == b176 r10 = conn.read(min=10, max=10)77 assert len(r10) == 1078 assert r10 == b1079 r100 = conn.read(min=100, max=100)80 assert len(r100) == 10081 assert r100 == b10082 r1000 = conn.read(min=1000, max=1000)83 assert len(r1000) == 100084 assert r1000 == b100085def clientTestCmd(argv):86 87 address = argv[0]88 dir = argv[1] 89 #Split address into hostname/port tuple90 address = address.split(":")91 address = ( address[0], int(address[1]) )92 #open synchronisation FIFO93 synchro = socket.socket(socket.AF_INET, socket.SOCK_STREAM)94 synchro.settimeout(60)95 synchro.connect((address[0], address[1]-1))96 def connect():97 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)98 sock.settimeout(15)99 sock.connect(address)100 c = TLSConnection(sock)101 return c102 test_no = 0103 badFault = False104 print("Test {0} - anonymous handshake".format(test_no))105 synchro.recv(1)106 connection = connect()107 settings = HandshakeSettings()108 settings.maxVersion = (3, 3)109 connection.handshakeClientAnonymous(settings=settings)110 testConnClient(connection)111 connection.close()112 test_no += 1113 print("Test {0} - good X.509 (plus SNI)".format(test_no))114 synchro.recv(1)115 connection = connect()116 connection.handshakeClientCert(serverName=address[0])117 testConnClient(connection)118 assert(isinstance(connection.session.serverCertChain, X509CertChain))119 assert(connection.session.serverName == address[0])120 assert(connection.session.cipherSuite in constants.CipherSuite.aeadSuites)121 assert(connection.encryptThenMAC == False)122 assert connection.session.appProto is None123 connection.close()124 test_no += 1125 print("Test {0} - good X.509 TLSv1.2 (plus ALPN)".format(test_no))126 synchro.recv(1)127 settings = HandshakeSettings()128 settings.maxVersion = (3, 3)129 connection = connect()130 connection.handshakeClientCert(serverName=address[0],131 alpn=[b'http/1.1'],132 
settings=settings)133 testConnClient(connection)134 assert isinstance(connection.session.serverCertChain, X509CertChain)135 assert connection.session.serverName == address[0]136 assert connection.session.cipherSuite in constants.CipherSuite.aeadSuites137 assert connection.encryptThenMAC == False138 assert connection.session.appProto == b'http/1.1'139 connection.close()140 test_no += 1141 print("Test {0} - good X.509 TLSv1.3 (plus ALPN)".format(test_no))142 synchro.recv(1)143 connection = connect()144 connection.handshakeClientCert(serverName=address[0],145 alpn=[b'http/1.1'])146 testConnClient(connection)147 assert isinstance(connection.session.serverCertChain, X509CertChain)148 assert connection.session.serverName == address[0]149 assert connection.session.cipherSuite in constants.CipherSuite.aeadSuites150 assert connection.encryptThenMAC == False151 assert connection.session.appProto == b'http/1.1'152 connection.close()153 test_no += 1154 print("Test {0} - good X.509/w RSA-PSS sig".format(test_no))155 synchro.recv(1)156 connection = connect()157 connection.handshakeClientCert(serverName=address[0])158 testConnClient(connection)159 assert(isinstance(connection.session.serverCertChain, X509CertChain))160 assert(connection.session.serverName == address[0])161 assert(connection.session.cipherSuite in constants.CipherSuite.aeadSuites)162 assert(connection.encryptThenMAC == False)163 connection.close()164 test_no += 1165 print("Test {0} - good X.509/w RSA-PSS cert".format(test_no))166 synchro.recv(1)167 connection = connect()168 connection.handshakeClientCert(serverName=address[0])169 testConnClient(connection)170 assert(isinstance(connection.session.serverCertChain, X509CertChain))171 assert(connection.session.serverName == address[0])172 assert(connection.session.cipherSuite in constants.CipherSuite.aeadSuites)173 assert(connection.encryptThenMAC == False)174 connection.close()175 test_no += 1176 print("Test {0} - good X.509/w RSA-PSS cert in 
TLSv1.2".format(test_no))177 synchro.recv(1)178 connection = connect()179 settings = HandshakeSettings()180 settings.minVersion = (3, 3)181 settings.maxVersion = (3, 3)182 connection.handshakeClientCert(serverName=address[0], settings=settings)183 testConnClient(connection)184 assert(isinstance(connection.session.serverCertChain, X509CertChain))185 assert(connection.session.serverName == address[0])186 assert(connection.session.cipherSuite in constants.CipherSuite.aeadSuites)187 assert(connection.encryptThenMAC == False)188 connection.close()189 test_no += 1190 print("Test {0} - good X.509, small record_size_limit".format(test_no))191 synchro.recv(1)192 connection = connect()193 settings = HandshakeSettings()194 settings.record_size_limit = 64195 connection.handshakeClientCert(settings=settings)196 testConnClient(connection)197 assert(isinstance(connection.session.serverCertChain, X509CertChain))198 connection.close()199 test_no += 1200 print("Test {0} - good X.509, SSLv3".format(test_no))201 synchro.recv(1)202 connection = connect()203 settings = HandshakeSettings()204 settings.minVersion = (3,0)205 settings.maxVersion = (3,0)206 connection.handshakeClientCert(settings=settings)207 testConnClient(connection) 208 assert(isinstance(connection.session.serverCertChain, X509CertChain))209 connection.close()210 test_no += 1211 print("Test {0} - good X.509 ECDSA, SSLv3".format(test_no))212 synchro.recv(1)213 connection = connect()214 settings = HandshakeSettings()215 settings.minVersion = (3, 0)216 settings.maxVersion = (3, 0)217 connection.handshakeClientCert(settings=settings)218 testConnClient(connection)219 assert connection.session.cipherSuite in\220 constants.CipherSuite.ecdheEcdsaSuites221 assert isinstance(connection.session.serverCertChain, X509CertChain)222 connection.close()223 test_no += 1224 print("Test {0} - good X.509 ECDSA, TLSv1.0".format(test_no))225 synchro.recv(1)226 connection = connect()227 settings = HandshakeSettings()228 settings.minVersion = (3, 
1)229 settings.maxVersion = (3, 1)230 connection.handshakeClientCert(settings=settings)231 testConnClient(connection)232 assert connection.session.cipherSuite in\233 constants.CipherSuite.ecdheEcdsaSuites234 assert isinstance(connection.session.serverCertChain, X509CertChain)235 connection.close()236 test_no += 1237 print("Test {0} - good X.509 ECDSA, TLSv1.2".format(test_no))238 synchro.recv(1)239 connection = connect()240 settings = HandshakeSettings()241 settings.minVersion = (3, 3)242 settings.maxVersion = (3, 3)243 connection.handshakeClientCert(settings=settings)244 testConnClient(connection)245 assert connection.session.cipherSuite in\246 constants.CipherSuite.ecdheEcdsaSuites247 assert isinstance(connection.session.serverCertChain, X509CertChain)248 connection.close()249 test_no += 1250 print("Test {0} - mismatched ECDSA curve, TLSv1.2".format(test_no))251 synchro.recv(1)252 connection = connect()253 settings = HandshakeSettings()254 settings.minVersion = (3, 3)255 settings.maxVersion = (3, 3)256 settings.eccCurves = ["secp384r1"]257 settings.keyShares = []258 try:259 connection.handshakeClientCert(settings=settings)260 assert False261 except TLSRemoteAlert as e:262 assert "handshake_failure" in str(e)263 connection.close()264 test_no += 1265 for curve, keySize in (("brainpoolP256r1", 256),266 ("brainpoolP384r1", 384),267 ("brainpoolP512r1", 512)):268 print("Test {0} - Two good ECDSA certs - {1}, TLSv1.2".format(test_no, curve))269 synchro.recv(1)270 connection = connect()271 settings = HandshakeSettings()272 settings.minVersion = (3, 3)273 settings.maxVersion = (3, 3)274 settings.eccCurves = [curve]275 settings.keyShares = []276 connection.handshakeClientCert(settings=settings)277 testConnClient(connection)278 assert isinstance(connection.session.serverCertChain, X509CertChain)279 assert len(connection.session.serverCertChain.getEndEntityPublicKey()) \280 == keySize281 connection.close()282 test_no += 1283 print("Test {0} - Two good ECDSA certs - 
secp256r1, TLSv1.2".format(test_no))284 synchro.recv(1)285 connection = connect()286 settings = HandshakeSettings()287 settings.minVersion = (3, 3)288 settings.maxVersion = (3, 3)289 settings.eccCurves = ["secp256r1"]290 settings.keyShares = []291 connection.handshakeClientCert(settings=settings)292 testConnClient(connection)293 assert isinstance(connection.session.serverCertChain, X509CertChain)294 assert len(connection.session.serverCertChain.getEndEntityPublicKey()) \295 == 256296 connection.close()297 test_no += 1298 print("Test {0} - Two good ECDSA certs - secp384r1, TLSv1.2".format(test_no))299 synchro.recv(1)300 connection = connect()301 settings = HandshakeSettings()302 settings.minVersion = (3, 3)303 settings.maxVersion = (3, 3)304 settings.eccCurves = ["secp384r1"]305 settings.keyShares = []306 connection.handshakeClientCert(settings=settings)307 testConnClient(connection)308 assert isinstance(connection.session.serverCertChain, X509CertChain)309 assert len(connection.session.serverCertChain.getEndEntityPublicKey()) \310 == 384311 connection.close()312 test_no += 1313 print("Test {0} - good X509 RSA and ECDSA, correct RSA and ECDSA sigalgs, RSA, TLSv1.2"314 .format(test_no))315 synchro.recv(1)316 connection = connect()317 settings = HandshakeSettings()318 settings.minVersion = (3, 3)319 settings.maxVersion = (3, 3)320 settings.rsaSigHashes = ["sha256"]321 settings.ecdsaSigHashes = ["sha256"]322 connection.handshakeClientCert(settings=settings)323 testConnClient(connection)324 assert isinstance(connection.session.serverCertChain, X509CertChain)325 assert connection.session.serverCertChain.getEndEntityPublicKey().key_type\326 == "rsa"327 assert connection.version == (3, 3)328 connection.close()329 test_no += 1330 print("Test {0} - good X509 RSA and ECDSA, bad RSA and good ECDSA sigalgs, ECDSA, TLSv1.2"331 .format(test_no))332 synchro.recv(1)333 connection = connect()334 settings = HandshakeSettings()335 settings.minVersion = (3, 3)336 settings.maxVersion = 
(3, 3)337 settings.rsaSigHashes = ["sha384"]338 settings.ecdsaSigHashes = ["sha256"]339 connection.handshakeClientCert(settings=settings)340 testConnClient(connection)341 assert isinstance(connection.session.serverCertChain, X509CertChain)342 assert connection.session.serverCertChain.getEndEntityPublicKey().key_type\343 == "ecdsa"344 assert connection.version == (3, 3)345 connection.close()346 test_no += 1347 print("Test {0} - good X509 RSA and ECDSA, bad RSA and ECDSA sigalgs, RSA, TLSv1.2"348 .format(test_no))349 synchro.recv(1)350 connection = connect()351 settings = HandshakeSettings()352 settings.minVersion = (3, 3)353 settings.maxVersion = (3, 3)354 settings.rsaSigHashes = ["sha384"]355 settings.ecdsaSigHashes = ["sha384"]356 connection.handshakeClientCert(settings=settings)357 testConnClient(connection)358 assert isinstance(connection.session.serverCertChain, X509CertChain)359 assert connection.session.serverCertChain.getEndEntityPublicKey().key_type\360 == "rsa"361 assert connection.version == (3, 3)362 connection.close()363 test_no += 1364 print("Test {0} - good X509 RSA and ECDSA, correct RSA and ECDSA sigalgs, RSA, TLSv1.3"365 .format(test_no))366 synchro.recv(1)367 connection = connect()368 settings = HandshakeSettings()369 settings.minVersion = (3, 4)370 settings.maxVersion = (3, 4)371 settings.rsaSigHashes = ["sha256"]372 settings.ecdsaSigHashes = ["sha256"]373 connection.handshakeClientCert(settings=settings)374 testConnClient(connection)375 assert isinstance(connection.session.serverCertChain, X509CertChain)376 assert connection.session.serverCertChain.getEndEntityPublicKey().key_type\377 == "rsa"378 assert connection.version == (3, 4)379 connection.close()380 test_no += 1381 print("Test {0} - good X509 RSA and ECDSA, bad RSA and good ECDSA sigalgs, ECDSA, TLSv1.3"382 .format(test_no))383 synchro.recv(1)384 connection = connect()385 settings = HandshakeSettings()386 settings.minVersion = (3, 4)387 settings.maxVersion = (3, 4)388 
settings.rsaSigHashes = ["sha384"]389 settings.ecdsaSigHashes = ["sha256"]390 connection.handshakeClientCert(settings=settings)391 testConnClient(connection)392 assert isinstance(connection.session.serverCertChain, X509CertChain)393 assert connection.session.serverCertChain.getEndEntityPublicKey().key_type\394 == "ecdsa"395 assert connection.version == (3, 4)396 connection.close()397 test_no += 1398 print("Test {0} - good X509 RSA and ECDSA, bad RSA and ECDSA sigalgs, RSA, TLSv1.3"399 .format(test_no))400 synchro.recv(1)401 connection = connect()402 settings = HandshakeSettings()403 settings.minVersion = (3, 4)404 settings.maxVersion = (3, 4)405 settings.rsaSigHashes = ["sha384"]406 settings.ecdsaSigHashes = ["sha384"]407 connection.handshakeClientCert(settings=settings)408 testConnClient(connection)409 assert isinstance(connection.session.serverCertChain, X509CertChain)410 assert connection.session.serverCertChain.getEndEntityPublicKey().key_type\411 == "rsa"412 assert connection.version == (3, 4)413 connection.close()414 test_no += 1415 print("Test {0} - good X.509 ECDSA, TLSv1.3".format(test_no))416 synchro.recv(1)417 connection = connect()418 settings = HandshakeSettings()419 settings.minVersion = (3, 4)420 settings.maxVersion = (3, 4)421 connection.handshakeClientCert(settings=settings)422 testConnClient(connection)423 assert connection.session.cipherSuite in\424 constants.CipherSuite.tls13Suites425 assert isinstance(connection.session.serverCertChain, X509CertChain)426 assert len(connection.session.serverCertChain.getEndEntityPublicKey()) \427 == 256428 connection.close()429 test_no += 1430 print("Test {0} - mismatched ECDSA curve, TLSv1.3".format(test_no))431 synchro.recv(1)432 connection = connect()433 settings = HandshakeSettings()434 settings.minVersion = (3, 4)435 settings.maxVersion = (3, 4)436 settings.ecdsaSigHashes = ["sha384", "sha512"]437 try:438 connection.handshakeClientCert(settings=settings)439 assert False440 except TLSRemoteAlert as e:441 
assert "handshake_failure" in str(e)442 connection.close()443 test_no += 1444 print("Test {0} - good X.509 P-384 ECDSA, TLSv1.3".format(test_no))445 synchro.recv(1)446 connection = connect()447 settings = HandshakeSettings()448 settings.minVersion = (3, 4)449 settings.maxVersion = (3, 4)450 connection.handshakeClientCert(settings=settings)451 testConnClient(connection)452 assert connection.session.cipherSuite in\453 constants.CipherSuite.tls13Suites454 assert isinstance(connection.session.serverCertChain, X509CertChain)455 assert len(connection.session.serverCertChain.getEndEntityPublicKey()) \456 == 384457 connection.close()458 test_no += 1459 print("Test {0} - good X.509 P-521 ECDSA, TLSv1.3".format(test_no))460 synchro.recv(1)461 connection = connect()462 settings = HandshakeSettings()463 settings.minVersion = (3, 4)464 settings.maxVersion = (3, 4)465 connection.handshakeClientCert(settings=settings)466 testConnClient(connection)467 assert connection.session.cipherSuite in\468 constants.CipherSuite.tls13Suites469 assert isinstance(connection.session.serverCertChain, X509CertChain)470 assert len(connection.session.serverCertChain.getEndEntityPublicKey()) \471 == 521472 connection.close()473 test_no += 1474 print("Test {0} - good X.509 Ed25519, TLSv1.3".format(test_no))475 synchro.recv(1)476 connection = connect()477 settings = HandshakeSettings()478 settings.minVersion = (3, 4)479 settings.maxVersion = (3, 4)480 connection.handshakeClientCert(settings=settings)481 testConnClient(connection)482 assert connection.session.cipherSuite in\483 constants.CipherSuite.tls13Suites484 assert isinstance(connection.session.serverCertChain, X509CertChain)485 assert connection.session.serverCertChain.getEndEntityPublicKey().key_type \486 == "Ed25519"487 connection.close()488 test_no += 1489 print("Test {0} - good X.509 Ed448, TLSv1.3".format(test_no))490 synchro.recv(1)491 connection = connect()492 settings = HandshakeSettings()493 settings.minVersion = (3, 4)494 
settings.maxVersion = (3, 4)495 connection.handshakeClientCert(settings=settings)496 testConnClient(connection)497 assert connection.session.cipherSuite in\498 constants.CipherSuite.tls13Suites499 assert isinstance(connection.session.serverCertChain, X509CertChain)500 assert connection.session.serverCertChain.getEndEntityPublicKey().key_type \501 == "Ed448"502 connection.close()503 test_no += 1504 print("Test {0} - good RSA and ECDSA, TLSv1.3, rsa"505 .format(test_no))506 synchro.recv(1)507 connection = connect()508 settings = HandshakeSettings()509 settings.minVersion = (3, 4)510 settings.maxVersion = (3, 4)511 connection.handshakeClientCert(settings=settings)512 testConnClient(connection)513 assert connection.session.cipherSuite in\514 constants.CipherSuite.tls13Suites515 assert isinstance(connection.session.serverCertChain, X509CertChain)516 assert connection.session.serverCertChain.getEndEntityPublicKey().key_type\517 == "rsa"518 assert connection.version == (3, 4)519 connection.close()520 test_no += 1521 print("Test {0} - good RSA and ECDSA, TLSv1.3, ecdsa"522 .format(test_no))523 synchro.recv(1)524 connection = connect()525 settings = HandshakeSettings()526 settings.minVersion = (3, 4)527 settings.maxVersion = (3, 4)528 settings.rsaSigHashes = []529 connection.handshakeClientCert(settings=settings)530 testConnClient(connection)531 assert connection.session.cipherSuite in\532 constants.CipherSuite.tls13Suites533 assert isinstance(connection.session.serverCertChain, X509CertChain)534 assert connection.session.serverCertChain.getEndEntityPublicKey().key_type\535 == "ecdsa"536 assert connection.version == (3, 4)537 connection.close()538 test_no += 1539 print("Test {0} - good RSA and ECDSA, TLSv1.2, rsa"540 .format(test_no))541 synchro.recv(1)542 connection = connect()543 settings = HandshakeSettings()544 settings.minVersion = (3, 3)545 settings.maxVersion = (3, 3)546 connection.handshakeClientCert(settings=settings)547 testConnClient(connection)548 assert 
connection.session.cipherSuite in\549 constants.CipherSuite.ecdheCertSuites, connection.session.cipherSuite550 assert isinstance(connection.session.serverCertChain, X509CertChain)551 assert connection.session.serverCertChain.getEndEntityPublicKey().key_type\552 == "rsa"553 assert connection.version == (3, 3)554 connection.close()555 test_no += 1556 print("Test {0} - good RSA and ECDSA, TLSv1.2, ecdsa"557 .format(test_no))558 synchro.recv(1)559 connection = connect()560 settings = HandshakeSettings()561 settings.minVersion = (3, 3)562 settings.maxVersion = (3, 3)563 settings.rsaSigHashes = []564 connection.handshakeClientCert(settings=settings)565 testConnClient(connection)566 assert connection.session.cipherSuite in\567 constants.CipherSuite.ecdheEcdsaSuites, connection.session.cipherSuite568 assert isinstance(connection.session.serverCertChain, X509CertChain)569 assert connection.session.serverCertChain.getEndEntityPublicKey().key_type\570 == "ecdsa"571 assert connection.version == (3, 3)572 connection.close()573 test_no += 1574 print("Test {0} - good X.509, mismatched key_share".format(test_no))575 synchro.recv(1)576 connection = connect()577 settings = HandshakeSettings()578 settings.keyShares = ["x25519"]579 connection.handshakeClientCert(settings=settings)580 testConnClient(connection)581 assert(isinstance(connection.session.serverCertChain, X509CertChain))582 connection.close()583 test_no += 1584 print("Test {0} - good X.509, RC4-MD5".format(test_no))585 synchro.recv(1)586 connection = connect()587 settings = HandshakeSettings()588 settings.macNames = ["md5"]589 settings.cipherNames = ["rc4"]590 settings.maxVersion = (3, 3)591 connection.handshakeClientCert(settings=settings)592 testConnClient(connection) 593 assert(isinstance(connection.session.serverCertChain, X509CertChain))594 assert(connection.session.cipherSuite == constants.CipherSuite.TLS_RSA_WITH_RC4_128_MD5)595 assert(connection.encryptThenMAC == False)596 connection.close()597 if tackpyLoaded:598 
settings = HandshakeSettings()599 settings.useExperimentalTackExtension = True600 settings.maxVersion = (3, 3)601 test_no += 1602 print("Test {0} - good X.509, TACK".format(test_no))603 synchro.recv(1)604 connection = connect()605 connection.handshakeClientCert(settings=settings)606 assert(connection.session.tackExt.tacks[0].getTackId() == "5lcbe.eyweo.yxuan.rw6xd.jtoz7")607 assert(connection.session.tackExt.activation_flags == 1) 608 testConnClient(connection) 609 connection.close()610 test_no += 1611 print("Test {0} - good X.509, TACK unrelated to cert chain".\612 format(test_no))613 synchro.recv(1)614 connection = connect()615 try:616 connection.handshakeClientCert(settings=settings)617 assert False618 except TLSLocalAlert as alert:619 if alert.description != AlertDescription.illegal_parameter:620 raise621 connection.close()622 else:623 test_no += 1624 print("Test {0} - good X.509, TACK...skipped (no tackpy)".\625 format(test_no))626 test_no += 1627 print("Test {0} - good X.509, TACK unrelated to cert chain...skipped"628 " (no tackpy)".\629 format(test_no))630 test_no += 1631 print("Test {0} - good PSK".format(test_no))632 synchro.recv(1)633 connection = connect()634 settings = HandshakeSettings()635 settings.pskConfigs = [(b'test', b'\x00secret', 'sha384')]636 connection.handshakeClientCert(settings=settings)637 assert connection.session.serverCertChain is None638 assert connection.ecdhCurve is not None639 testConnClient(connection)640 connection.close()641 test_no += 1642 print("Test {0} - good PSK, no DH".format(test_no))643 synchro.recv(1)644 connection = connect()645 settings = HandshakeSettings()646 settings.psk_modes = ["psk_ke"]647 settings.pskConfigs = [(b'test', b'\x00secret', 'sha384')]648 connection.handshakeClientCert(settings=settings)649 assert connection.session.serverCertChain is None650 assert connection.ecdhCurve is None651 testConnClient(connection)652 connection.close()653 test_no += 1654 print("Test {0} - good PSK, no DH, no 
cert".format(test_no))655 synchro.recv(1)656 connection = connect()657 settings = HandshakeSettings()658 settings.psk_modes = ["psk_ke"]659 settings.pskConfigs = [(b'test', b'\x00secret', 'sha384')]660 connection.handshakeClientCert(settings=settings)661 assert connection.session.serverCertChain is None662 assert connection.ecdhCurve is None663 testConnClient(connection)664 connection.close()665 test_no += 1666 print("Test {0} - good SRP (db)".format(test_no))667 print("client {0} - waiting for synchro".format(time.time()))668 try:669 synchro.recv(1)670 except Exception:671 print("client {0} - wait abort".format(time.time()))672 raise673 print("client {0} - synchro received".format(time.time()))674 connection = connect()675 settings = HandshakeSettings()676 settings.maxVersion = (3, 3)677 connection.handshakeClientSRP("test", "password", settings=settings)678 testConnClient(connection)679 connection.close()680 test_no += 1681 print("Test {0} - good SRP".format(test_no))682 synchro.recv(1)683 connection = connect()684 settings = HandshakeSettings()685 settings.maxVersion = (3, 3)686 connection.handshakeClientSRP("test", "password", settings=settings)687 testConnClient(connection)688 connection.close()689 test_no += 1690 print("Test {0} - SRP faults".format(test_no))691 for fault in Fault.clientSrpFaults + Fault.genericFaults:692 synchro.recv(1)693 connection = connect()694 connection.fault = fault695 settings = HandshakeSettings()696 settings.maxVersion = (3, 3)697 try:698 connection.handshakeClientSRP("test", "password",699 settings=settings)700 print(" Good Fault %s" % (Fault.faultNames[fault]))701 except TLSFaultError as e:702 print(" BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e)))703 badFault = True704 test_no += 1705 print("Test {0} - good SRP: with X.509 certificate, TLSv1.0".format(test_no))706 settings = HandshakeSettings()707 settings.minVersion = (3,1)708 settings.maxVersion = (3,1) 709 synchro.recv(1)710 connection = connect()711 
connection.handshakeClientSRP("test", "password", settings=settings)712 assert(isinstance(connection.session.serverCertChain, X509CertChain))713 testConnClient(connection)714 connection.close()715 test_no += 1716 print("Test {0} - X.509 with SRP faults".format(test_no))717 for fault in Fault.clientSrpFaults + Fault.genericFaults:718 synchro.recv(1)719 connection = connect()720 connection.fault = fault721 settings = HandshakeSettings()722 settings.maxVersion = (3, 3)723 try:724 connection.handshakeClientSRP("test", "password",725 settings=settings)726 print(" Good Fault %s" % (Fault.faultNames[fault]))727 except TLSFaultError as e:728 print(" BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e)))729 badFault = True730 test_no += 1731 print("Test {0} - X.509 faults".format(test_no))732 for fault in Fault.clientNoAuthFaults + Fault.genericFaults:733 synchro.recv(1)734 connection = connect()735 connection.fault = fault736 try:737 connection.handshakeClientCert()738 print(" Good Fault %s" % (Fault.faultNames[fault]))739 except TLSFaultError as e:740 print(" BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e)))741 badFault = True742 test_no += 1743 print("Test {0} - good mutual X.509".format(test_no))744 x509Cert = X509().parse(open(os.path.join(dir, "clientX509Cert.pem")).read())745 x509Chain = X509CertChain([x509Cert])746 s = open(os.path.join(dir, "clientX509Key.pem")).read()747 x509Key = parsePEMKey(s, private=True)748 synchro.recv(1)749 connection = connect()750 connection.handshakeClientCert(x509Chain, x509Key)751 testConnClient(connection)752 assert isinstance(connection.session.serverCertChain, X509CertChain)753 connection.close()754 test_no += 1755 print("Test {0} - good mutual ECDSA X.509".format(test_no))756 with open(os.path.join(dir, "clientECCert.pem")) as f:757 x509Cert = X509().parse(f.read())758 x509Chain = X509CertChain([x509Cert])759 with open(os.path.join(dir, "clientECKey.pem")) as f:760 x509Key = parsePEMKey(f.read(), private=True)761 
synchro.recv(1)762 connection = connect()763 connection.handshakeClientCert(x509Chain, x509Key)764 testConnClient(connection)765 assert isinstance(connection.session.serverCertChain, X509CertChain)766 assert len(connection.session.serverCertChain.getEndEntityPublicKey()) ==\767 256768 connection.close()769 test_no += 1770 print("Test {0} - good mutual Ed25519 X.509".format(test_no))771 with open(os.path.join(dir, "clientEd25519Cert.pem")) as f:772 x509EdCert = X509().parse(f.read())773 x509EdChain = X509CertChain([x509EdCert])774 with open(os.path.join(dir, "clientEd25519Key.pem")) as f:775 x509EdKey = parsePEMKey(f.read(), private=True)776 synchro.recv(1)777 connection = connect()778 connection.handshakeClientCert(x509EdChain, x509EdKey)779 testConnClient(connection)780 assert isinstance(connection.session.serverCertChain, X509CertChain)781 assert connection.session.serverCertChain.getEndEntityPublicKey().key_type\782 == "Ed25519"783 connection.close()784 test_no += 1785 print("Test {0} - good mutual Ed25519 X.509, TLS 1.2".format(test_no))786 with open(os.path.join(dir, "clientEd25519Cert.pem")) as f:787 x509EdCert = X509().parse(f.read())788 x509EdChain = X509CertChain([x509EdCert])789 with open(os.path.join(dir, "clientEd25519Key.pem")) as f:790 x509EdKey = parsePEMKey(f.read(), private=True)791 synchro.recv(1)792 connection = connect()793 settings = HandshakeSettings()794 settings.minVersion = (3, 3)795 settings.maxVersion = (3, 3)796 connection.handshakeClientCert(x509EdChain, x509EdKey, settings=settings)797 testConnClient(connection)798 assert isinstance(connection.session.serverCertChain, X509CertChain)799 assert connection.session.serverCertChain.getEndEntityPublicKey().key_type\800 == "Ed25519"801 connection.close()802 test_no += 1803 print("Test {0} - good X.509 DSA, SSLv3".format(test_no))804 synchro.recv(1)805 connection = connect()806 settings = HandshakeSettings()807 settings.minVersion = (3, 0)808 settings.maxVersion = (3, 0)809 
connection.handshakeClientCert(settings=settings)810 testConnClient(connection)811 assert connection.session.cipherSuite in\812 constants.CipherSuite.dheDsaSuites813 assert isinstance(connection.session.serverCertChain, X509CertChain)814 connection.close()815 test_no += 1816 print("Test {0} - good X.509 DSA, TLSv1.2".format(test_no))817 synchro.recv(1)818 connection = connect()819 settings = HandshakeSettings()820 settings.minVersion = (3, 3)821 settings.maxVersion = (3, 3)822 connection.handshakeClientCert(settings=settings)823 testConnClient(connection)824 assert connection.session.cipherSuite in\825 constants.CipherSuite.dheDsaSuites826 assert isinstance(connection.session.serverCertChain, X509CertChain)827 connection.close()828 test_no += 1829 print("Test {0} - good X.509 Ed25519, TLSv1.2".format(test_no))830 synchro.recv(1)831 connection = connect()832 settings = HandshakeSettings()833 settings.minVersion = (3, 3)834 settings.maxVersion = (3, 3)835 connection.handshakeClientCert(settings=settings)836 testConnClient(connection)837 assert connection.session.cipherSuite in\838 constants.CipherSuite.ecdheEcdsaSuites839 assert isinstance(connection.session.serverCertChain, X509CertChain)840 assert connection.session.serverCertChain.getEndEntityPublicKey().key_type \841 == "Ed25519"842 connection.close()843 test_no += 1844 print("Test {0} - good X.509 Ed448, TLSv1.2".format(test_no))845 synchro.recv(1)846 connection = connect()847 settings = HandshakeSettings()848 settings.minVersion = (3, 3)849 settings.maxVersion = (3, 3)850 connection.handshakeClientCert(settings=settings)851 testConnClient(connection)852 assert connection.session.cipherSuite in\853 constants.CipherSuite.ecdheEcdsaSuites854 assert isinstance(connection.session.serverCertChain, X509CertChain)855 assert connection.session.serverCertChain.getEndEntityPublicKey().key_type \856 == "Ed448"857 connection.close()858 test_no += 1859 print("Test {0} - good mutual X.509, TLSv1.3 no 
certs".format(test_no))860 synchro.recv(1)861 connection = connect()862 settings = HandshakeSettings()863 settings.minVersion = (3,4)864 settings.maxVersion = (3,4)865 connection.handshakeClientCert(settings=settings)866 testConnClient(connection)867 assert isinstance(connection.session.serverCertChain, X509CertChain)868 connection.close()869 test_no += 1870 print("Test {0} - good mutual X.509, TLSv1.3".format(test_no))871 synchro.recv(1)872 connection = connect()873 settings = HandshakeSettings()874 settings.minVersion = (3,4)875 settings.maxVersion = (3,4)876 connection.handshakeClientCert(x509Chain, x509Key, settings=settings)877 testConnClient(connection)878 assert(isinstance(connection.session.serverCertChain, X509CertChain))879 connection.close()880 test_no += 1881 print("Test {0} - good mutual X.509, PHA, TLSv1.3".format(test_no))882 synchro.recv(1)883 connection = connect()884 settings = HandshakeSettings()885 settings.minVersion = (3, 4)886 settings.maxVersion = (3, 4)887 connection.handshakeClientCert(x509Chain, x509Key, settings=settings)888 synchro.recv(1)889 b = connection.read(0, 0)890 assert b == b''891 testConnClient(connection)892 assert(isinstance(connection.session.serverCertChain, X509CertChain))893 connection.close()894 test_no += 1895 print("Test {0} - good mutual X.509 Ed25519, PHA, TLSv1.3".format(test_no))896 synchro.recv(1)897 connection = connect()898 settings = HandshakeSettings()899 settings.minVersion = (3, 4)900 settings.maxVersion = (3, 4)901 connection.handshakeClientCert(x509EdChain, x509EdKey, settings=settings)902 synchro.recv(1)903 b = connection.read(0, 0)904 assert b == b''905 testConnClient(connection)906 assert isinstance(connection.session.serverCertChain, X509CertChain)907 assert connection.session.serverCertChain.getEndEntityPublicKey().key_type\908 == "Ed25519"909 connection.close()910 test_no += 1911 print("Test {0} - good mutual X.509, PHA and KeyUpdate, TLSv1.3".format(test_no))912 synchro.recv(1)913 connection = 
connect()914 settings = HandshakeSettings()915 settings.minVersion = (3, 4)916 settings.maxVersion = (3, 4)917 connection.handshakeClientCert(x509Chain, x509Key, settings=settings)918 for result in connection.send_keyupdate_request(919 KeyUpdateMessageType.update_requested):920 assert result in (0, 1)921 synchro.recv(1)922 b = connection.read(0, 0)923 assert b == b''924 testConnClient(connection)925 assert(isinstance(connection.session.serverCertChain, X509CertChain))926 connection.close()927 test_no += 1928 print("Test {0} - mutual X.509, PHA, no client cert, TLSv1.3".format(test_no))929 synchro.recv(1)930 connection = connect()931 settings = HandshakeSettings()932 settings.minVersion = (3, 4)933 settings.maxVersion = (3, 4)934 connection.handshakeClientCert(X509CertChain(), x509Key, settings=settings)935 synchro.recv(1)936 b = connection.read(0, 0)937 assert b == b''938 try:939 connection.read(0, 0)940 assert False941 except TLSRemoteAlert as e:942 assert e.description == AlertDescription.certificate_required943 assert "certificate_required" in str(e), str(e)944 connection.close()945 test_no += 1946 print("Test {0} - good mutual X.509, TLSv1.1".format(test_no))947 synchro.recv(1)948 connection = connect()949 settings = HandshakeSettings()950 settings.minVersion = (3,2)951 settings.maxVersion = (3,2)952 connection.handshakeClientCert(x509Chain, x509Key, settings=settings)953 testConnClient(connection)954 assert(isinstance(connection.session.serverCertChain, X509CertChain))955 connection.close()956 test_no += 1957 print("Test {0} - good mutual X.509, SSLv3".format(test_no))958 synchro.recv(1)959 connection = connect()960 settings = HandshakeSettings()961 settings.minVersion = (3,0)962 settings.maxVersion = (3,0)963 connection.handshakeClientCert(x509Chain, x509Key, settings=settings)964 testConnClient(connection)965 assert(isinstance(connection.session.serverCertChain, X509CertChain))966 connection.close()967 test_no += 1968 print("Test {0} - mutual X.509 
faults".format(test_no))969 for fault in Fault.clientCertFaults + Fault.genericFaults:970 synchro.recv(1)971 connection = connect()972 connection.fault = fault973 try:974 connection.handshakeClientCert(x509Chain, x509Key)975 print(" Good Fault %s" % (Fault.faultNames[fault]))976 except TLSFaultError as e:977 print(" BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e)))978 badFault = True979 test_no += 1980 print("Test {0} - good SRP, prepare to resume... (plus SNI)".\981 format(test_no))982 synchro.recv(1)983 connection = connect()984 settings = HandshakeSettings()985 settings.maxVersion = (3, 3)986 connection.handshakeClientSRP("test", "password", serverName=address[0],987 settings=settings)988 testConnClient(connection)989 connection.close()990 session = connection.session991 test_no += 1992 print("Test {0} - resumption (plus SNI)".format(test_no))993 synchro.recv(1)994 connection = connect()995 settings = HandshakeSettings()996 settings.maxVersion = (3, 3)997 connection.handshakeClientSRP("test", "garbage", serverName=address[0], 998 session=session, settings=settings)999 testConnClient(connection)1000 #Don't close! 
-- see below1001 test_no += 11002 print("Test {0} - invalidated resumption (plus SNI)".format(test_no))1003 synchro.recv(1)1004 connection.sock.close() #Close the socket without a close_notify!1005 synchro.recv(1)1006 connection = connect()1007 settings = HandshakeSettings()1008 settings.maxVersion = (3, 3)1009 try:1010 connection.handshakeClientSRP("test", "garbage",1011 serverName=address[0],1012 session=session, settings=settings)1013 assert False1014 except TLSRemoteAlert as alert:1015 if alert.description != AlertDescription.bad_record_mac:1016 raise1017 connection.close()1018 test_no += 11019 print("Test {0} - HTTPS test X.509".format(test_no))1020 address = address[0], address[1]+11021 if hasattr(socket, "timeout"):1022 timeoutEx = socket.timeout1023 else:1024 timeoutEx = socket.error1025 while 1:1026 try:1027 htmlBody = bytearray(open(os.path.join(dir, "index.html")).read(), "utf-8")1028 fingerprint = None1029 for y in range(2):1030 checker =Checker(x509Fingerprint=fingerprint)1031 h = HTTPTLSConnection(\1032 address[0], address[1], checker=checker)1033 for x in range(3):1034 synchro.recv(1)1035 h.request("GET", "/index.html")1036 r = h.getresponse()1037 assert(r.status == 200)1038 b = bytearray(r.read())1039 assert(b == htmlBody)1040 fingerprint = h.tlsSession.serverCertChain.getFingerprint()1041 assert(fingerprint)1042 break1043 except timeoutEx:1044 print("timeout, retrying...")1045 pass1046 address = address[0], address[1]+11047 implementations = []1048 if m2cryptoLoaded:1049 implementations.append("openssl")1050 if pycryptoLoaded:1051 implementations.append("pycrypto")1052 implementations.append("python")1053 test_no += 11054 print("Test {0} - different ciphers, TLSv1.0".format(test_no))1055 for implementation in implementations:1056 for cipher in ["aes128", "aes256", "rc4"]:1057 test_no += 11058 print("Test {0}:".format(test_no), end=' ')1059 synchro.recv(1)1060 connection = connect()1061 settings = HandshakeSettings()1062 settings.cipherNames = 
[cipher]1063 settings.cipherImplementations = [implementation, "python"]1064 settings.minVersion = (3,1)1065 settings.maxVersion = (3,1) 1066 connection.handshakeClientCert(settings=settings)1067 testConnClient(connection)1068 print("%s %s" % (connection.getCipherName(), connection.getCipherImplementation()))1069 connection.close()1070 test_no += 11071 print("Test {0} - throughput test".format(test_no))1072 for implementation in implementations:1073 for cipher in ["aes128ccm", "aes128ccm_8", "aes256ccm", "aes256ccm_8",1074 "aes128gcm", "aes256gcm", "aes128", "aes256", "3des",1075 "rc4", "chacha20-poly1305_draft00",1076 "chacha20-poly1305"]:1077 # skip tests with implementations that don't support them1078 if cipher == "3des" and implementation not in ("openssl",1079 "pycrypto"):1080 continue1081 if cipher in ("aes128gcm", "aes256gcm") and \1082 implementation not in ("pycrypto",1083 "python", "openssl"):1084 continue1085 if cipher in ("aes128ccm", "aes128ccm_8",1086 "aes256ccm", "aes256ccm_8") and \1087 implementation not in ("python", "openssl"):1088 continue1089 if cipher in ("chacha20-poly1305_draft00", "chacha20-poly1305") \1090 and implementation not in ("python", ):1091 continue1092 test_no += 11093 print("Test {0}:".format(test_no), end=' ')1094 synchro.recv(1)1095 connection = connect()1096 settings = HandshakeSettings()1097 settings.cipherNames = [cipher]1098 settings.cipherImplementations = [implementation, "python"]1099 if cipher not in ("aes128ccm", "aes128ccm_8", "aes128gcm",1100 "aes256gcm", "chacha20-poly1305"):1101 settings.maxVersion = (3, 3)1102 connection.handshakeClientCert(settings=settings)1103 print("%s %s:" % (connection.getCipherName(), connection.getCipherImplementation()), end=' ')1104 startTime = timeit.default_timer()1105 connection.write(b"hello"*10000)1106 h = connection.read(min=50000, max=50000)1107 stopTime = timeit.default_timer()1108 sizeofdata = len(h)*21109 if stopTime-startTime:1110 print("100K exchanged at rate of %d 
bytes/sec" % int(sizeofdata/(stopTime-startTime)))1111 else:1112 print("100K exchanged very fast")1113 assert(h == b"hello"*10000)1114 connection.close()1115 test_no += 11116 print("Test {0} - Next-Protocol Client Negotiation".format(test_no))1117 synchro.recv(1)1118 connection = connect()1119 settings = HandshakeSettings()1120 settings.maxVersion = (3, 3)1121 connection.handshakeClientCert(nextProtos=[b"http/1.1"], settings=settings)1122 #print(" Next-Protocol Negotiated: %s" % connection.next_proto)1123 assert(connection.next_proto == b'http/1.1')1124 connection.close()1125 test_no += 11126 print("Test {0} - Next-Protocol Client Negotiation".format(test_no))1127 synchro.recv(1)1128 connection = connect()1129 settings = HandshakeSettings()1130 settings.maxVersion = (3, 3)1131 connection.handshakeClientCert(nextProtos=[b"spdy/2", b"http/1.1"],1132 settings=settings)1133 #print(" Next-Protocol Negotiated: %s" % connection.next_proto)1134 assert(connection.next_proto == b'spdy/2')1135 connection.close()1136 test_no += 11137 print("Test {0} - Next-Protocol Client Negotiation".format(test_no))1138 synchro.recv(1)1139 connection = connect()1140 settings = HandshakeSettings()1141 settings.maxVersion = (3, 3)1142 connection.handshakeClientCert(nextProtos=[b"spdy/2", b"http/1.1"],1143 settings=settings)1144 #print(" Next-Protocol Negotiated: %s" % connection.next_proto)1145 assert(connection.next_proto == b'spdy/2')1146 connection.close()1147 test_no += 11148 print("Test {0} - Next-Protocol Client Negotiation".format(test_no))1149 synchro.recv(1)1150 connection = connect()1151 settings = HandshakeSettings()1152 settings.maxVersion = (3, 3)1153 connection.handshakeClientCert(nextProtos=[b"spdy/3", b"spdy/2",1154 b"http/1.1"],1155 settings=settings)1156 #print(" Next-Protocol Negotiated: %s" % connection.next_proto)1157 assert(connection.next_proto == b'spdy/2')1158 connection.close()1159 test_no += 11160 print("Test {0} - Next-Protocol Client 
Negotiation".format(test_no))1161 synchro.recv(1)1162 connection = connect()1163 settings = HandshakeSettings()1164 settings.maxVersion = (3, 3)1165 connection.handshakeClientCert(nextProtos=[b"spdy/3", b"spdy/2",1166 b"http/1.1"],1167 settings=settings)1168 #print(" Next-Protocol Negotiated: %s" % connection.next_proto)1169 assert(connection.next_proto == b'spdy/3')1170 connection.close()1171 test_no += 11172 print("Test {0} - Next-Protocol Client Negotiation".format(test_no))1173 synchro.recv(1)1174 connection = connect()1175 settings = HandshakeSettings()1176 settings.maxVersion = (3, 3)1177 connection.handshakeClientCert(nextProtos=[b"http/1.1"], settings=settings)1178 #print(" Next-Protocol Negotiated: %s" % connection.next_proto)1179 assert(connection.next_proto == b'http/1.1')1180 connection.close()1181 test_no += 11182 print("Test {0} - Next-Protocol Client Negotiation".format(test_no))1183 synchro.recv(1)1184 connection = connect()1185 settings = HandshakeSettings()1186 settings.maxVersion = (3, 3)1187 connection.handshakeClientCert(nextProtos=[b"spdy/2", b"http/1.1"],1188 settings=settings)1189 #print(" Next-Protocol Negotiated: %s" % connection.next_proto)1190 assert(connection.next_proto == b'spdy/2')1191 connection.close()1192 test_no += 11193 print("Test {0} - FALLBACK_SCSV".format(test_no))1194 synchro.recv(1)1195 connection = connect()1196 settings = HandshakeSettings()1197 settings.sendFallbackSCSV = True1198 settings.maxVersion = (3, 3)1199 # TODO fix FALLBACK_SCSV with TLS 1.31200 connection.handshakeClientCert(settings=settings)1201 testConnClient(connection)1202 connection.close()1203 test_no += 11204 print("Test {0} - FALLBACK_SCSV".format(test_no))1205 synchro.recv(1)1206 connection = connect()1207 settings = HandshakeSettings()1208 settings.sendFallbackSCSV = True1209 settings.maxVersion = (3, 2)1210 try:1211 connection.handshakeClientCert(settings=settings)1212 assert False1213 except TLSRemoteAlert as alert:1214 if alert.description != 
AlertDescription.inappropriate_fallback:1215 raise1216 connection.close()1217 test_no += 11218 print("Test {0} - no EtM server side".format(test_no))1219 synchro.recv(1)1220 connection = connect()1221 settings = HandshakeSettings()1222 settings.macNames.remove("aead")1223 settings.maxVersion = (3, 3)1224 assert(settings.useEncryptThenMAC)1225 connection.handshakeClientCert(serverName=address[0], settings=settings)1226 testConnClient(connection)1227 assert(isinstance(connection.session.serverCertChain, X509CertChain))1228 assert(connection.session.serverName == address[0])1229 assert(not connection.encryptThenMAC)1230 connection.close()1231 test_no += 11232 print("Test {0} - no EtM client side".format(test_no))1233 synchro.recv(1)1234 connection = connect()1235 settings = HandshakeSettings()1236 settings.macNames.remove("aead")1237 settings.useEncryptThenMAC = False1238 settings.maxVersion = (3, 3)1239 connection.handshakeClientCert(serverName=address[0], settings=settings)1240 testConnClient(connection)1241 assert(isinstance(connection.session.serverCertChain, X509CertChain))1242 assert(connection.session.serverName == address[0])1243 assert(not connection.encryptThenMAC)1244 connection.close()1245 test_no += 11246 print("Test {0} - resumption with EtM".format(test_no))1247 synchro.recv(1)1248 connection = connect()1249 settings = HandshakeSettings()1250 settings.macNames.remove("aead")1251 settings.maxVersion = (3, 3)1252 connection.handshakeClientCert(serverName=address[0], settings=settings)1253 testConnClient(connection)1254 assert(isinstance(connection.session.serverCertChain, X509CertChain))1255 assert(connection.session.serverName == address[0])1256 assert(not connection.resumed)1257 assert(connection.encryptThenMAC)1258 connection.close()1259 session = connection.session1260 # resume1261 synchro.recv(1)1262 connection = connect()1263 settings = HandshakeSettings()1264 settings.maxVersion = (3, 3)1265 connection.handshakeClientCert(serverName=address[0], 
session=session,1266 settings=settings)1267 testConnClient(connection)1268 assert(isinstance(connection.session.serverCertChain, X509CertChain))1269 assert(connection.session.serverName == address[0])1270 assert(connection.resumed)1271 assert(connection.encryptThenMAC)1272 connection.close()1273 test_no += 11274 print("Test {0} - resumption with no EtM in 2nd handshake".format(test_no))1275 synchro.recv(1)1276 connection = connect()1277 settings = HandshakeSettings()1278 settings.macNames.remove("aead")1279 settings.maxVersion = (3, 3)1280 connection.handshakeClientCert(serverName=address[0], settings=settings)1281 testConnClient(connection)1282 assert(isinstance(connection.session.serverCertChain, X509CertChain))1283 assert(connection.session.serverName == address[0])1284 assert(not connection.resumed)1285 assert(connection.encryptThenMAC)1286 connection.close()1287 session = connection.session1288 # resume1289 synchro.recv(1)1290 settings = HandshakeSettings()1291 settings.useEncryptThenMAC = False1292 settings.macNames.remove("aead")1293 settings.maxVersion = (3, 3)1294 connection = connect()1295 try:1296 connection.handshakeClientCert(serverName=address[0], session=session,1297 settings=settings)1298 assert False1299 except TLSRemoteAlert as e:1300 assert(str(e) == "illegal_parameter")1301 else:1302 raise AssertionError("No exception raised")1303 connection.close()1304 test_no += 11305 print("Test {0} - resumption in TLSv1.3".format(test_no))1306 synchro.recv(1)1307 connection = connect()1308 settings = HandshakeSettings()1309 # force HRR1310 settings.keyShares = []1311 connection.handshakeClientCert(serverName=address[0], settings=settings)1312 testConnClient(connection)1313 assert isinstance(connection.session.serverCertChain, X509CertChain)1314 assert connection.session.serverName == address[0]1315 assert not connection.resumed1316 assert connection.session.tickets1317 connection.close()1318 session = connection.session1319 # resume1320 synchro.recv(1)1321 
settings = HandshakeSettings()1322 settings.keyShares = []1323 connection = connect()1324 connection.handshakeClientCert(serverName=address[0], session=session,1325 settings=settings)1326 testConnClient(connection)1327 assert connection.resumed1328 connection.close()1329 test_no += 11330 print("Test {0} - resumption in TLSv1.3 with mutual X.509".format(test_no))1331 synchro.recv(1)1332 connection = connect()1333 settings = HandshakeSettings()1334 settings.minVersion = (3,4)1335 # force HRR1336 settings.keyShares = []1337 connection.handshakeClientCert(x509Chain, x509Key, serverName=address[0],1338 settings=settings)1339 testConnClient(connection)1340 assert isinstance(connection.session.serverCertChain, X509CertChain)1341 assert connection.session.serverName == address[0]1342 assert not connection.resumed1343 assert connection.session.tickets1344 connection.close()1345 session = connection.session1346 # resume1347 synchro.recv(1)1348 settings = HandshakeSettings()1349 settings.minVersion = (3,4)1350 settings.keyShares = []1351 connection = connect()1352 connection.handshakeClientCert(x509Chain, x509Key, serverName=address[0], session=session,1353 settings=settings)1354 testConnClient(connection)1355 assert connection.resumed1356 connection.close()1357 test_no += 11358 print("Test {0} - resumption in TLSv1.3 with AES-CCM tickets".format(test_no))1359 synchro.recv(1)1360 connection = connect()1361 settings = HandshakeSettings()1362 settings.minVersion = (3, 4)1363 # force HRR1364 settings.keyShares = []1365 connection.handshakeClientCert(serverName=address[0], settings=settings)1366 testConnClient(connection)1367 assert isinstance(connection.session.serverCertChain, X509CertChain)1368 assert connection.session.serverName == address[0]1369 assert not connection.resumed1370 assert connection.session.tickets1371 connection.close()1372 session = connection.session1373 # resume1374 synchro.recv(1)1375 settings = HandshakeSettings()1376 settings.minVersion = (3, 4)1377 
settings.keyShares = []1378 connection = connect()1379 connection.handshakeClientCert(serverName=address[0], session=session,1380 settings=settings)1381 testConnClient(connection)1382 assert connection.resumed1383 connection.close()1384 test_no += 11385 print("Test {0} - Heartbeat extension response callback in TLSv1.2".format(test_no))1386 heartbeat_payload = os.urandom(50)1387 def heartbeat_response_check(message):1388 global received_payload1389 received_payload = message.payload1390 synchro.recv(1)1391 connection = connect()1392 settings = HandshakeSettings()1393 settings.maxVersion = (3, 3)1394 settings.heartbeat_response_callback = heartbeat_response_check1395 connection.handshakeClientCert(serverName=address[0], settings=settings)1396 connection.send_heartbeat_request(heartbeat_payload, 16)1397 testConnClient(connection)1398 testConnClient(connection)1399 connection.close()1400 assert heartbeat_payload == received_payload1401 test_no += 11402 print("Test {0} - Heartbeat extension in TLSv1.3".format(test_no))1403 heartbeat_payload = os.urandom(50)1404 def heartbeat_response_check(message):1405 global received_payload1406 received_payload = message.payload1407 synchro.recv(1)1408 connection = connect()1409 settings = HandshakeSettings()1410 settings.maxVersion = (3, 4)1411 settings.heartbeat_response_callback = heartbeat_response_check1412 connection.handshakeClientCert(serverName=address[0], settings=settings)1413 connection.send_heartbeat_request(heartbeat_payload, 16)1414 testConnClient(connection)1415 testConnClient(connection)1416 connection.close()1417 assert heartbeat_payload == received_payload1418 test_no += 11419 print("Test {0} - KeyUpdate from client in TLSv1.3".format(test_no))1420 assert synchro.recv(1) == b'R'1421 connection = connect()1422 settings = HandshakeSettings()1423 settings.maxVersion = (3, 4)1424 connection.handshakeClientCert(serverName=address[0], settings=settings)1425 assert synchro.recv(1) == b'K'1426 for i in 
connection.send_keyupdate_request(KeyUpdateMessageType.update_requested):1427 assert i in (0, 1)1428 assert synchro.recv(1) == b'K'1429 testConnClient(connection)1430 connection.close()1431 test_no += 11432 print("Test {0} - mutual KeyUpdates in TLSv1.3".format(test_no))1433 assert synchro.recv(1) == b'R'1434 connection = connect()1435 settings = HandshakeSettings()1436 settings.maxVersion = (3, 4)1437 connection.handshakeClientCert(serverName=address[0], settings=settings)1438 for i in connection.send_keyupdate_request(KeyUpdateMessageType.update_requested):1439 assert i in (0, 1)1440 testConnClient(connection)1441 synchro.send(b'R')1442 connection.close()1443 test_no += 11444 print("Test {0} - multiple mutual KeyUpdates in TLSv1.3".format(test_no))1445 assert synchro.recv(1) == b'R'1446 connection = connect()1447 settings = HandshakeSettings()1448 settings.maxVersion = (3, 4)1449 connection.handshakeClientCert(serverName=address[0], settings=settings)1450 for i in connection.send_keyupdate_request(KeyUpdateMessageType.update_requested):1451 assert i in (0, 1)1452 for i in connection.send_keyupdate_request(KeyUpdateMessageType.update_requested):1453 assert i in (0, 1)1454 testConnClient(connection)1455 synchro.send(b'R')1456 connection.close()1457 test_no += 11458 print('Test {0} - good standard XMLRPC https client'.format(test_no))1459 address = address[0], address[1]+11460 synchro.recv(1)1461 try:1462 # python 2.7.9 introduced certificate verification (context option)1463 # python 3.4.2 doesn't have it though1464 context = ssl.create_default_context(\1465 cafile=os.path.join(dir, "serverX509Cert.pem"))1466 server = xmlrpclib.Server('https://%s:%s' % address, context=context)1467 except (TypeError, AttributeError):1468 server = xmlrpclib.Server('https://%s:%s' % address)1469 synchro.recv(1)1470 assert server.add(1,2) == 31471 synchro.recv(1)1472 assert server.pow(2,4) == 161473 test_no += 11474 print('Test {0} - good tlslite XMLRPC client'.format(test_no))1475 
transport = XMLRPCTransport(ignoreAbruptClose=True)1476 server = xmlrpclib.Server('https://%s:%s' % address, transport)1477 synchro.recv(1)1478 assert server.add(1,2) == 31479 synchro.recv(1)1480 assert server.pow(2,4) == 161481 test_no += 11482 print('Test {0} - good XMLRPC ignored protocol'.format(test_no))1483 server = xmlrpclib.Server('http://%s:%s' % address, transport)1484 synchro.recv(1)1485 assert server.add(1,2) == 31486 synchro.recv(1)1487 assert server.pow(2,4) == 161488 test_no += 11489 print("Test {0} - Internet servers test".format(test_no))1490 try:1491 i = IMAP4_TLS("cyrus.andrew.cmu.edu")1492 i.login("anonymous", "anonymous@anonymous.net")1493 i.logout()1494 test_no += 11495 print("Test {0}: IMAP4 good".format(test_no))1496 p = POP3_TLS("pop.gmail.com")1497 p.quit()1498 test_no += 11499 print("Test {0}: POP3 good".format(test_no))1500 except (socket.error, socket.timeout) as e:1501 print("Non-critical error: socket error trying to reach internet "1502 "server: ", e)1503 synchro.close()1504 if not badFault:1505 print("Test succeeded, {0} good".format(test_no))1506 else:1507 print("Test failed")1508def testConnServer(connection):1509 count = 01510 while 1:1511 s = connection.read()1512 count += len(s)1513 if len(s) == 0:1514 break1515 connection.write(s)1516 if count == 1111:1517 break1518def serverTestCmd(argv):1519 address = argv[0]1520 dir = argv[1]1521 1522 #Split address into hostname/port tuple1523 address = address.split(":")1524 address = ( address[0], int(address[1]) )1525 #Create synchronisation FIFO1526 synchroSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)1527 synchroSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)1528 synchroSocket.bind((address[0], address[1]-1))1529 synchroSocket.listen(2)1530 #Connect to server1531 lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)1532 lsock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)1533 lsock.bind(address)1534 lsock.listen(5)1535 # following is blocking until 
the other side doesn't open1536 synchro = synchroSocket.accept()[0]1537 def connect():1538 s = lsock.accept()[0]1539 s.settimeout(15)1540 return TLSConnection(s)1541 with open(os.path.join(dir, "serverX509Cert.pem")) as f:1542 x509Cert = X509().parse(f.read())1543 x509Chain = X509CertChain([x509Cert])1544 with open(os.path.join(dir, "serverX509Key.pem")) as f:1545 x509Key = parsePEMKey(f.read(), private=True)1546 with open(os.path.join(dir, "serverRSAPSSSigCert.pem")) as f:1547 x509CertRSAPSSSig = X509().parse(f.read())1548 x509ChainRSAPSSSig = X509CertChain([x509CertRSAPSSSig])1549 with open(os.path.join(dir, "serverRSAPSSSigKey.pem")) as f:1550 x509KeyRSAPSSSig = parsePEMKey(f.read(), private=True)1551 with open(os.path.join(dir, "serverRSAPSSCert.pem")) as f:1552 x509CertRSAPSS = X509().parse(f.read())1553 x509ChainRSAPSS = X509CertChain([x509CertRSAPSS])1554 assert x509CertRSAPSS.certAlg == "rsa-pss"1555 with open(os.path.join(dir, "serverRSAPSSKey.pem")) as f:1556 x509KeyRSAPSS = parsePEMKey(f.read(), private=True,1557 implementations=["python"])1558 with open(os.path.join(dir, "serverECCert.pem")) as f:1559 x509CertECDSA = X509().parse(f.read())1560 x509ecdsaChain = X509CertChain([x509CertECDSA])1561 assert x509CertECDSA.certAlg == "ecdsa"1562 with open(os.path.join(dir, "serverECKey.pem")) as f:1563 x509ecdsaKey = parsePEMKey(f.read(), private=True,1564 implementations=["python"])1565 with open(os.path.join(dir, "serverP384ECCert.pem")) as f:1566 x509CertP384ECDSA = X509().parse(f.read())1567 x509ecdsaP384Chain = X509CertChain([x509CertP384ECDSA])1568 assert x509CertP384ECDSA.certAlg == "ecdsa"1569 with open(os.path.join(dir, "serverP384ECKey.pem")) as f:1570 x509ecdsaP384Key = parsePEMKey(f.read(), private=True,1571 implementations=["python"])1572 with open(os.path.join(dir, "serverP521ECCert.pem")) as f:1573 x509CertP521ECDSA = X509().parse(f.read())1574 x509ecdsaP521Chain = X509CertChain([x509CertP521ECDSA])1575 assert x509CertP521ECDSA.certAlg == 
"ecdsa"1576 with open(os.path.join(dir, "serverP521ECKey.pem")) as f:1577 x509ecdsaP521Key = parsePEMKey(f.read(), private=True,1578 implementations=["python"])1579 with open(os.path.join(dir, "serverBrainpoolP256r1ECCert.pem")) as f:1580 x509CertBrainpoolP256r1ECDSA = X509().parse(f.read())1581 x509ecdsaBrainpoolP256r1Chain = X509CertChain([x509CertBrainpoolP256r1ECDSA])1582 assert x509CertBrainpoolP256r1ECDSA.certAlg == "ecdsa"1583 with open(os.path.join(dir, "serverBrainpoolP256r1ECKey.pem")) as f:1584 x509ecdsaBrainpoolP256r1Key = parsePEMKey(f.read(), private=True,1585 implementations=["python"])1586 with open(os.path.join(dir, "serverBrainpoolP384r1ECCert.pem")) as f:1587 x509CertBrainpoolP384r1ECDSA = X509().parse(f.read())1588 x509ecdsaBrainpoolP384r1Chain = X509CertChain([x509CertBrainpoolP384r1ECDSA])1589 assert x509CertBrainpoolP384r1ECDSA.certAlg == "ecdsa"1590 with open(os.path.join(dir, "serverBrainpoolP384r1ECKey.pem")) as f:1591 x509ecdsaBrainpoolP384r1Key = parsePEMKey(f.read(), private=True,1592 implementations=["python"])1593 with open(os.path.join(dir, "serverBrainpoolP512r1ECCert.pem")) as f:1594 x509CertBrainpoolP512r1ECDSA = X509().parse(f.read())1595 x509ecdsaBrainpoolP512r1Chain = X509CertChain([x509CertBrainpoolP512r1ECDSA])1596 assert x509CertBrainpoolP512r1ECDSA.certAlg == "ecdsa"1597 with open(os.path.join(dir, "serverBrainpoolP512r1ECKey.pem")) as f:1598 x509ecdsaBrainpoolP512r1Key = parsePEMKey(f.read(), private=True,1599 implementations=["python"])1600 with open(os.path.join(dir, "serverRSANonCACert.pem")) as f:1601 x509CertRSANonCA = X509().parse(f.read())1602 x509ChainRSANonCA = X509CertChain([x509CertRSANonCA])1603 assert x509CertRSANonCA.certAlg == "rsa"1604 with open(os.path.join(dir, "serverRSANonCAKey.pem")) as f:1605 x509KeyRSANonCA = parsePEMKey(f.read(), private=True,1606 implementations=["python"])1607 with open(os.path.join(dir, "serverECDSANonCACert.pem")) as f:1608 x509CertECDSANonCA = X509().parse(f.read())1609 
x509ChainECDSANonCA = X509CertChain([x509CertECDSANonCA])1610 assert x509CertECDSANonCA.certAlg == "ecdsa"1611 with open(os.path.join(dir, "serverECDSANonCAKey.pem")) as f:1612 x509KeyECDSANonCA = parsePEMKey(f.read(), private=True,1613 implementations=["python"])1614 with open(os.path.join(dir, "serverDSACert.pem")) as f:1615 x509CertDSA = X509().parse(f.read())1616 x509ChainDSA = X509CertChain([x509CertDSA])1617 assert x509CertDSA.certAlg == "dsa"1618 with open(os.path.join(dir, "serverDSAKey.pem")) as f:1619 x509KeyDSA = parsePEMKey(f.read(), private=True,1620 implementations=["python"])1621 with open(os.path.join(dir, "serverEd25519Cert.pem")) as f:1622 x509CertEd25519 = X509().parse(f.read())1623 x509Ed25519Chain = X509CertChain([x509CertEd25519])1624 assert x509CertEd25519.certAlg == "Ed25519"1625 with open(os.path.join(dir, "serverEd25519Key.pem")) as f:1626 x509Ed25519Key = parsePEMKey(f.read(), private=True,1627 implementations=["python"])1628 with open(os.path.join(dir, "serverEd448Cert.pem")) as f:1629 x509CertEd448 = X509().parse(f.read())1630 x509Ed448Chain = X509CertChain([x509CertEd448])1631 assert x509CertEd448.certAlg == "Ed448"1632 with open(os.path.join(dir, "serverEd448Key.pem")) as f:1633 x509Ed448Key = parsePEMKey(f.read(), private=True,1634 implementations=["python"])1635 test_no = 01636 print("Test {0} - Anonymous server handshake".format(test_no))1637 synchro.send(b'R')1638 connection = connect()1639 connection.handshakeServer(anon=True)1640 testConnServer(connection) 1641 connection.close()1642 test_no += 11643 print("Test {0} - good X.509 (plus SNI)".format(test_no))1644 synchro.send(b'R')1645 connection = connect()1646 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key)1647 assert connection.session.serverName == address[0]1648 assert connection.extendedMasterSecret1649 assert connection.session.appProto is None1650 testConnServer(connection)1651 connection.close()1652 test_no += 11653 print("Test {0} - good X.509 TLSv1.2 
(plus ALPN)".format(test_no))1654 synchro.send(b'R')1655 settings = HandshakeSettings()1656 settings.maxVersion = (3, 3)1657 connection = connect()1658 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,1659 alpn=[b'http/1.1', b'http/1.0'],1660 settings=settings)1661 assert connection.session.serverName == address[0]1662 assert connection.extendedMasterSecret1663 assert connection.session.appProto == b'http/1.1'1664 testConnServer(connection)1665 connection.close()1666 test_no += 11667 print("Test {0} - good X.509 TLSv1.3 (plus ALPN)".format(test_no))1668 synchro.send(b'R')1669 connection = connect()1670 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,1671 alpn=[b'http/1.1', b'http/1.0'])1672 assert connection.session.serverName == address[0]1673 assert connection.extendedMasterSecret1674 assert connection.session.appProto == b'http/1.1'1675 testConnServer(connection)1676 connection.close()1677 test_no += 11678 print("Test {0} - good X.509/w RSA-PSS sig".format(test_no))1679 synchro.send(b'R')1680 connection = connect()1681 connection.handshakeServer(certChain=x509ChainRSAPSSSig,1682 privateKey=x509KeyRSAPSSSig)1683 assert(connection.extendedMasterSecret)1684 testConnServer(connection)1685 connection.close()1686 test_no += 11687 print("Test {0} - good X.509/w RSA-PSS cert".format(test_no))1688 synchro.send(b'R')1689 connection = connect()1690 connection.handshakeServer(certChain=x509ChainRSAPSS,1691 privateKey=x509KeyRSAPSS)1692 assert(connection.session.serverName == address[0])1693 assert(connection.extendedMasterSecret)1694 testConnServer(connection)1695 connection.close()1696 test_no += 11697 print("Test {0} - good X.509/w RSA-PSS cert".format(test_no))1698 synchro.send(b'R')1699 connection = connect()1700 settings = HandshakeSettings()1701 settings.minVersion = (3, 3)1702 settings.maxVersion = (3, 3)1703 connection.handshakeServer(certChain=x509ChainRSAPSS,1704 privateKey=x509KeyRSAPSS,1705 settings=settings)1706 
assert(connection.session.serverName == address[0])1707 assert(connection.extendedMasterSecret)1708 testConnServer(connection)1709 connection.close()1710 test_no += 11711 print("Test {0} - good X.509, small record_size_limit".format(test_no))1712 synchro.send(b'R')1713 connection = connect()1714 settings = HandshakeSettings()1715 settings.record_size_limit = 641716 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, settings=settings)1717 testConnServer(connection)1718 connection.close()1719 test_no += 11720 print("Test {0} - good X.509, SSLv3".format(test_no))1721 synchro.send(b'R')1722 connection = connect()1723 settings = HandshakeSettings()1724 settings.minVersion = (3,0)1725 settings.maxVersion = (3,0)1726 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, settings=settings)1727 assert(not connection.extendedMasterSecret)1728 testConnServer(connection)1729 connection.close()1730 test_no += 11731 print("Test {0} - good X.509 ECDSA, SSLv3".format(test_no))1732 synchro.send(b'R')1733 connection = connect()1734 settings = HandshakeSettings()1735 settings.minVersion = (3, 0)1736 settings.maxVersion = (3, 0)1737 connection.handshakeServer(certChain=x509ecdsaChain,1738 privateKey=x509ecdsaKey, settings=settings)1739 assert not connection.extendedMasterSecret1740 testConnServer(connection)1741 connection.close()1742 test_no += 11743 print("Test {0} - good X.509 ECDSA, TLSv1.0".format(test_no))1744 synchro.send(b'R')1745 connection = connect()1746 settings = HandshakeSettings()1747 settings.minVersion = (3, 1)1748 settings.maxVersion = (3, 1)1749 connection.handshakeServer(certChain=x509ecdsaChain,1750 privateKey=x509ecdsaKey, settings=settings)1751 assert connection.extendedMasterSecret1752 testConnServer(connection)1753 connection.close()1754 test_no += 11755 print("Test {0} - good X.509 ECDSA, TLSv1.2".format(test_no))1756 synchro.send(b'R')1757 connection = connect()1758 settings = HandshakeSettings()1759 settings.minVersion = (3, 
3)1760 settings.maxVersion = (3, 3)1761 connection.handshakeServer(certChain=x509ecdsaChain,1762 privateKey=x509ecdsaKey, settings=settings)1763 assert connection.extendedMasterSecret1764 testConnServer(connection)1765 connection.close()1766 test_no += 11767 print("Test {0} - mismatched ECDSA curve, TLSv1.2".format(test_no))1768 synchro.send(b'R')1769 connection = connect()1770 settings = HandshakeSettings()1771 settings.minVersion = (3, 3)1772 settings.maxVersion = (3, 3)1773 try:1774 connection.handshakeServer(certChain=x509ecdsaChain,1775 privateKey=x509ecdsaKey, settings=settings)1776 assert False1777 except TLSLocalAlert as e:1778 assert "curve in the public key is not supported by the client" in str(e)1779 connection.close()1780 test_no += 11781 for curve, certChain, key in (("brainpoolP256r1", x509ecdsaBrainpoolP256r1Chain, x509ecdsaBrainpoolP256r1Key),1782 ("brainpoolP384r1", x509ecdsaBrainpoolP384r1Chain, x509ecdsaBrainpoolP384r1Key),1783 ("brainpoolP512r1", x509ecdsaBrainpoolP512r1Chain, x509ecdsaBrainpoolP512r1Key)):1784 print("Test {0} - Two good ECDSA certs - {1}, TLSv1.2".format(test_no, curve))1785 synchro.send(b'R')1786 connection = connect()1787 settings = HandshakeSettings()1788 settings.minVersion = (3, 3)1789 settings.maxVersion = (3, 3)1790 settings.eccCurves = [curve, "secp256r1"]1791 settings.keyShares = []1792 v_host = VirtualHost()1793 v_host.keys = [Keypair(x509ecdsaKey, x509ecdsaChain.x509List)]1794 settings.virtual_hosts = [v_host]1795 connection.handshakeServer(certChain=certChain,1796 privateKey=key, settings=settings)1797 assert connection.extendedMasterSecret1798 assert connection.session.serverCertChain == certChain1799 testConnServer(connection)1800 connection.close()1801 test_no += 11802 for curve, exp_chain in (("secp256r1", x509ecdsaChain),1803 ("secp384r1", x509ecdsaP384Chain)):1804 print("Test {0} - Two good ECDSA certs - {1}, TLSv1.2"1805 .format(test_no, curve))1806 synchro.send(b'R')1807 connection = connect()1808 settings 
= HandshakeSettings()1809 settings.minVersion = (3, 3)1810 settings.maxVersion = (3, 3)1811 v_host = VirtualHost()1812 v_host.keys = [Keypair(x509ecdsaKey, x509ecdsaChain.x509List)]1813 settings.virtual_hosts = [v_host]1814 connection.handshakeServer(certChain=x509ecdsaP384Chain,1815 privateKey=x509ecdsaP384Key, settings=settings)1816 assert connection.extendedMasterSecret1817 assert connection.session.serverCertChain == exp_chain1818 testConnServer(connection)1819 connection.close()1820 test_no += 11821 for tls_ver in ("TLSv1.2", "TLSv1,3"):1822 print("Test {0} - good X509 RSA and ECDSA, correct RSA and ECDSA sigalgs, RSA, {1}"1823 .format(test_no, tls_ver))1824 synchro.send(b'R')1825 connection = connect()1826 settings = HandshakeSettings()1827 settings.minVersion = (3, 3)1828 settings.maxVersion = (3, 4)1829 v_host = VirtualHost()1830 v_host.keys = [Keypair(x509KeyECDSANonCA, x509ChainECDSANonCA.x509List)]1831 settings.virtual_hosts = [v_host]1832 connection.handshakeServer(certChain=x509ChainRSANonCA,1833 privateKey=x509KeyRSANonCA,1834 settings=settings)1835 assert connection.extendedMasterSecret1836 assert connection.session.serverCertChain == x509ChainRSANonCA1837 testConnServer(connection)1838 connection.close()1839 test_no += 11840 print("Test {0} - good X509 RSA and ECDSA, bad RSA and good ECDSA sigalgs, ECDSA, {1}"1841 .format(test_no, tls_ver))1842 synchro.send(b'R')1843 connection = connect()1844 settings = HandshakeSettings()1845 settings.minVersion = (3, 3)1846 settings.maxVersion = (3, 4)1847 v_host = VirtualHost()1848 v_host.keys = [Keypair(x509KeyECDSANonCA, x509ChainECDSANonCA.x509List)]1849 settings.virtual_hosts = [v_host]1850 connection.handshakeServer(certChain=x509ChainRSANonCA,1851 privateKey=x509KeyRSANonCA,1852 settings=settings)1853 assert connection.extendedMasterSecret1854 assert connection.session.serverCertChain == x509ChainECDSANonCA1855 testConnServer(connection)1856 connection.close()1857 test_no += 11858 print("Test {0} - good 
X509 RSA and ECDSA, bad RSA and ECDSA sigalgs, RSA, {1}"1859 .format(test_no, tls_ver))1860 synchro.send(b'R')1861 connection = connect()1862 settings = HandshakeSettings()1863 settings.minVersion = (3, 3)1864 settings.maxVersion = (3, 4)1865 v_host = VirtualHost()1866 v_host.keys = [Keypair(x509KeyECDSANonCA, x509ChainECDSANonCA.x509List)]1867 settings.virtual_hosts = [v_host]1868 connection.handshakeServer(certChain=x509ChainRSANonCA,1869 privateKey=x509KeyRSANonCA,1870 settings=settings)1871 assert connection.extendedMasterSecret1872 assert connection.session.serverCertChain == x509ChainRSANonCA1873 testConnServer(connection)1874 connection.close()1875 test_no += 11876 print("Test {0} - good X.509 ECDSA, TLSv1.3".format(test_no))1877 synchro.send(b'R')1878 connection = connect()1879 settings = HandshakeSettings()1880 settings.minVersion = (3, 4)1881 settings.maxVersion = (3, 4)1882 connection.handshakeServer(certChain=x509ecdsaChain,1883 privateKey=x509ecdsaKey, settings=settings)1884 assert connection.extendedMasterSecret1885 testConnServer(connection)1886 connection.close()1887 test_no += 11888 # check what happens when client doesn't advertise support for signature1889 # algoritm compatible with server key1890 print("Test {0} - mismatched ECDSA curve, TLSv1.3".format(test_no))1891 synchro.send(b'R')1892 connection = connect()1893 settings = HandshakeSettings()1894 settings.minVersion = (3, 4)1895 settings.maxVersion = (3, 4)1896 try:1897 connection.handshakeServer(certChain=x509ecdsaChain,1898 privateKey=x509ecdsaKey, settings=settings)1899 assert False1900 except TLSLocalAlert as e:1901 assert "No common signature algorithms" in str(e)1902 connection.close()1903 test_no += 11904 print("Test {0} - good X.509 P-384 ECDSA, TLSv1.3".format(test_no))1905 synchro.send(b'R')1906 connection = connect()1907 settings = HandshakeSettings()1908 settings.minVersion = (3, 4)1909 settings.maxVersion = (3, 4)1910 connection.handshakeServer(certChain=x509ecdsaP384Chain,1911 
privateKey=x509ecdsaP384Key, settings=settings)1912 assert connection.extendedMasterSecret1913 testConnServer(connection)1914 connection.close()1915 test_no += 11916 print("Test {0} - good X.509 P-521 ECDSA, TLSv1.3".format(test_no))1917 synchro.send(b'R')1918 connection = connect()1919 settings = HandshakeSettings()1920 settings.minVersion = (3, 4)1921 settings.maxVersion = (3, 4)1922 connection.handshakeServer(certChain=x509ecdsaP521Chain,1923 privateKey=x509ecdsaP521Key, settings=settings)1924 assert connection.extendedMasterSecret1925 testConnServer(connection)1926 connection.close()1927 test_no += 11928 print("Test {0} - good X.509 Ed25519, TLSv1.3".format(test_no))1929 synchro.send(b'R')1930 connection = connect()1931 settings = HandshakeSettings()1932 settings.minVersion = (3, 4)1933 settings.maxVersion = (3, 4)1934 connection.handshakeServer(certChain=x509Ed25519Chain,1935 privateKey=x509Ed25519Key, settings=settings)1936 assert connection.extendedMasterSecret1937 testConnServer(connection)1938 connection.close()1939 test_no += 11940 print("Test {0} - good X.509 Ed448, TLSv1.3".format(test_no))1941 synchro.send(b'R')1942 connection = connect()1943 settings = HandshakeSettings()1944 settings.minVersion = (3, 4)1945 settings.maxVersion = (3, 4)1946 connection.handshakeServer(certChain=x509Ed448Chain,1947 privateKey=x509Ed448Key, settings=settings)1948 assert connection.extendedMasterSecret1949 testConnServer(connection)1950 connection.close()1951 test_no += 11952 for prot in ["TLSv1.3", "TLSv1.2"]:1953 for c_type, exp_chain in (("rsa", x509Chain),1954 ("ecdsa", x509ecdsaChain)):1955 print("Test {0} - good RSA and ECDSA, {2}, {1}"1956 .format(test_no, c_type, prot))1957 synchro.send(b'R')1958 connection = connect()1959 settings = HandshakeSettings()1960 settings.minVersion = (3, 3)1961 settings.maxVersion = (3, 4)1962 v_host = VirtualHost()1963 v_host.keys = [Keypair(x509ecdsaKey, x509ecdsaChain.x509List)]1964 settings.virtual_hosts = [v_host]1965 
connection.handshakeServer(certChain=x509Chain,1966 privateKey=x509Key, settings=settings)1967 assert connection.extendedMasterSecret1968 assert connection.session.serverCertChain == exp_chain1969 testConnServer(connection)1970 connection.close()1971 test_no += 11972 print("Test {0} - good X.509, mismatched key_share".format(test_no))1973 synchro.send(b'R')1974 connection = connect()1975 settings = HandshakeSettings()1976 settings.eccCurves = ["secp256r1", "secp384r1", "secp521r1"]1977 settings.keyShares = ["secp256r1"]1978 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, settings=settings)1979 testConnServer(connection)1980 connection.close()1981 test_no += 11982 print("Test {0} - good X.509, RC4-MD5".format(test_no))1983 synchro.send(b'R')1984 connection = connect()1985 settings = HandshakeSettings()1986 settings.macNames = ["sha", "md5"]1987 settings.cipherNames = ["rc4"]1988 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, settings=settings)1989 testConnServer(connection)1990 connection.close()1991 if tackpyLoaded:1992 tack = Tack.createFromPem(1993 open(os.path.join(dir, "TACK1.pem"), "rU").read())1994 tackUnrelated = Tack.createFromPem(1995 open(os.path.join(dir, "TACKunrelated.pem"), "rU").read())1996 settings = HandshakeSettings()1997 settings.useExperimentalTackExtension = True1998 test_no += 11999 print("Test {0} - good X.509, TACK".format(test_no))2000 synchro.send(b'R')2001 connection = connect()2002 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,2003 tacks=[tack], activationFlags=1, settings=settings)2004 testConnServer(connection)2005 connection.close()2006 test_no += 12007 print("Test {0} - good X.509, TACK unrelated to cert chain".\2008 format(test_no))2009 synchro.send(b'R')2010 connection = connect()2011 try:2012 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,2013 tacks=[tackUnrelated], settings=settings)2014 assert False2015 except TLSRemoteAlert as alert:2016 if 
alert.description != AlertDescription.illegal_parameter:2017 raise2018 else:2019 test_no += 12020 print("Test {0} - good X.509, TACK...skipped (no tackpy)".\2021 format(test_no))2022 test_no += 12023 print("Test {0} - good X.509, TACK unrelated to cert chain"2024 "...skipped (no tackpy)".format(test_no))2025 test_no += 12026 print("Test {0} - good PSK".format(test_no))2027 synchro.send(b'R')2028 settings = HandshakeSettings()2029 settings.pskConfigs = [(b'test', b'\x00secret', 'sha384')]2030 connection = connect()2031 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,2032 settings=settings)2033 testConnServer(connection)2034 connection.close()2035 test_no += 12036 print("Test {0} - good PSK, no DH".format(test_no))2037 synchro.send(b'R')2038 settings = HandshakeSettings()2039 settings.psk_modes = ["psk_ke"]2040 settings.pskConfigs = [(b'test', b'\x00secret', 'sha384')]2041 connection = connect()2042 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,2043 settings=settings)2044 testConnServer(connection)2045 connection.close()2046 test_no += 12047 print("Test {0} - good PSK, no DH, no cert".format(test_no))2048 synchro.send(b'R')2049 settings = HandshakeSettings()2050 settings.psk_modes = ["psk_ke"]2051 settings.pskConfigs = [(b'test', b'\x00secret', 'sha384')]2052 connection = connect()2053 connection.handshakeServer(settings=settings)2054 testConnServer(connection)2055 connection.close()2056 test_no += 12057 print("Test {0} - good SRP (db)".format(test_no))2058 try:2059 import logging2060 logging.basicConfig(level=logging.DEBUG)2061 (db_file, db_name) = mkstemp()2062 print("server {0} - tmp file created".format(time.time()))2063 os.close(db_file)2064 print("server {0} - tmp file closed".format(time.time()))2065 # this is race'y but the interface dbm interface is stupid like that...2066 os.remove(db_name)2067 print("server {0} - tmp file removed".format(time.time()))2068 verifierDB = VerifierDB(db_name)2069 print("server {0} - 
verifier initialised".format(time.time()))2070 verifierDB.create()2071 print("server {0} - verifier created".format(time.time()))2072 entry = VerifierDB.makeVerifier("test", "password", 1536)2073 print("server {0} - entry created".format(time.time()))2074 verifierDB[b"test"] = entry2075 print("server {0} - entry added".format(time.time()))2076 synchro.send(b'R')2077 print("server {0} - synchro sent".format(time.time()))2078 connection = connect()2079 connection.handshakeServer(verifierDB=verifierDB)2080 testConnServer(connection)2081 connection.close()2082 finally:2083 try:2084 os.remove(db_name)2085 except FileNotFoundError:2086 # dbm module may create files with different names depending on2087 # platform2088 os.remove(db_name + ".dat")2089 test_no += 12090 print("Test {0} - good SRP".format(test_no))2091 verifierDB = VerifierDB()2092 verifierDB.create()2093 entry = VerifierDB.makeVerifier("test", "password", 1536)2094 verifierDB[b"test"] = entry2095 synchro.send(b'R')2096 connection = connect()2097 connection.handshakeServer(verifierDB=verifierDB)2098 testConnServer(connection)2099 connection.close()2100 test_no += 12101 print("Test {0} - SRP faults".format(test_no))2102 for fault in Fault.clientSrpFaults + Fault.genericFaults:2103 synchro.send(b'R')2104 connection = connect()2105 connection.fault = fault2106 connection.handshakeServer(verifierDB=verifierDB)2107 connection.close()2108 test_no += 12109 print("Test {0} - good SRP: with X.509 certificate, TLSv1.0".format(test_no))2110 synchro.send(b'R')2111 connection = connect()2112 connection.handshakeServer(verifierDB=verifierDB, \2113 certChain=x509Chain, privateKey=x509Key)2114 testConnServer(connection) 2115 connection.close()2116 test_no += 12117 print("Test {0} - X.509 with SRP faults".format(test_no))2118 for fault in Fault.clientSrpFaults + Fault.genericFaults:2119 synchro.send(b'R')2120 connection = connect()2121 connection.fault = fault2122 connection.handshakeServer(verifierDB=verifierDB, \2123 
certChain=x509Chain, privateKey=x509Key)2124 connection.close()2125 test_no += 12126 print("Test {0} - X.509 faults".format(test_no))2127 for fault in Fault.clientNoAuthFaults + Fault.genericFaults:2128 synchro.send(b'R')2129 connection = connect()2130 connection.fault = fault2131 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key)2132 connection.close()2133 test_no += 12134 print("Test {0} - good mutual X.509".format(test_no))2135 synchro.send(b'R')2136 connection = connect()2137 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, reqCert=True)2138 testConnServer(connection)2139 assert(isinstance(connection.session.clientCertChain, X509CertChain))2140 connection.close()2141 test_no += 12142 print("Test {0} - good mutual ECDSA X.509".format(test_no))2143 synchro.send(b'R')2144 connection = connect()2145 connection.handshakeServer(certChain=x509ecdsaChain,2146 privateKey=x509ecdsaKey, reqCert=True)2147 testConnServer(connection)2148 assert(isinstance(connection.session.clientCertChain, X509CertChain))2149 assert len(connection.session.clientCertChain.getEndEntityPublicKey()) ==\2150 2562151 connection.close()2152 test_no += 12153 print("Test {0} - good mutual Ed25519 X.509".format(test_no))2154 synchro.send(b'R')2155 connection = connect()2156 connection.handshakeServer(certChain=x509Ed25519Chain,2157 privateKey=x509Ed25519Key, reqCert=True)2158 testConnServer(connection)2159 assert(isinstance(connection.session.clientCertChain, X509CertChain))2160 assert connection.session.clientCertChain.getEndEntityPublicKey().key_type\2161 == "Ed25519"2162 connection.close()2163 test_no += 12164 print("Test {0} - good mutual Ed25519 X.509, TLS 1.2".format(test_no))2165 synchro.send(b'R')2166 connection = connect()2167 settings = HandshakeSettings()2168 settings.minVersion = (3, 3)2169 settings.maxVersion = (3, 3)2170 connection.handshakeServer(certChain=x509Ed25519Chain,2171 privateKey=x509Ed25519Key, reqCert=True,2172 settings=settings)2173 
testConnServer(connection)2174 assert(isinstance(connection.session.clientCertChain, X509CertChain))2175 assert connection.session.clientCertChain.getEndEntityPublicKey().key_type\2176 == "Ed25519"2177 connection.close()2178 test_no += 12179 print("Test {0} - good X.509 DSA, SSLv3".format(test_no))2180 synchro.send(b'R')2181 connection = connect()2182 settings = HandshakeSettings()2183 settings.minVersion = (3, 0)2184 settings.maxVersion = (3, 0)2185 connection.handshakeServer(certChain=x509ChainDSA,2186 privateKey=x509KeyDSA, settings=settings)2187 assert not connection.extendedMasterSecret2188 testConnServer(connection)2189 connection.close()2190 test_no += 12191 print("Test {0} - good X.509 DSA, TLSv1.2".format(test_no))2192 synchro.send(b'R')2193 connection = connect()2194 settings = HandshakeSettings()2195 settings.minVersion = (3, 3)2196 settings.maxVersion = (3, 3)2197 connection.handshakeServer(certChain=x509ChainDSA,2198 privateKey=x509KeyDSA, settings=settings)2199 testConnServer(connection)2200 connection.close()2201 test_no += 12202 print("Test {0} - good X.509 Ed25519, TLSv1.2".format(test_no))2203 synchro.send(b'R')2204 connection = connect()2205 settings = HandshakeSettings()2206 settings.minVersion = (3, 3)2207 settings.maxVersion = (3, 3)2208 connection.handshakeServer(certChain=x509Ed25519Chain,2209 privateKey=x509Ed25519Key, settings=settings)2210 testConnServer(connection)2211 connection.close()2212 test_no += 12213 print("Test {0} - good X.509 Ed448, TLSv1.2".format(test_no))2214 synchro.send(b'R')2215 connection = connect()2216 settings = HandshakeSettings()2217 settings.minVersion = (3, 3)2218 settings.maxVersion = (3, 3)2219 connection.handshakeServer(certChain=x509Ed448Chain,2220 privateKey=x509Ed448Key, settings=settings)2221 testConnServer(connection)2222 connection.close()2223 test_no += 12224 print("Test {0} - good mutual X.509, TLSv1.3 no certs".format(test_no))2225 synchro.send(b'R')2226 connection = connect()2227 settings = 
HandshakeSettings()2228 settings.minVersion = (3,4)2229 settings.maxVersion = (3,4)2230 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, reqCert=True, settings=settings)2231 testConnServer(connection)2232 assert not connection.session.clientCertChain2233 connection.close()2234 test_no += 12235 print("Test {0} - good mutual X.509, TLSv1.3".format(test_no))2236 synchro.send(b'R')2237 connection = connect()2238 settings = HandshakeSettings()2239 settings.minVersion = (3,4)2240 settings.maxVersion = (3,4)2241 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, reqCert=True, settings=settings)2242 testConnServer(connection)2243 assert isinstance(connection.session.clientCertChain, X509CertChain)2244 connection.close()2245 test_no += 12246 print("Test {0} - good mutual X.509, PHA, TLSv1.3".format(test_no))2247 synchro.send(b'R')2248 connection = connect()2249 settings = HandshakeSettings()2250 settings.minVersion = (3, 4)2251 settings.maxVersion = (3, 4)2252 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,2253 settings=settings)2254 assert connection.session.clientCertChain is None2255 for result in connection.request_post_handshake_auth(settings):2256 assert result in (0, 1)2257 synchro.send(b'R')2258 testConnServer(connection)2259 assert connection.session.clientCertChain is not None2260 assert isinstance(connection.session.clientCertChain, X509CertChain)2261 connection.close()2262 test_no += 12263 print("Test {0} - good mutual X.509 Ed25519, PHA, TLSv1.3".format(test_no))2264 synchro.send(b'R')2265 connection = connect()2266 settings = HandshakeSettings()2267 settings.minVersion = (3, 4)2268 settings.maxVersion = (3, 4)2269 connection.handshakeServer(certChain=x509Ed25519Chain,2270 privateKey=x509Ed25519Key,2271 settings=settings)2272 assert connection.session.clientCertChain is None2273 for result in connection.request_post_handshake_auth(settings):2274 assert result in (0, 1)2275 synchro.send(b'R')2276 
testConnServer(connection)2277 assert connection.session.clientCertChain is not None2278 assert isinstance(connection.session.clientCertChain, X509CertChain)2279 assert connection.session.clientCertChain.getEndEntityPublicKey().key_type\2280 == "Ed25519"2281 connection.close()2282 test_no += 12283 print("Test {0} - good mutual X.509, PHA and KeyUpdate, TLSv1.3".format(test_no))2284 synchro.send(b'R')2285 connection = connect()2286 settings = HandshakeSettings()2287 settings.minVersion = (3, 4)2288 settings.maxVersion = (3, 4)2289 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,2290 settings=settings)2291 assert connection.session.clientCertChain is None2292 for result in connection.request_post_handshake_auth(settings):2293 assert result in (0, 1)2294 synchro.send(b'R')2295 assert connection.read(0, 0) == b''2296 assert connection.session.clientCertChain is not None2297 assert isinstance(connection.session.clientCertChain, X509CertChain)2298 testConnServer(connection)2299 connection.close()2300 test_no += 12301 print("Test {0} - mutual X.509, PHA, no client cert, TLSv1.3".format(test_no))2302 synchro.send(b'R')2303 connection = connect()2304 settings = HandshakeSettings()2305 settings.minVersion = (3, 4)2306 settings.maxVersion = (3, 4)2307 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,2308 settings=settings)2309 connection.client_cert_required = True2310 assert connection.session.clientCertChain is None2311 for result in connection.request_post_handshake_auth(settings):2312 assert result in (0, 1)2313 synchro.send(b'R')2314 try:2315 testConnServer(connection)2316 assert False2317 except TLSLocalAlert as e:2318 assert "Client did not provide a certificate in post-handshake" in \2319 str(e)2320 assert e.description == AlertDescription.certificate_required2321 assert connection.session.clientCertChain is None2322 connection.close()2323 test_no += 12324 print("Test {0} - good mutual X.509, TLSv1.1".format(test_no))2325 
synchro.send(b'R')2326 connection = connect()2327 settings = HandshakeSettings()2328 settings.minVersion = (3,2)2329 settings.maxVersion = (3,2)2330 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, reqCert=True, settings=settings)2331 testConnServer(connection)2332 assert(isinstance(connection.session.clientCertChain, X509CertChain))2333 connection.close()2334 test_no += 12335 print("Test {0} - good mutual X.509, SSLv3".format(test_no))2336 synchro.send(b'R')2337 connection = connect()2338 settings = HandshakeSettings()2339 settings.minVersion = (3,0)2340 settings.maxVersion = (3,0)2341 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, reqCert=True, settings=settings)2342 testConnServer(connection)2343 assert(isinstance(connection.session.clientCertChain, X509CertChain))2344 connection.close()2345 test_no += 12346 print("Test {0} - mutual X.509 faults".format(test_no))2347 for fault in Fault.clientCertFaults + Fault.genericFaults:2348 synchro.send(b'R')2349 connection = connect()2350 connection.fault = fault2351 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, reqCert=True)2352 connection.close()2353 test_no += 12354 print("Test {0} - good SRP, prepare to resume".format(test_no))2355 synchro.send(b'R')2356 sessionCache = SessionCache()2357 connection = connect()2358 connection.handshakeServer(verifierDB=verifierDB, sessionCache=sessionCache)2359 assert(connection.session.serverName == address[0]) 2360 testConnServer(connection)2361 connection.close()2362 test_no += 12363 print("Test {0} - resumption (plus SNI)".format(test_no))2364 synchro.send(b'R')2365 connection = connect()2366 connection.handshakeServer(verifierDB=verifierDB, sessionCache=sessionCache)2367 assert(connection.session.serverName == address[0])2368 testConnServer(connection) 2369 #Don't close! 
-- see next test2370 test_no += 12371 print("Test {0} - invalidated resumption (plus SNI)".format(test_no))2372 synchro.send(b'R')2373 try:2374 connection.read(min=1, max=1)2375 assert False #Client is going to close the socket without a close_notify2376 except TLSAbruptCloseError as e:2377 pass2378 synchro.send(b'R')2379 connection = connect()2380 try:2381 connection.handshakeServer(verifierDB=verifierDB, sessionCache=sessionCache)2382 assert False2383 except TLSLocalAlert as alert:2384 if alert.description != AlertDescription.bad_record_mac:2385 raise2386 connection.close()2387 test_no += 12388 print("Test {0} - HTTPS test X.509".format(test_no))2389 #Close the current listening socket2390 lsock.close()2391 #Create and run an HTTP Server using TLSSocketServerMixIn2392 class MyHTTPServer(TLSSocketServerMixIn,2393 HTTPServer):2394 def handshake(self, tlsConnection):2395 tlsConnection.handshakeServer(certChain=x509Chain, privateKey=x509Key)2396 return True2397 def server_bind(self):2398 self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)2399 HTTPServer.server_bind(self)2400 cd = os.getcwd()2401 os.chdir(dir)2402 address = address[0], address[1]+12403 httpd = MyHTTPServer(address, SimpleHTTPRequestHandler)2404 for x in range(6):2405 synchro.send(b'R')2406 httpd.handle_request()2407 httpd.server_close()2408 cd = os.chdir(cd)2409 #Re-connect the listening socket2410 lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)2411 lsock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)2412 address = address[0], address[1]+12413 lsock.bind(address)2414 lsock.listen(5)2415 implementations = []2416 if m2cryptoLoaded:2417 implementations.append("openssl")2418 if pycryptoLoaded:2419 implementations.append("pycrypto")2420 implementations.append("python")2421 test_no += 12422 print("Test {0} - different ciphers, TLSv1.0".format(test_no))2423 for implementation in ["python"] * len(implementations):2424 for cipher in ["aes128", "aes256", "rc4"]:2425 test_no += 
12426 print("Test {0}:".format(test_no), end=' ')2427 synchro.send(b'R')2428 connection = connect()2429 settings = HandshakeSettings()2430 settings.cipherNames = [cipher]2431 settings.cipherImplementations = [implementation, "python"]2432 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,2433 settings=settings)2434 print(connection.getCipherName(), connection.getCipherImplementation())2435 testConnServer(connection)2436 connection.close()2437 test_no += 12438 print("Test {0} - throughput test".format(test_no))2439 for implementation in implementations:2440 for cipher in ["aes128ccm", "aes128ccm_8", "aes256ccm", "aes256ccm_8",2441 "aes128gcm", "aes256gcm", "aes128", "aes256", "3des",2442 "rc4", "chacha20-poly1305_draft00",2443 "chacha20-poly1305"]:2444 # skip tests with implementations that don't support them2445 if cipher == "3des" and implementation not in ("openssl",2446 "pycrypto"):2447 continue2448 if cipher in ("aes128gcm", "aes256gcm") and \2449 implementation not in ("pycrypto",2450 "python", "openssl"):2451 continue2452 if cipher in ("aes128ccm", "aes128ccm_8",2453 "aes256ccm", "aes256ccm_8") and \2454 implementation not in ("python", "openssl"):2455 continue2456 if cipher in ("chacha20-poly1305_draft00", "chacha20-poly1305") \2457 and implementation not in ("python", ):2458 continue2459 test_no += 12460 print("Test {0}:".format(test_no), end=' ')2461 synchro.send(b'R')2462 connection = connect()2463 settings = HandshakeSettings()2464 settings.cipherNames = [cipher]2465 settings.cipherImplementations = [implementation, "python"]2466 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,2467 settings=settings)2468 print(connection.getCipherName(), connection.getCipherImplementation())2469 h = connection.read(min=50000, max=50000)2470 assert(h == b"hello"*10000)2471 connection.write(h)2472 connection.close()2473 test_no += 12474 print("Test {0} - Next-Protocol Server Negotiation".format(test_no))2475 synchro.send(b'R')2476 
connection = connect()2477 settings = HandshakeSettings()2478 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, 2479 settings=settings, nextProtos=[b"http/1.1"])2480 testConnServer(connection)2481 connection.close()2482 test_no += 12483 print("Test {0} - Next-Protocol Server Negotiation".format(test_no))2484 synchro.send(b'R')2485 connection = connect()2486 settings = HandshakeSettings()2487 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, 2488 settings=settings, nextProtos=[b"spdy/2", b"http/1.1"])2489 testConnServer(connection)2490 connection.close()2491 test_no += 12492 print("Test {0} - Next-Protocol Server Negotiation".format(test_no))2493 synchro.send(b'R')2494 connection = connect()2495 settings = HandshakeSettings()2496 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, 2497 settings=settings, nextProtos=[b"http/1.1", b"spdy/2"])2498 testConnServer(connection)2499 connection.close()2500 test_no += 12501 print("Test {0} - Next-Protocol Server Negotiation".format(test_no))2502 synchro.send(b'R')2503 connection = connect()2504 settings = HandshakeSettings()2505 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, 2506 settings=settings, nextProtos=[b"spdy/2", b"http/1.1"])2507 testConnServer(connection)2508 connection.close()2509 test_no += 12510 print("Test {0} - Next-Protocol Server Negotiation".format(test_no))2511 synchro.send(b'R')2512 connection = connect()2513 settings = HandshakeSettings()2514 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, 2515 settings=settings, nextProtos=[b"http/1.1", b"spdy/2", b"spdy/3"])2516 testConnServer(connection)2517 connection.close()2518 test_no += 12519 print("Test {0} - Next-Protocol Server Negotiation".format(test_no))2520 synchro.send(b'R')2521 connection = connect()2522 settings = HandshakeSettings()2523 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, 2524 settings=settings, nextProtos=[b"spdy/3", 
b"spdy/2"])2525 testConnServer(connection)2526 connection.close()2527 test_no += 12528 print("Test {0} - Next-Protocol Server Negotiation".format(test_no))2529 synchro.send(b'R')2530 connection = connect()2531 settings = HandshakeSettings()2532 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, 2533 settings=settings, nextProtos=[])2534 testConnServer(connection)2535 connection.close()2536 test_no += 12537 print("Test {0} - FALLBACK_SCSV".format(test_no))2538 synchro.send(b'R')2539 connection = connect()2540 settings = HandshakeSettings()2541 settings.maxVersion = (3, 3)2542 # TODO fix FALLBACK with TLS1.32543 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,2544 settings=settings)2545 testConnServer(connection)2546 connection.close()2547 test_no += 12548 print("Test {0} - FALLBACK_SCSV".format(test_no))2549 synchro.send(b'R')2550 connection = connect()2551 settings = HandshakeSettings()2552 # TODO fix FALLBACK with TLS1.32553 settings.maxVersion = (3, 3)2554 try:2555 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,2556 settings=settings)2557 assert False2558 except TLSLocalAlert as alert:2559 if alert.description != AlertDescription.inappropriate_fallback:2560 raise2561 connection.close()2562 test_no += 12563 print("Test {0} - no EtM server side".format(test_no))2564 synchro.send(b'R')2565 connection = connect()2566 settings = HandshakeSettings()2567 settings.useEncryptThenMAC = False2568 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,2569 settings=settings)2570 testConnServer(connection)2571 connection.close()2572 test_no += 12573 print("Test {0} - no EtM client side".format(test_no))2574 synchro.send(b'R')2575 connection = connect()2576 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key)2577 testConnServer(connection)2578 connection.close()2579 test_no += 12580 print("Test {0} - resumption with EtM".format(test_no))2581 synchro.send(b'R')2582 sessionCache = 
SessionCache()2583 connection = connect()2584 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,2585 sessionCache=sessionCache)2586 testConnServer(connection)2587 connection.close()2588 # resume2589 synchro.send(b'R')2590 connection = connect()2591 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,2592 sessionCache=sessionCache)2593 testConnServer(connection)2594 connection.close()2595 test_no += 12596 print("Test {0} - resumption with no EtM in 2nd handshake".format(test_no))2597 synchro.send(b'R')2598 sessionCache = SessionCache()2599 connection = connect()2600 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,2601 sessionCache=sessionCache)2602 testConnServer(connection)2603 connection.close()2604 # resume2605 synchro.send(b'R')2606 connection = connect()2607 try:2608 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,2609 sessionCache=sessionCache)2610 assert False2611 except TLSLocalAlert as e:2612 assert(str(e) == "illegal_parameter")2613 else:2614 raise AssertionError("no exception raised")2615 connection.close()2616 test_no += 12617 print("Test {0} - resumption in TLSv1.3".format(test_no))2618 synchro.send(b'R')2619 connection = connect()2620 settings = HandshakeSettings()2621 settings.minVersion = (3,4)2622 settings.ticketKeys = [getRandomBytes(32)]2623 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,2624 settings=settings)2625 testConnServer(connection)2626 connection.close()2627 # resume2628 synchro.send(b'R')2629 connection = connect()2630 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,2631 settings=settings)2632 testConnServer(connection)2633 connection.close()2634 test_no += 12635 print("Test {0} - resumption in TLSv1.3 with mutual X.509".format(test_no))2636 synchro.send(b'R')2637 connection = connect()2638 settings = HandshakeSettings()2639 settings.minVersion = (3,4)2640 settings.ticketKeys = [getRandomBytes(32)]2641 
connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,2642 reqCert=True, settings=settings)2643 testConnServer(connection)2644 connection.close()2645 # resume2646 synchro.send(b'R')2647 connection = connect()2648 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,2649 reqCert=True, settings=settings)2650 testConnServer(connection)2651 assert connection.session.clientCertChain2652 connection.close()2653 test_no += 12654 print("Test {0} - resumption in TLSv1.3 with AES-CCM tickets".format(test_no))2655 synchro.send(b'R')2656 connection = connect()2657 settings = HandshakeSettings()2658 settings.minVersion = (3, 4)2659 settings.ticketKeys = [getRandomBytes(32)]2660 settings.ticketCipher = "aes128ccm"2661 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,2662 settings=settings)2663 testConnServer(connection)2664 connection.close()2665 # resume2666 synchro.send(b'R')2667 connection = connect()2668 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,2669 settings=settings)2670 testConnServer(connection)2671 connection.close()2672 test_no += 12673 print("Test {0} - Heartbeat extension response callback in TLSv1.2".format(test_no))2674 heartbeat_payload = os.urandom(50)2675 def heartbeat_response_check(message):2676 global received_payload2677 received_payload = message.payload2678 synchro.send(b'R')2679 connection = connect()2680 settings = HandshakeSettings()2681 settings.maxVersion = (3, 3)2682 settings.heartbeat_response_callback = heartbeat_response_check2683 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,2684 settings=settings)2685 connection.send_heartbeat_request(heartbeat_payload, 16)2686 testConnServer(connection)2687 testConnServer(connection)2688 connection.close()2689 assert heartbeat_payload == received_payload2690 test_no += 12691 print("Test {0} - Heartbeat extension in TLSv1.3".format(test_no))2692 heartbeat_payload = os.urandom(50)2693 def 
heartbeat_response_check(message):2694 global received_payload2695 received_payload = message.payload2696 synchro.send(b'R')2697 connection = connect()2698 settings = HandshakeSettings()2699 settings.maxVersion = (3, 4)2700 settings.heartbeat_response_callback = heartbeat_response_check2701 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,2702 settings=settings)2703 connection.send_heartbeat_request(heartbeat_payload, 16)2704 testConnServer(connection)2705 testConnServer(connection)2706 connection.close()2707 assert heartbeat_payload == received_payload2708 test_no += 12709 print("Test {0} - KeyUpdate from client in TLSv1.3".format(test_no))2710 synchro.send(b'R')2711 connection = connect()2712 settings = HandshakeSettings()2713 settings.maxVersion = (3, 4)2714 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,2715 settings=settings)2716 synchro.send(b'K')2717 synchro.send(b'K')2718 testConnServer(connection)2719 connection.close()2720 test_no += 12721 print("Test {0} - mutual KeyUpdates in TLSv1.3".format(test_no))2722 synchro.send(b'R')2723 connection = connect()2724 settings = HandshakeSettings()2725 settings.maxVersion = (3, 4)2726 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,2727 settings=settings)2728 for i in connection.send_keyupdate_request(KeyUpdateMessageType.update_requested):2729 assert i in (0, 1)2730 testConnServer(connection)2731 assert synchro.recv(1) == b'R'2732 connection.close()2733 test_no += 12734 print("Test {0} - multiple mutual KeyUpdates in TLSv1.3".format(test_no))2735 synchro.send(b'R')2736 connection = connect()2737 settings = HandshakeSettings()2738 settings.maxVersion = (3, 4)2739 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key,2740 settings=settings)2741 for i in connection.send_keyupdate_request(KeyUpdateMessageType.update_requested):2742 assert i in (0, 1)2743 for i in connection.send_keyupdate_request(KeyUpdateMessageType.update_requested):2744 assert i in 
(0, 1)2745 testConnServer(connection)2746 assert synchro.recv(1) == b'R'2747 connection.close()2748 test_no += 12749 print("Tests {0}-{1} - XMLRPXC server".format(test_no, test_no + 2))2750 address = address[0], address[1]+12751 class Server(TLSXMLRPCServer):2752 def handshake(self, tlsConnection):2753 try:2754 tlsConnection.handshakeServer(certChain=x509Chain,2755 privateKey=x509Key,2756 sessionCache=sessionCache)2757 tlsConnection.ignoreAbruptClose = True2758 return True2759 except TLSError as error:2760 print("Handshake failure:", str(error))2761 return False2762 class MyFuncs:2763 def pow(self, x, y): return pow(x, y)2764 def add(self, x, y): return x + y2765 server = Server(address)2766 server.register_instance(MyFuncs())2767 synchro.send(b'R')2768 #sa = server.socket.getsockname()2769 #print "Serving HTTPS on", sa[0], "port", sa[1]2770 for i in range(6):2771 synchro.send(b'R')2772 server.handle_request()2773 synchro.close()2774 synchroSocket.close()2775 test_no += 22776 print("Test succeeded")2777if __name__ == '__main__':2778 if len(sys.argv) < 2:2779 printUsage("Missing command")2780 elif sys.argv[1] == "client"[:len(sys.argv[1])]:2781 clientTestCmd(sys.argv[2:])2782 elif sys.argv[1] == "server"[:len(sys.argv[1])]:2783 serverTestCmd(sys.argv[2:])2784 else:...
test_binary_converter.py
Source:test_binary_converter.py
#!/usr/bin/env python3
"""Quick smoke test for the ``./binary_converter`` executable.

Each test case feeds one line of text to the program on stdin and compares
the process return code (and, where one is expected, the stripped stdout)
against a table of expected results.
"""
import subprocess
import sys


class QuickTest:
    """Run canned inputs through the converter binary and report the result.

    ``_input[i]`` is the text written to the program's stdin for test ``i``.
    ``_output[i]`` is a tuple ``(expected_returncode,)`` or
    ``(expected_returncode, expected_stdout)``; a one-element tuple means
    only the return code is checked.
    """

    def __init__(self):
        self._app = "./binary_converter"
        # NOTE(review): inputs look like "<bit width> <decimal value>" pairs
        # -- confirm against the binary_converter specification.
        self._input = ["0 0", "33 0", "8 -31", "4 5", "8 37"]
        self._output = [(1, ), (1, ), (2, ), (0, "0101"), (0, "00100101")]

    def test(self, test_no):
        """Run test case ``test_no`` and print a verdict.

        Success messages go to stdout; every ``ERROR`` message goes to
        stderr.  The method always returns ``None`` and never raises on a
        mismatch -- it only reports.

        Args:
            test_no: Index into the ``_input`` / ``_output`` tables.

        Raises:
            IndexError: If ``test_no`` is outside the test tables.
        """
        expected = self._output[test_no]
        returncode, out = self.run(test_no)
        out = out.strip() if out else None

        if expected[0] == returncode:
            print("RETURN CODE CORRECT")
        else:
            print(
                "ERROR: EXPECTED return {}, ACTUAL return {}.".format(
                    expected[0], returncode), file=sys.stderr)

        if len(expected) == 1:
            # Error cases (bad input size or negative decimal value) carry
            # no expected output, so there is nothing further to compare.
            return

        if not out:
            print("ERROR: No output from student app.", file=sys.stderr)
            return

        if expected[1] == out:
            print("OUTPUT CORRECT")
        else:
            # file= belongs to print(), not to str.format(); otherwise the
            # error is silently written to stdout.
            print("ERROR: EXPECTED {}, ACTUAL {}".format(expected[1], out),
                  file=sys.stderr)

    def run(self, test_no):
        """Execute the application with the input for ``test_no``.

        Args:
            test_no: Index into the ``_input`` table.

        Returns:
            A ``(returncode, stdout_text)`` tuple.  ``stdout_text`` is
            ``None`` when the program wrote nothing to stdout, and a
            diagnostic string when its output is not valid UTF-8.
        """
        with subprocess.Popen(self._app, stderr=subprocess.PIPE,
                              stdin=subprocess.PIPE,
                              stdout=subprocess.PIPE) as proc:
            # stderr is captured only to keep it off the console.
            out, _err = proc.communicate(
                input=self._input[test_no].encode("utf-8"))

        try:
            return proc.returncode, out.decode("utf-8") if out else None
        except UnicodeDecodeError as e:
            return proc.returncode, "Serious execution badness:" + format(e)


if __name__ == "__main__":
    test = QuickTest()
    # NOTE(review): the rest of this entry point is truncated in the listing.
    ...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. From setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios, LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!