Best Python code snippet using Testify_python
testlib.py
Source:testlib.py
1import collections2import os.path3import random4import re5import shlex6import subprocess7import sys8import tempfile9import time10import traceback11import pexpect12# Note that gdb comes with its own testsuite. I was unable to figure out how to13# run that testsuite against the spike simulator.14def find_file(path):15 for directory in (os.getcwd(), os.path.dirname(__file__)):16 fullpath = os.path.join(directory, path)17 if os.path.exists(fullpath):18 return fullpath19 return None20def compile(args, xlen=32): # pylint: disable=redefined-builtin21 cc = os.path.expandvars("$RISCV/bin/riscv64-unknown-elf-gcc")22 cmd = [cc, "-g"]23 if xlen == 32:24 cmd.append("-march=rv32imac")25 cmd.append("-mabi=ilp32")26 else:27 cmd.append("-march=rv64imac")28 cmd.append("-mabi=lp64")29 for arg in args:30 found = find_file(arg)31 if found:32 cmd.append(found)33 else:34 cmd.append(arg)35 process = subprocess.Popen(cmd, stdout=subprocess.PIPE,36 stderr=subprocess.PIPE)37 stdout, stderr = process.communicate()38 if process.returncode:39 print40 header("Compile failed")41 print "+", " ".join(cmd)42 print stdout,43 print stderr,44 header("")45 raise Exception("Compile failed!")46def unused_port():47 # http://stackoverflow.com/questions/2838244/get-open-tcp-port-in-python/2838309#283830948 import socket49 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)50 s.bind(("", 0))51 port = s.getsockname()[1]52 s.close()53 return port54class Spike(object):55 logname = "spike-%d.log" % os.getpid()56 def __init__(self, target, halted=False, timeout=None, with_jtag_gdb=True):57 """Launch spike. Return tuple of its process and the port it's running58 on."""59 if target.sim_cmd:60 cmd = shlex.split(target.sim_cmd)61 else:62 spike = os.path.expandvars("$RISCV/bin/spike")63 cmd = [spike]64 if target.xlen == 32:65 cmd += ["--isa", "RV32G"]66 else:67 cmd += ["--isa", "RV64G"]68 cmd += ["-m0x%x:0x%x" % (target.ram, target.ram_size)]69 if timeout:70 cmd = ["timeout", str(timeout)] + cmd71 if halted:72 cmd.append('-H')73 if with_jtag_gdb:74 cmd += ['--rbb-port', '0']75 os.environ['REMOTE_BITBANG_HOST'] = 'localhost'76 self.infinite_loop = target.compile(77 "programs/checksum.c", "programs/tiny-malloc.c",78 "programs/infinite_loop.S", "-DDEFINE_MALLOC", "-DDEFINE_FREE")79 cmd.append(self.infinite_loop)80 logfile = open(self.logname, "w")81 logfile.write("+ %s\n" % " ".join(cmd))82 logfile.flush()83 self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,84 stdout=logfile, stderr=logfile)85 if with_jtag_gdb:86 self.port = None87 for _ in range(30):88 m = re.search(r"Listening for remote bitbang connection on "89 r"port (\d+).", open(self.logname).read())90 if m:91 self.port = int(m.group(1))92 os.environ['REMOTE_BITBANG_PORT'] = m.group(1)93 break94 time.sleep(0.11)95 assert self.port, "Didn't get spike message about bitbang " \96 "connection"97 def __del__(self):98 try:99 self.process.kill()100 self.process.wait()101 except OSError:102 pass103 def wait(self, *args, **kwargs):104 return self.process.wait(*args, **kwargs)105class VcsSim(object):106 def __init__(self, sim_cmd=None, debug=False):107 if sim_cmd:108 cmd = shlex.split(sim_cmd)109 else:110 cmd = ["simv"]111 cmd += ["+jtag_vpi_enable"]112 if debug:113 cmd[0] = cmd[0] + "-debug"114 cmd += ["+vcdplusfile=output/gdbserver.vpd"]115 logfile = open("simv.log", "w")116 logfile.write("+ %s\n" % " ".join(cmd))117 logfile.flush()118 listenfile = open("simv.log", "r")119 listenfile.seek(0, 2)120 self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,121 stdout=logfile, stderr=logfile)122 done = False123 while not done:124 # Fail if VCS exits early125 exit_code = self.process.poll()126 if exit_code is not None:127 raise RuntimeError('VCS simulator exited early with status %d'128 % exit_code)129 line = listenfile.readline()130 if not line:131 time.sleep(1)132 match = re.match(r"^Listening on port (\d+)$", line)133 if match:134 done = True135 self.port = int(match.group(1))136 os.environ['JTAG_VPI_PORT'] = str(self.port)137 def __del__(self):138 try:139 self.process.kill()140 self.process.wait()141 except OSError:142 pass143class Openocd(object):144 logfile = tempfile.NamedTemporaryFile(prefix='openocd', suffix='.log')145 logname = logfile.name146 def __init__(self, server_cmd=None, config=None, debug=False):147 if server_cmd:148 cmd = shlex.split(server_cmd)149 else:150 openocd = os.path.expandvars("$RISCV/bin/openocd")151 cmd = [openocd]152 if debug:153 cmd.append("-d")154 # This command needs to come before any config scripts on the command155 # line, since they are executed in order.156 cmd += [157 # Tell OpenOCD to bind gdb to an unused, ephemeral port.158 "--command",159 "gdb_port 0",160 # Disable tcl and telnet servers, since they are unused and because161 # the port numbers will conflict if multiple OpenOCD processes are162 # running on the same server.163 "--command",164 "tcl_port disabled",165 "--command",166 "telnet_port disabled",167 ]168 if config:169 f = find_file(config)170 if f is None:171 print "Unable to read file " + config172 exit(1)173 cmd += ["-f", f]174 if debug:175 cmd.append("-d")176 logfile = open(Openocd.logname, "w")177 logfile.write("+ %s\n" % " ".join(cmd))178 logfile.flush()179 self.process = subprocess.Popen(cmd, stdin=subprocess.PIPE,180 stdout=logfile, stderr=logfile)181 # Wait for OpenOCD to have made it through riscv_examine(). When using182 # OpenOCD to communicate with a simulator this may take a long time,183 # and gdb will time out when trying to connect if we attempt too early.184 start = time.time()185 messaged = False186 while True:187 log = open(Openocd.logname).read()188 if "Ready for Remote Connections" in log:189 break190 if not self.process.poll() is None:191 header("OpenOCD log")192 sys.stdout.write(log)193 raise Exception(194 "OpenOCD exited before completing riscv_examine()")195 if not messaged and time.time() - start > 1:196 messaged = True197 print "Waiting for OpenOCD to examine RISCV core..."198 if time.time() - start > 60:199 raise Exception("ERROR: Timed out waiting for OpenOCD to "200 "examine RISCV core")201 try:202 self.port = self._get_gdb_server_port()203 except:204 header("OpenOCD log")205 sys.stdout.write(log)206 raise207 def _get_gdb_server_port(self):208 """Get port that OpenOCD's gdb server is listening on."""209 MAX_ATTEMPTS = 50210 PORT_REGEX = re.compile(r'(?P<port>\d+) \(LISTEN\)')211 for _ in range(MAX_ATTEMPTS):212 with open(os.devnull, 'w') as devnull:213 try:214 output = subprocess.check_output([215 'lsof',216 '-a', # Take the AND of the following selectors217 '-p{}'.format(self.process.pid), # Filter on PID218 '-iTCP', # Filter only TCP sockets219 ], stderr=devnull)220 except subprocess.CalledProcessError:221 output = ""222 matches = list(PORT_REGEX.finditer(output))223 matches = [m for m in matches224 if m.group('port') not in ('6666', '4444')]225 if len(matches) > 1:226 print output227 raise Exception(228 "OpenOCD listening on multiple ports. Cannot uniquely "229 "identify gdb server port.")230 elif matches:231 [match] = matches232 return int(match.group('port'))233 time.sleep(1)234 raise Exception("Timed out waiting for gdb server to obtain port.")235 def __del__(self):236 try:237 self.process.kill()238 self.process.wait()239 except (OSError, AttributeError):240 pass241class OpenocdCli(object):242 def __init__(self, port=4444):243 self.child = pexpect.spawn(244 "sh -c 'telnet localhost %d | tee openocd-cli.log'" % port)245 self.child.expect("> ")246 def command(self, cmd):247 self.child.sendline(cmd)248 self.child.expect(cmd)249 self.child.expect("\n")250 self.child.expect("> ")251 return self.child.before.strip("\t\r\n \0")252 def reg(self, reg=''):253 output = self.command("reg %s" % reg)254 matches = re.findall(r"(\w+) \(/\d+\): (0x[0-9A-F]+)", output)255 values = {r: int(v, 0) for r, v in matches}256 if reg:257 return values[reg]258 return values259 def load_image(self, image):260 output = self.command("load_image %s" % image)261 if 'invalid ELF file, only 32bits files are supported' in output:262 raise TestNotApplicable(output)263class CannotAccess(Exception):264 def __init__(self, address):265 Exception.__init__(self)266 self.address = address267Thread = collections.namedtuple('Thread', ('id', 'target_id', 'name',268 'frame'))269class Gdb(object):270 logfile = tempfile.NamedTemporaryFile(prefix="gdb", suffix=".log")271 logname = logfile.name272 def __init__(self,273 cmd=os.path.expandvars("$RISCV/bin/riscv64-unknown-elf-gdb")):274 self.child = pexpect.spawn(cmd)275 self.child.logfile = open(self.logname, "w")276 self.child.logfile.write("+ %s\n" % cmd)277 self.wait()278 self.command("set confirm off")279 self.command("set width 0")280 self.command("set height 0")281 # Force consistency.282 self.command("set print entry-values no")283 def wait(self):284 """Wait for prompt."""285 self.child.expect(r"\(gdb\)")286 def command(self, command, timeout=6000):287 self.child.sendline(command)288 self.child.expect("\n", timeout=timeout)289 self.child.expect(r"\(gdb\)", timeout=timeout)290 return self.child.before.strip()291 def c(self, wait=True, timeout=-1, async=False):292 if async:293 async = "&"294 else:295 async = ""296 if wait:297 output = self.command("c%s" % async, timeout=timeout)298 assert "Continuing" in output299 return output300 else:301 self.child.sendline("c%s" % async)302 self.child.expect("Continuing")303 def interrupt(self):304 self.child.send("\003")305 self.child.expect(r"\(gdb\)", timeout=6000)306 return self.child.before.strip()307 def x(self, address, size='w'):308 output = self.command("x/%s %s" % (size, address))309 value = int(output.split(':')[1].strip(), 0)310 return value311 def p_raw(self, obj):312 output = self.command("p %s" % obj)313 m = re.search("Cannot access memory at address (0x[0-9a-f]+)", output)314 if m:315 raise CannotAccess(int(m.group(1), 0))316 return output.split('=')[-1].strip()317 def p(self, obj):318 output = self.command("p/x %s" % obj)319 m = re.search("Cannot access memory at address (0x[0-9a-f]+)", output)320 if m:321 raise CannotAccess(int(m.group(1), 0))322 value = int(output.split('=')[-1].strip(), 0)323 return value324 def p_string(self, obj):325 output = self.command("p %s" % obj)326 value = shlex.split(output.split('=')[-1].strip())[1]327 return value328 def stepi(self):329 output = self.command("stepi")330 return output331 def load(self):332 output = self.command("load", timeout=6000)333 assert "failed" not in output334 assert "Transfer rate" in output335 def b(self, location):336 output = self.command("b %s" % location)337 assert "not defined" not in output338 assert "Breakpoint" in output339 return output340 def hbreak(self, location):341 output = self.command("hbreak %s" % location)342 assert "not defined" not in output343 assert "Hardware assisted breakpoint" in output344 return output345 def threads(self):346 output = self.command("info threads")347 threads = []348 for line in output.splitlines():349 m = re.match(350 r"[\s\*]*(\d+)\s*Thread (\d+)\s*\(Name: ([^\)]+)\s*(.*)",351 line)352 if m:353 threads.append(Thread(*m.groups()))354 if not threads:355 threads.append(Thread('1', '1', 'Default', '???'))356 return threads357 def thread(self, thread):358 return self.command("thread %s" % thread.id)359def run_all_tests(module, target, parsed):360 if not os.path.exists(parsed.logs):361 os.makedirs(parsed.logs)362 overall_start = time.time()363 global gdb_cmd # pylint: disable=global-statement364 gdb_cmd = parsed.gdb365 todo = []366 if parsed.misaval:367 target.misa = int(parsed.misaval, 16)368 print "Using $misa from command line: 0x%x" % target.misa369 elif target.misa:370 print "Using $misa from target definition: 0x%x" % target.misa371 else:372 todo.append(("ExamineTarget", ExamineTarget))373 for name in dir(module):374 definition = getattr(module, name)375 if type(definition) == type and hasattr(definition, 'test') and \376 (not parsed.test or any(test in name for test in parsed.test)):377 todo.append((name, definition))378 results, count = run_tests(parsed, target, todo)379 header("ran %d tests in %.0fs" % (count, time.time() - overall_start),380 dash=':')381 return print_results(results)382good_results = set(('pass', 'not_applicable'))383def run_tests(parsed, target, todo):384 results = {}385 count = 0386 for name, definition in todo:387 instance = definition(target)388 log_name = os.path.join(parsed.logs, "%s-%s-%s.log" %389 (time.strftime("%Y%m%d-%H%M%S"), type(target).__name__, name))390 log_fd = open(log_name, 'w')391 print "Running", name, "...",392 sys.stdout.flush()393 log_fd.write("Test: %s\n" % name)394 log_fd.write("Target: %s\n" % type(target).__name__)395 start = time.time()396 real_stdout = sys.stdout397 sys.stdout = log_fd398 try:399 result = instance.run()400 log_fd.write("Result: %s\n" % result)401 finally:402 sys.stdout = real_stdout403 log_fd.write("Time elapsed: %.2fs\n" % (time.time() - start))404 print "%s in %.2fs" % (result, time.time() - start)405 sys.stdout.flush()406 results.setdefault(result, []).append(name)407 count += 1408 if result not in good_results and parsed.fail_fast:409 break410 return results, count411def print_results(results):412 result = 0413 for key, value in results.iteritems():414 print "%d tests returned %s" % (len(value), key)415 if key not in good_results:416 result = 1417 for test in value:418 print " ", test419 return result420def add_test_run_options(parser):421 parser.add_argument("--logs", default="logs",422 help="Store logs in the specified directory.")423 parser.add_argument("--fail-fast", "-f", action="store_true",424 help="Exit as soon as any test fails.")425 parser.add_argument("test", nargs='*',426 help="Run only tests that are named here.")427 parser.add_argument("--gdb",428 help="The command to use to start gdb.")429 parser.add_argument("--misaval",430 help="Don't run ExamineTarget, just assume the misa value which is "431 "specified.")432def header(title, dash='-', length=78):433 if title:434 dashes = dash * (length - 4 - len(title))435 before = dashes[:len(dashes)/2]436 after = dashes[len(dashes)/2:]437 print "%s[ %s ]%s" % (before, title, after)438 else:439 print dash * length440def print_log(path):441 header(path)442 lines = open(path, "r").readlines()443 for l in lines:444 sys.stdout.write(l)445 print446class BaseTest(object):447 compiled = {}448 def __init__(self, target):449 self.target = target450 self.server = None451 self.target_process = None452 self.binary = None453 self.start = 0454 self.logs = []455 def early_applicable(self):456 """Return a false value if the test has determined it cannot run457 without ever needing to talk to the target or server."""458 # pylint: disable=no-self-use459 return True460 def setup(self):461 pass462 def compile(self):463 compile_args = getattr(self, 'compile_args', None)464 if compile_args:465 if compile_args not in BaseTest.compiled:466 # pylint: disable=star-args467 BaseTest.compiled[compile_args] = \468 self.target.compile(*compile_args)469 self.binary = BaseTest.compiled.get(compile_args)470 def classSetup(self):471 self.compile()472 self.target_process = self.target.create()473 if self.target_process:474 self.logs.append(self.target_process.logname)475 try:476 self.server = self.target.server()477 self.logs.append(self.server.logname)478 except Exception:479 for log in self.logs:480 print_log(log)481 raise482 def classTeardown(self):483 del self.server484 del self.target_process485 def run(self):486 """487 If compile_args is set, compile a program and set self.binary.488 Call setup().489 Then call test() and return the result, displaying relevant information490 if an exception is raised.491 """492 sys.stdout.flush()493 if not self.early_applicable():494 return "not_applicable"495 self.start = time.time()496 try:497 self.classSetup()498 self.setup()499 result = self.test() # pylint: disable=no-member500 except TestNotApplicable:501 result = "not_applicable"502 except Exception as e: # pylint: disable=broad-except503 if isinstance(e, TestFailed):504 result = "fail"505 else:506 result = "exception"507 if isinstance(e, TestFailed):508 header("Message")509 print e.message510 header("Traceback")511 traceback.print_exc(file=sys.stdout)512 return result513 finally:514 for log in self.logs:515 print_log(log)516 header("End of logs")517 self.classTeardown()518 if not result:519 result = 'pass'520 return result521gdb_cmd = None522class GdbTest(BaseTest):523 def __init__(self, target):524 BaseTest.__init__(self, target)525 self.gdb = None526 def classSetup(self):527 BaseTest.classSetup(self)528 if gdb_cmd:529 self.gdb = Gdb(gdb_cmd)530 else:531 self.gdb = Gdb()532 self.logs.append(self.gdb.logname)533 if self.binary:534 self.gdb.command("file %s" % self.binary)535 if self.target:536 self.gdb.command("set arch riscv:rv%d" % self.target.xlen)537 self.gdb.command("set remotetimeout %d" % self.target.timeout_sec)538 if self.server.port:539 self.gdb.command(540 "target extended-remote localhost:%d" % self.server.port)541 # Select a random thread.542 # TODO: Allow a command line option to force a specific thread.543 thread = random.choice(self.gdb.threads())544 self.gdb.thread(thread)545 for cmd in self.target.gdb_setup:546 self.gdb.command(cmd)547 # FIXME: OpenOCD doesn't handle PRIV now548 #self.gdb.p("$priv=3")549 def classTeardown(self):550 del self.gdb551 BaseTest.classTeardown(self)552class ExamineTarget(GdbTest):553 def test(self):554 self.target.misa = self.gdb.p("$misa")555 txt = "RV"556 if (self.target.misa >> 30) == 1:557 txt += "32"558 elif (self.target.misa >> 62) == 2:559 txt += "64"560 elif (self.target.misa >> 126) == 3:561 txt += "128"562 else:563 raise TestFailed("Couldn't determine XLEN from $misa (0x%x)" %564 self.target.misa)565 for i in range(26):566 if self.target.misa & (1<<i):567 txt += chr(i + ord('A'))568 print txt,569class TestFailed(Exception):570 def __init__(self, message):571 Exception.__init__(self)572 self.message = message573class TestNotApplicable(Exception):574 def __init__(self, message):575 Exception.__init__(self)576 self.message = message577def assertEqual(a, b):578 if a != b:579 raise TestFailed("%r != %r" % (a, b))580def assertNotEqual(a, b):581 if a == b:582 raise TestFailed("%r == %r" % (a, b))583def assertIn(a, b):584 if a not in b:585 raise TestFailed("%r not in %r" % (a, b))586def assertNotIn(a, b):587 if a in b:588 raise TestFailed("%r in %r" % (a, b))589def assertGreater(a, b):590 if not a > b:591 raise TestFailed("%r not greater than %r" % (a, b))592def assertLess(a, b):593 if not a < b:594 raise TestFailed("%r not less than %r" % (a, b))595def assertTrue(a):596 if not a:597 raise TestFailed("%r is not True" % a)598def assertRegexpMatches(text, regexp):599 if not re.search(regexp, text):...
suite.py
Source:suite.py
1"""2Test Suites3"""4from __future__ import generators5import sys6import unittest7from nose_helper.case import Test8from nose_helper.config import Config9from nose_helper.util import isclass, resolve_name, try_run10PYTHON_VERSION_MAJOR = sys.version_info[0]11class LazySuite(unittest.TestSuite):12 """A suite that may use a generator as its list of tests13 """14 def __init__(self, tests=()):15 self._set_tests(tests)16 17 def __iter__(self):18 return iter(self._tests)19 def __hash__(self):20 return object.__hash__(self)21 def addTest(self, test):22 self._precache.append(test)23 def __nonzero__(self):24 if self._precache:25 return True26 if self.test_generator is None:27 return False28 try:29 test = self.test_generator.next()30 if test is not None:31 self._precache.append(test)32 return True33 except StopIteration:34 pass35 return False36 def _get_tests(self):37 if self.test_generator is not None:38 for i in self.test_generator:39 yield i40 for test in self._precache:41 yield test42 def _set_tests(self, tests):43 self._precache = []44 is_suite = isinstance(tests, unittest.TestSuite)45 if hasattr(tests, '__call__') and not is_suite:46 self.test_generator = tests()47 self.test_generator_counter = list(tests())48 elif is_suite:49 self.addTests([tests])50 self.test_generator = None51 self.test_generator_counter = None52 else:53 self.addTests(tests)54 self.test_generator = None55 self.test_generator_counter = None56 def countTestCases(self):57 counter = 058 generator = self.test_generator_counter59 if generator is not None:60 for test in generator:61 counter +=162 for test in self._precache:63 counter += test.countTestCases()64 return counter65 _tests = property(_get_tests, _set_tests, None,66 "Access the tests in this suite.")67class ContextSuite(LazySuite):68 """A suite with context.69 """70 was_setup = False71 was_torndown = False72 classSetup = ('setup_class', 'setup_all', 'setupClass', 'setupAll',73 'setUpClass', 'setUpAll')74 classTeardown = ('teardown_class', 'teardown_all', 'teardownClass',75 'teardownAll', 'tearDownClass', 'tearDownAll')76 moduleSetup = ('setup_module', 'setupModule', 'setUpModule', 'setup',77 'setUp')78 moduleTeardown = ('teardown_module', 'teardownModule', 'tearDownModule',79 'teardown', 'tearDown')80 packageSetup = ('setup_package', 'setupPackage', 'setUpPackage')81 packageTeardown = ('teardown_package', 'teardownPackage',82 'tearDownPackage')83 def __init__(self, tests=(), context=None, factory=None,84 config=None):85 86 self.context = context87 self.factory = factory88 if config is None:89 config = Config()90 self.config = config91 self.has_run = False92 self.error_context = None93 LazySuite.__init__(self, tests)94 def __hash__(self):95 return object.__hash__(self)96 def __call__(self, *arg, **kw):97 return self.run(*arg, **kw)98 def _exc_info(self):99 return sys.exc_info()100 def addTests(self, tests, context=None):101 if context:102 self.context = context103 if PYTHON_VERSION_MAJOR < 3 and isinstance(tests, basestring):104 raise TypeError("tests must be an iterable of tests, not a string")105 else:106 if isinstance(tests, str):107 raise TypeError("tests must be an iterable of tests, not a string")108 for test in tests:109 self.addTest(test)110 def run(self, result):111 """Run tests in suite inside of suite fixtures.112 """113 result, orig = result, result114 try:115 self.setUp()116 except KeyboardInterrupt:117 raise118 except:119 self.error_context = 'setup'120 result.addError(self, self._exc_info())121 return122 try:123 for test in self._tests:124 if result.shouldStop:125 break126 test(orig)127 finally:128 self.has_run = True129 try:130 self.tearDown()131 except KeyboardInterrupt:132 raise133 except:134 self.error_context = 'teardown'135 result.addError(self, self._exc_info())136 def setUp(self):137 if not self:138 return139 if self.was_setup:140 return141 context = self.context142 if context is None:143 return144 factory = self.factory145 if factory:146 ancestors = factory.context.get(self, [])[:]147 while ancestors:148 ancestor = ancestors.pop()149 if ancestor in factory.was_setup:150 continue151 self.setupContext(ancestor)152 if not context in factory.was_setup:153 self.setupContext(context)154 else:155 self.setupContext(context)156 self.was_setup = True157 def setupContext(self, context):158 if self.factory:159 if context in self.factory.was_setup:160 return161 self.factory.was_setup[context] = self162 if isclass(context):163 names = self.classSetup164 else:165 names = self.moduleSetup166 if hasattr(context, '__path__'):167 names = self.packageSetup + names168 try_run(context, names)169 def tearDown(self):170 if not self.was_setup or self.was_torndown:171 return172 self.was_torndown = True173 context = self.context174 if context is None:175 return176 factory = self.factory177 if factory:178 ancestors = factory.context.get(self, []) + [context]179 for ancestor in ancestors:180 if not ancestor in factory.was_setup:181 continue182 if ancestor in factory.was_torndown:183 continue184 setup = factory.was_setup[ancestor]185 if setup is self:186 self.teardownContext(ancestor)187 else:188 self.teardownContext(context)189 190 def teardownContext(self, context):191 if self.factory:192 if context in self.factory.was_torndown:193 return194 self.factory.was_torndown[context] = self195 if isclass(context):196 names = self.classTeardown197 else:198 names = self.moduleTeardown199 if hasattr(context, '__path__'):200 names = self.packageTeardown + names201 try_run(context, names)202 def _get_wrapped_tests(self):203 for test in self._get_tests():204 if isinstance(test, Test) or isinstance(test, unittest.TestSuite):205 yield test206 else:207 yield Test(test,208 config=self.config)209 _tests = property(_get_wrapped_tests, LazySuite._set_tests, None,210 "Access the tests in this suite. Tests are returned "211 "inside of a context wrapper.")212class ContextSuiteFactory(object):213 suiteClass = ContextSuite214 def __init__(self, config=None):215 if config is None:216 config = Config()217 self.config = config218 self.suites = {}219 self.context = {}220 self.was_setup = {}221 self.was_torndown = {}222 def __call__(self, tests, **kw):223 """Return 'ContextSuite' for tests.224 """225 context = kw.pop('context', getattr(tests, 'context', None))226 if context is None:227 tests = self.wrapTests(tests)228 context = self.findContext(tests)229 return self.makeSuite(tests, context, **kw)230 231 def ancestry(self, context):232 """Return the ancestry of the context233 """234 if context is None:235 return236 if hasattr(context, 'im_class'):237 context = context.im_class238 if hasattr(context, '__module__'):239 ancestors = context.__module__.split('.')240 elif hasattr(context, '__name__'):241 ancestors = context.__name__.split('.')[:-1]242 else:243 raise TypeError("%s has no ancestors?" % context)244 while ancestors:245 yield resolve_name('.'.join(ancestors))246 ancestors.pop()247 def findContext(self, tests):248 if hasattr(tests, '__call__') or isinstance(tests, unittest.TestSuite):249 return None250 context = None251 for test in tests:252 # Don't look at suites for contexts, only tests253 ctx = getattr(test, 'context', None)254 if ctx is None:255 continue256 if context is None:257 context = ctx258 return context259 def makeSuite(self, tests, context, **kw):260 suite = self.suiteClass(261 tests, context=context, config=self.config, factory=self, **kw)262 if context is not None:263 self.suites.setdefault(context, []).append(suite)264 self.context.setdefault(suite, []).append(context)265 for ancestor in self.ancestry(context):266 self.suites.setdefault(ancestor, []).append(suite)267 self.context[suite].append(ancestor)268 return suite269 def wrapTests(self, tests):270 if hasattr(tests, '__call__') or isinstance(tests, unittest.TestSuite):271 return tests272 wrapped = []273 for test in tests:274 if isinstance(test, Test) or isinstance(test, unittest.TestSuite):275 wrapped.append(test)276 elif isinstance(test, ContextList):277 wrapped.append(self.makeSuite(test, context=test.context))278 else:279 wrapped.append(280 Test(test, config=self.config)281 )282 return wrapped283class ContextList(object):284 """a group of tests in a context.285 """286 def __init__(self, tests, context=None):287 self.tests = tests288 self.context = context289 def __iter__(self):...
attr.py
Source:attr.py
...28 self.tra['n'] = self.n29 def tearDown(self):30 print("tearDown")31 self.class_tda['tearDown'] = 'teardown'32 def classTearDown(self):33 print("class tearDown")34 self.class_tda['classTearDown'] = 'class teardown'35class not_shared(object):36 __test__ = True37 minipyt_shared = False38 n = 039 def __init__(self):40 print("init works")41 def classSetUp(self):42 print("classSetUp")43 self.class_tda['classSetUp'] = 'class setup'44 self.class_tra['classSetUp'] = 'class setup'45 def setUp(self):46 print("setUp")47 self.class_tda['setUp'] = 'setup'48 self.class_tra['setUp'] = 'setup'49 def test_1(self):50 print("test_1")51 self.tda['test'] = 'test_1'52 self.tra['test'] = 'test_1'53 self.n = self.n + 154 self.tra['n'] = self.n55 def test_2(self):56 print("test_2")57 self.tda['test'] = 'test_2'58 self.tra['test'] = 'test_2'59 self.n = self.n + 160 self.tra['n'] = self.n61 def tearDown(self):62 print("tearDown")63 self.class_tda['tearDown'] = 'teardown'64 def classTearDown(self):65 print("class tearDown")66 # class teardown cannot set attributes on class if the class is not67 # shared68class class_setup_exception(object):69 __test__ = True70 minipyt_shared = True71 def __init__(self):72 print("init class_setup_exception")73 def classSetUp(self):74 print("classSetUp")75 self.class_tda['classSetUp'] = 'class setup'76 raise Exception('ARF')77 def setUp(self):78 print("setUp")79 self.class_tda['setUp'] = 'setup'80 def test_1(self):81 print("test_1")82 self.tda['test'] = 'test_1'83 self.tra['test'] = 'test_1'84 def test_2(self):85 print("test_2")86 self.tda['test'] = 'test_2'87 self.tra['test'] = 'test_2'88 def tearDown(self):89 print("tearDown")90 self.class_tda['tearDown'] = 'teardown'91 def classTearDown(self):92 print("class tearDown")93 self.class_tda['classTearDown'] = 'class teardown'94class setup_exception(object):95 __test__ = True96 minipyt_shared = True97 def __init__(self):98 print("init setup_exception")99 def classSetUp(self):100 print("classSetUp")101 self.class_tda['classSetUp'] = 'class setup'102 def setUp(self):103 print("setUp")104 self.class_tda['setUp'] = 'setup'105 raise Exception('ARF')106 def test_1(self):107 print("test_1")108 self.tda['test'] = 'test_1'109 self.tra['test'] = 'test_1'110 def test_2(self):111 print("test_2")112 self.tda['test2'] = 'test_2'113 self.tra['test2'] = 'test_2'114 def tearDown(self):115 print("tearDown")116 self.class_tda['tearDown'] = 'teardown'117 def classTearDown(self):118 print("class tearDown")...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!