Best JavaScript code snippet using pact-foundation-pact
test_runner.py
Source:test_runner.py
...24 # adding empty suites to the end exposes potential bugs25 suite.addTest(unittest.TestSuite())26 realSuite.addTest(unittest.TestSuite())27 return runner.run(realSuite)28def cleanup(ordering, blowUp=False):29 if not blowUp:30 ordering.append('cleanup_good')31 else:32 ordering.append('cleanup_exc')33 raise Exception('CleanUpExc')34class TestCleanUp(unittest.TestCase):35 def testCleanUp(self):36 class TestableTest(unittest.TestCase):37 def testNothing(self):38 pass39 test = TestableTest('testNothing')40 self.assertEqual(test._cleanups, [])41 cleanups = []42 def cleanup1(*args, **kwargs):43 cleanups.append((1, args, kwargs))44 def cleanup2(*args, **kwargs):45 cleanups.append((2, args, kwargs))46 test.addCleanup(cleanup1, 1, 2, 3, four='hello', five='goodbye')47 test.addCleanup(cleanup2)48 self.assertEqual(test._cleanups,49 [(cleanup1, (1, 2, 3), dict(four='hello', five='goodbye')),50 (cleanup2, (), {})])51 self.assertTrue(test.doCleanups())52 self.assertEqual(cleanups, [(2, (), {}), (1, (1, 2, 3), dict(four='hello', five='goodbye'))])53 def testCleanUpWithErrors(self):54 class TestableTest(unittest.TestCase):55 def testNothing(self):56 pass57 test = TestableTest('testNothing')58 outcome = test._outcome = _Outcome()59 CleanUpExc = Exception('foo')60 exc2 = Exception('bar')61 def cleanup1():62 raise CleanUpExc63 def cleanup2():64 raise exc265 test.addCleanup(cleanup1)66 test.addCleanup(cleanup2)67 self.assertFalse(test.doCleanups())68 self.assertFalse(outcome.success)69 ((_, (Type1, instance1, _)),70 (_, (Type2, instance2, _))) = reversed(outcome.errors)71 self.assertEqual((Type1, instance1), (Exception, CleanUpExc))72 self.assertEqual((Type2, instance2), (Exception, exc2))73 def testCleanupInRun(self):74 blowUp = False75 ordering = []76 class TestableTest(unittest.TestCase):77 def setUp(self):78 ordering.append('setUp')79 if blowUp:80 raise Exception('foo')81 def testNothing(self):82 ordering.append('test')83 def tearDown(self):84 ordering.append('tearDown')85 test 
= TestableTest('testNothing')86 def cleanup1():87 ordering.append('cleanup1')88 def cleanup2():89 ordering.append('cleanup2')90 test.addCleanup(cleanup1)91 test.addCleanup(cleanup2)92 def success(some_test):93 self.assertEqual(some_test, test)94 ordering.append('success')95 result = unittest.TestResult()96 result.addSuccess = success97 test.run(result)98 self.assertEqual(ordering, ['setUp', 'test', 'tearDown',99 'cleanup2', 'cleanup1', 'success'])100 blowUp = True101 ordering = []102 test = TestableTest('testNothing')103 test.addCleanup(cleanup1)104 test.run(result)105 self.assertEqual(ordering, ['setUp', 'cleanup1'])106 def testTestCaseDebugExecutesCleanups(self):107 ordering = []108 class TestableTest(unittest.TestCase):109 def setUp(self):110 ordering.append('setUp')111 self.addCleanup(cleanup1)112 def testNothing(self):113 ordering.append('test')114 def tearDown(self):115 ordering.append('tearDown')116 test = TestableTest('testNothing')117 def cleanup1():118 ordering.append('cleanup1')119 test.addCleanup(cleanup2)120 def cleanup2():121 ordering.append('cleanup2')122 test.debug()123 self.assertEqual(ordering, ['setUp', 'test', 'tearDown', 'cleanup1', 'cleanup2'])124class TestClassCleanup(unittest.TestCase):125 def test_addClassCleanUp(self):126 class TestableTest(unittest.TestCase):127 def testNothing(self):128 pass129 test = TestableTest('testNothing')130 self.assertEqual(test._class_cleanups, [])131 class_cleanups = []132 def class_cleanup1(*args, **kwargs):133 class_cleanups.append((3, args, kwargs))134 def class_cleanup2(*args, **kwargs):135 class_cleanups.append((4, args, kwargs))136 TestableTest.addClassCleanup(class_cleanup1, 1, 2, 3,137 four='hello', five='goodbye')138 TestableTest.addClassCleanup(class_cleanup2)139 self.assertEqual(test._class_cleanups,140 [(class_cleanup1, (1, 2, 3),141 dict(four='hello', five='goodbye')),142 (class_cleanup2, (), {})])143 TestableTest.doClassCleanups()144 self.assertEqual(class_cleanups, [(4, (), {}), (3, (1, 2, 3),145 
dict(four='hello', five='goodbye'))])146 def test_run_class_cleanUp(self):147 ordering = []148 blowUp = True149 class TestableTest(unittest.TestCase):150 @classmethod151 def setUpClass(cls):152 ordering.append('setUpClass')153 cls.addClassCleanup(cleanup, ordering)154 if blowUp:155 raise Exception()156 def testNothing(self):157 ordering.append('test')158 @classmethod159 def tearDownClass(cls):160 ordering.append('tearDownClass')161 runTests(TestableTest)162 self.assertEqual(ordering, ['setUpClass', 'cleanup_good'])163 ordering = []164 blowUp = False165 runTests(TestableTest)166 self.assertEqual(ordering,167 ['setUpClass', 'test', 'tearDownClass', 'cleanup_good'])168 def test_debug_executes_classCleanUp(self):169 ordering = []170 class TestableTest(unittest.TestCase):171 @classmethod172 def setUpClass(cls):173 ordering.append('setUpClass')174 cls.addClassCleanup(cleanup, ordering)175 def testNothing(self):176 ordering.append('test')177 @classmethod178 def tearDownClass(cls):179 ordering.append('tearDownClass')180 suite = unittest.defaultTestLoader.loadTestsFromTestCase(TestableTest)181 suite.debug()182 self.assertEqual(ordering,183 ['setUpClass', 'test', 'tearDownClass', 'cleanup_good'])184 def test_doClassCleanups_with_errors_addClassCleanUp(self):185 class TestableTest(unittest.TestCase):186 def testNothing(self):187 pass188 def cleanup1():189 raise Exception('cleanup1')190 def cleanup2():191 raise Exception('cleanup2')192 TestableTest.addClassCleanup(cleanup1)193 TestableTest.addClassCleanup(cleanup2)194 with self.assertRaises(Exception) as e:195 TestableTest.doClassCleanups()196 self.assertEqual(e, 'cleanup1')197 def test_with_errors_addCleanUp(self):198 ordering = []199 class TestableTest(unittest.TestCase):200 @classmethod201 def setUpClass(cls):202 ordering.append('setUpClass')203 cls.addClassCleanup(cleanup, ordering)204 def setUp(self):205 ordering.append('setUp')206 self.addCleanup(cleanup, ordering, blowUp=True)207 def testNothing(self):208 pass209 
@classmethod210 def tearDownClass(cls):211 ordering.append('tearDownClass')212 result = runTests(TestableTest)213 self.assertEqual(result.errors[0][1].splitlines()[-1],214 'Exception: CleanUpExc')215 self.assertEqual(ordering,216 ['setUpClass', 'setUp', 'cleanup_exc',217 'tearDownClass', 'cleanup_good'])218 def test_run_with_errors_addClassCleanUp(self):219 ordering = []220 class TestableTest(unittest.TestCase):221 @classmethod222 def setUpClass(cls):223 ordering.append('setUpClass')224 cls.addClassCleanup(cleanup, ordering, blowUp=True)225 def setUp(self):226 ordering.append('setUp')227 self.addCleanup(cleanup, ordering)228 def testNothing(self):229 ordering.append('test')230 @classmethod231 def tearDownClass(cls):232 ordering.append('tearDownClass')233 result = runTests(TestableTest)234 self.assertEqual(result.errors[0][1].splitlines()[-1],235 'Exception: CleanUpExc')236 self.assertEqual(ordering,237 ['setUpClass', 'setUp', 'test', 'cleanup_good',238 'tearDownClass', 'cleanup_exc'])239 def test_with_errors_in_addClassCleanup_and_setUps(self):240 ordering = []241 class_blow_up = False242 method_blow_up = False243 class TestableTest(unittest.TestCase):244 @classmethod245 def setUpClass(cls):246 ordering.append('setUpClass')247 cls.addClassCleanup(cleanup, ordering, blowUp=True)248 if class_blow_up:249 raise Exception('ClassExc')250 def setUp(self):251 ordering.append('setUp')252 if method_blow_up:253 raise Exception('MethodExc')254 def testNothing(self):255 ordering.append('test')256 @classmethod257 def tearDownClass(cls):258 ordering.append('tearDownClass')259 result = runTests(TestableTest)260 self.assertEqual(result.errors[0][1].splitlines()[-1],261 'Exception: CleanUpExc')262 self.assertEqual(ordering,263 ['setUpClass', 'setUp', 'test',264 'tearDownClass', 'cleanup_exc'])265 ordering = []266 class_blow_up = True267 method_blow_up = False268 result = runTests(TestableTest)269 self.assertEqual(result.errors[0][1].splitlines()[-1],270 'Exception: ClassExc')271 
self.assertEqual(result.errors[1][1].splitlines()[-1],272 'Exception: CleanUpExc')273 self.assertEqual(ordering,274 ['setUpClass', 'cleanup_exc'])275 ordering = []276 class_blow_up = False277 method_blow_up = True278 result = runTests(TestableTest)279 self.assertEqual(result.errors[0][1].splitlines()[-1],280 'Exception: MethodExc')281 self.assertEqual(result.errors[1][1].splitlines()[-1],282 'Exception: CleanUpExc')283 self.assertEqual(ordering,284 ['setUpClass', 'setUp', 'tearDownClass',285 'cleanup_exc'])286class TestModuleCleanUp(unittest.TestCase):287 def test_add_and_do_ModuleCleanup(self):288 module_cleanups = []289 def module_cleanup1(*args, **kwargs):290 module_cleanups.append((3, args, kwargs))291 def module_cleanup2(*args, **kwargs):292 module_cleanups.append((4, args, kwargs))293 class Module(object):294 unittest.addModuleCleanup(module_cleanup1, 1, 2, 3,295 four='hello', five='goodbye')296 unittest.addModuleCleanup(module_cleanup2)297 self.assertEqual(unittest.case._module_cleanups,298 [(module_cleanup1, (1, 2, 3),299 dict(four='hello', five='goodbye')),300 (module_cleanup2, (), {})])301 unittest.case.doModuleCleanups()302 self.assertEqual(module_cleanups, [(4, (), {}), (3, (1, 2, 3),303 dict(four='hello', five='goodbye'))])304 self.assertEqual(unittest.case._module_cleanups, [])305 def test_doModuleCleanup_with_errors_in_addModuleCleanup(self):306 module_cleanups = []307 def module_cleanup_good(*args, **kwargs):308 module_cleanups.append((3, args, kwargs))309 def module_cleanup_bad(*args, **kwargs):310 raise Exception('CleanUpExc')311 class Module(object):312 unittest.addModuleCleanup(module_cleanup_good, 1, 2, 3,313 four='hello', five='goodbye')314 unittest.addModuleCleanup(module_cleanup_bad)315 self.assertEqual(unittest.case._module_cleanups,316 [(module_cleanup_good, (1, 2, 3),317 dict(four='hello', five='goodbye')),318 (module_cleanup_bad, (), {})])319 with self.assertRaises(Exception) as e:320 unittest.case.doModuleCleanups()321 
self.assertEqual(str(e.exception), 'CleanUpExc')322 self.assertEqual(unittest.case._module_cleanups, [])323 def test_addModuleCleanup_arg_errors(self):324 cleanups = []325 def cleanup(*args, **kwargs):326 cleanups.append((args, kwargs))327 class Module(object):328 unittest.addModuleCleanup(cleanup, 1, 2, function='hello')329 with self.assertRaises(TypeError):330 unittest.addModuleCleanup(function=cleanup, arg='hello')331 with self.assertRaises(TypeError):332 unittest.addModuleCleanup()333 unittest.case.doModuleCleanups()334 self.assertEqual(cleanups,335 [((1, 2), {'function': 'hello'})])336 def test_run_module_cleanUp(self):337 blowUp = True338 ordering = []339 class Module(object):340 @staticmethod341 def setUpModule():342 ordering.append('setUpModule')343 unittest.addModuleCleanup(cleanup, ordering)344 if blowUp:345 raise Exception('setUpModule Exc')346 @staticmethod347 def tearDownModule():348 ordering.append('tearDownModule')349 class TestableTest(unittest.TestCase):350 @classmethod351 def setUpClass(cls):352 ordering.append('setUpClass')353 def testNothing(self):354 ordering.append('test')355 @classmethod356 def tearDownClass(cls):357 ordering.append('tearDownClass')358 TestableTest.__module__ = 'Module'359 sys.modules['Module'] = Module360 result = runTests(TestableTest)361 self.assertEqual(ordering, ['setUpModule', 'cleanup_good'])362 self.assertEqual(result.errors[0][1].splitlines()[-1],363 'Exception: setUpModule Exc')364 ordering = []365 blowUp = False366 runTests(TestableTest)367 self.assertEqual(ordering,368 ['setUpModule', 'setUpClass', 'test', 'tearDownClass',369 'tearDownModule', 'cleanup_good'])370 self.assertEqual(unittest.case._module_cleanups, [])371 def test_run_multiple_module_cleanUp(self):372 blowUp = True373 blowUp2 = False374 ordering = []375 class Module1(object):376 @staticmethod377 def setUpModule():378 ordering.append('setUpModule')379 unittest.addModuleCleanup(cleanup, ordering)380 if blowUp:381 raise Exception()382 @staticmethod383 
def tearDownModule():384 ordering.append('tearDownModule')385 class Module2(object):386 @staticmethod387 def setUpModule():388 ordering.append('setUpModule2')389 unittest.addModuleCleanup(cleanup, ordering)390 if blowUp2:391 raise Exception()392 @staticmethod393 def tearDownModule():394 ordering.append('tearDownModule2')395 class TestableTest(unittest.TestCase):396 @classmethod397 def setUpClass(cls):398 ordering.append('setUpClass')399 def testNothing(self):400 ordering.append('test')401 @classmethod402 def tearDownClass(cls):403 ordering.append('tearDownClass')404 class TestableTest2(unittest.TestCase):405 @classmethod406 def setUpClass(cls):407 ordering.append('setUpClass2')408 def testNothing(self):409 ordering.append('test2')410 @classmethod411 def tearDownClass(cls):412 ordering.append('tearDownClass2')413 TestableTest.__module__ = 'Module1'414 sys.modules['Module1'] = Module1415 TestableTest2.__module__ = 'Module2'416 sys.modules['Module2'] = Module2417 runTests(TestableTest, TestableTest2)418 self.assertEqual(ordering, ['setUpModule', 'cleanup_good',419 'setUpModule2', 'setUpClass2', 'test2',420 'tearDownClass2', 'tearDownModule2',421 'cleanup_good'])422 ordering = []423 blowUp = False424 blowUp2 = True425 runTests(TestableTest, TestableTest2)426 self.assertEqual(ordering, ['setUpModule', 'setUpClass', 'test',427 'tearDownClass', 'tearDownModule',428 'cleanup_good', 'setUpModule2',429 'cleanup_good'])430 ordering = []431 blowUp = False432 blowUp2 = False433 runTests(TestableTest, TestableTest2)434 self.assertEqual(ordering,435 ['setUpModule', 'setUpClass', 'test', 'tearDownClass',436 'tearDownModule', 'cleanup_good', 'setUpModule2',437 'setUpClass2', 'test2', 'tearDownClass2',438 'tearDownModule2', 'cleanup_good'])439 self.assertEqual(unittest.case._module_cleanups, [])440 def test_debug_module_executes_cleanUp(self):441 ordering = []442 class Module(object):443 @staticmethod444 def setUpModule():445 ordering.append('setUpModule')446 
unittest.addModuleCleanup(cleanup, ordering)447 @staticmethod448 def tearDownModule():449 ordering.append('tearDownModule')450 class TestableTest(unittest.TestCase):451 @classmethod452 def setUpClass(cls):453 ordering.append('setUpClass')454 def testNothing(self):455 ordering.append('test')456 @classmethod457 def tearDownClass(cls):458 ordering.append('tearDownClass')459 TestableTest.__module__ = 'Module'460 sys.modules['Module'] = Module461 suite = unittest.defaultTestLoader.loadTestsFromTestCase(TestableTest)462 suite.debug()463 self.assertEqual(ordering,464 ['setUpModule', 'setUpClass', 'test', 'tearDownClass',465 'tearDownModule', 'cleanup_good'])466 self.assertEqual(unittest.case._module_cleanups, [])467 def test_addClassCleanup_arg_errors(self):468 cleanups = []469 def cleanup(*args, **kwargs):470 cleanups.append((args, kwargs))471 class TestableTest(unittest.TestCase):472 @classmethod473 def setUpClass(cls):474 cls.addClassCleanup(cleanup, 1, 2, function=3, cls=4)475 with self.assertRaises(TypeError):476 cls.addClassCleanup(function=cleanup, arg='hello')477 def testNothing(self):478 pass479 with self.assertRaises(TypeError):480 TestableTest.addClassCleanup()481 with self.assertRaises(TypeError):482 unittest.TestCase.addCleanup(cls=TestableTest(), function=cleanup)483 runTests(TestableTest)484 self.assertEqual(cleanups,485 [((1, 2), {'function': 3, 'cls': 4})])486 def test_addCleanup_arg_errors(self):487 cleanups = []488 def cleanup(*args, **kwargs):489 cleanups.append((args, kwargs))490 class TestableTest(unittest.TestCase):491 def setUp(self2):492 self2.addCleanup(cleanup, 1, 2, function=3, self=4)493 with self.assertRaises(TypeError):494 self2.addCleanup(function=cleanup, arg='hello')495 def testNothing(self):496 pass497 with self.assertRaises(TypeError):498 TestableTest().addCleanup()499 with self.assertRaises(TypeError):500 unittest.TestCase.addCleanup(self=TestableTest(), function=cleanup)501 runTests(TestableTest)502 self.assertEqual(cleanups,503 [((1, 
2), {'function': 3, 'self': 4})])504 def test_with_errors_in_addClassCleanup(self):505 ordering = []506 class Module(object):507 @staticmethod508 def setUpModule():509 ordering.append('setUpModule')510 unittest.addModuleCleanup(cleanup, ordering)511 @staticmethod512 def tearDownModule():513 ordering.append('tearDownModule')514 class TestableTest(unittest.TestCase):515 @classmethod516 def setUpClass(cls):517 ordering.append('setUpClass')518 cls.addClassCleanup(cleanup, ordering, blowUp=True)519 def testNothing(self):520 ordering.append('test')521 @classmethod522 def tearDownClass(cls):523 ordering.append('tearDownClass')524 TestableTest.__module__ = 'Module'525 sys.modules['Module'] = Module526 result = runTests(TestableTest)527 self.assertEqual(result.errors[0][1].splitlines()[-1],528 'Exception: CleanUpExc')529 self.assertEqual(ordering,530 ['setUpModule', 'setUpClass', 'test', 'tearDownClass',531 'cleanup_exc', 'tearDownModule', 'cleanup_good'])532 def test_with_errors_in_addCleanup(self):533 ordering = []534 class Module(object):535 @staticmethod536 def setUpModule():537 ordering.append('setUpModule')538 unittest.addModuleCleanup(cleanup, ordering)539 @staticmethod540 def tearDownModule():541 ordering.append('tearDownModule')542 class TestableTest(unittest.TestCase):543 def setUp(self):544 ordering.append('setUp')545 self.addCleanup(cleanup, ordering, blowUp=True)546 def testNothing(self):547 ordering.append('test')548 def tearDown(self):549 ordering.append('tearDown')550 TestableTest.__module__ = 'Module'551 sys.modules['Module'] = Module552 result = runTests(TestableTest)553 self.assertEqual(result.errors[0][1].splitlines()[-1],554 'Exception: CleanUpExc')555 self.assertEqual(ordering,556 ['setUpModule', 'setUp', 'test', 'tearDown',557 'cleanup_exc', 'tearDownModule', 'cleanup_good'])558 def test_with_errors_in_addModuleCleanup_and_setUps(self):559 ordering = []560 module_blow_up = False561 class_blow_up = False562 method_blow_up = False563 class 
Module(object):564 @staticmethod565 def setUpModule():566 ordering.append('setUpModule')567 unittest.addModuleCleanup(cleanup, ordering, blowUp=True)568 if module_blow_up:569 raise Exception('ModuleExc')570 @staticmethod571 def tearDownModule():572 ordering.append('tearDownModule')573 class TestableTest(unittest.TestCase):574 @classmethod575 def setUpClass(cls):576 ordering.append('setUpClass')577 if class_blow_up:578 raise Exception('ClassExc')579 def setUp(self):580 ordering.append('setUp')581 if method_blow_up:582 raise Exception('MethodExc')583 def testNothing(self):584 ordering.append('test')585 @classmethod586 def tearDownClass(cls):587 ordering.append('tearDownClass')588 TestableTest.__module__ = 'Module'589 sys.modules['Module'] = Module590 result = runTests(TestableTest)591 self.assertEqual(result.errors[0][1].splitlines()[-1],592 'Exception: CleanUpExc')593 self.assertEqual(ordering,594 ['setUpModule', 'setUpClass', 'setUp', 'test',595 'tearDownClass', 'tearDownModule',596 'cleanup_exc'])597 ordering = []598 module_blow_up = True599 class_blow_up = False600 method_blow_up = False601 result = runTests(TestableTest)602 self.assertEqual(result.errors[0][1].splitlines()[-1],603 'Exception: CleanUpExc')604 self.assertEqual(result.errors[1][1].splitlines()[-1],605 'Exception: ModuleExc')606 self.assertEqual(ordering, ['setUpModule', 'cleanup_exc'])607 ordering = []608 module_blow_up = False609 class_blow_up = True610 method_blow_up = False611 result = runTests(TestableTest)612 self.assertEqual(result.errors[0][1].splitlines()[-1],613 'Exception: ClassExc')614 self.assertEqual(result.errors[1][1].splitlines()[-1],615 'Exception: CleanUpExc')616 self.assertEqual(ordering, ['setUpModule', 'setUpClass',617 'tearDownModule', 'cleanup_exc'])618 ordering = []619 module_blow_up = False620 class_blow_up = False621 method_blow_up = True622 result = runTests(TestableTest)623 self.assertEqual(result.errors[0][1].splitlines()[-1],624 'Exception: MethodExc')625 
self.assertEqual(result.errors[1][1].splitlines()[-1],626 'Exception: CleanUpExc')627 self.assertEqual(ordering, ['setUpModule', 'setUpClass', 'setUp',628 'tearDownClass', 'tearDownModule',629 'cleanup_exc'])630 def test_module_cleanUp_with_multiple_classes(self):631 ordering =[]632 def cleanup1():633 ordering.append('cleanup1')634 def cleanup2():635 ordering.append('cleanup2')636 def cleanup3():637 ordering.append('cleanup3')638 class Module(object):639 @staticmethod640 def setUpModule():641 ordering.append('setUpModule')642 unittest.addModuleCleanup(cleanup1)643 @staticmethod644 def tearDownModule():645 ordering.append('tearDownModule')646 class TestableTest(unittest.TestCase):647 def setUp(self):648 ordering.append('setUp')649 self.addCleanup(cleanup2)650 def testNothing(self):651 ordering.append('test')652 def tearDown(self):653 ordering.append('tearDown')654 class OtherTestableTest(unittest.TestCase):655 def setUp(self):656 ordering.append('setUp2')657 self.addCleanup(cleanup3)658 def testNothing(self):659 ordering.append('test2')660 def tearDown(self):661 ordering.append('tearDown2')662 TestableTest.__module__ = 'Module'663 OtherTestableTest.__module__ = 'Module'664 sys.modules['Module'] = Module665 runTests(TestableTest, OtherTestableTest)666 self.assertEqual(ordering,667 ['setUpModule', 'setUp', 'test', 'tearDown',668 'cleanup2', 'setUp2', 'test2', 'tearDown2',669 'cleanup3', 'tearDownModule', 'cleanup1'])670class Test_TextTestRunner(unittest.TestCase):671 """Tests for TextTestRunner."""672 def setUp(self):673 # clean the environment from pre-existing PYTHONWARNINGS to make674 # test_warnings results consistent675 self.pythonwarnings = os.environ.get('PYTHONWARNINGS')676 if self.pythonwarnings:677 del os.environ['PYTHONWARNINGS']678 def tearDown(self):679 # bring back pre-existing PYTHONWARNINGS if present680 if self.pythonwarnings:681 os.environ['PYTHONWARNINGS'] = self.pythonwarnings682 def test_init(self):683 runner = unittest.TextTestRunner()684 
self.assertFalse(runner.failfast)685 self.assertFalse(runner.buffer)686 self.assertEqual(runner.verbosity, 1)687 self.assertEqual(runner.warnings, None)688 self.assertTrue(runner.descriptions)689 self.assertEqual(runner.resultclass, unittest.TextTestResult)690 self.assertFalse(runner.tb_locals)691 def test_multiple_inheritance(self):692 class AResult(unittest.TestResult):693 def __init__(self, stream, descriptions, verbosity):694 super(AResult, self).__init__(stream, descriptions, verbosity)695 class ATextResult(unittest.TextTestResult, AResult):696 pass697 # This used to raise an exception due to TextTestResult not passing698 # on arguments in its __init__ super call699 ATextResult(None, None, 1)700 def testBufferAndFailfast(self):701 class Test(unittest.TestCase):702 def testFoo(self):703 pass704 result = unittest.TestResult()705 runner = unittest.TextTestRunner(stream=io.StringIO(), failfast=True,706 buffer=True)707 # Use our result object708 runner._makeResult = lambda: result709 runner.run(Test('testFoo'))710 self.assertTrue(result.failfast)711 self.assertTrue(result.buffer)712 def test_locals(self):713 runner = unittest.TextTestRunner(stream=io.StringIO(), tb_locals=True)714 result = runner.run(unittest.TestSuite())715 self.assertEqual(True, result.tb_locals)716 def testRunnerRegistersResult(self):717 class Test(unittest.TestCase):718 def testFoo(self):719 pass720 originalRegisterResult = unittest.runner.registerResult721 def cleanup():722 unittest.runner.registerResult = originalRegisterResult723 self.addCleanup(cleanup)724 result = unittest.TestResult()725 runner = unittest.TextTestRunner(stream=io.StringIO())726 # Use our result object727 runner._makeResult = lambda: result728 self.wasRegistered = 0729 def fakeRegisterResult(thisResult):730 self.wasRegistered += 1731 self.assertEqual(thisResult, result)732 unittest.runner.registerResult = fakeRegisterResult733 runner.run(unittest.TestSuite())734 self.assertEqual(self.wasRegistered, 1)735 def 
test_works_with_result_without_startTestRun_stopTestRun(self):...
test_lru_cache.py
Source:test_lru_cache.py
...73 self.assertEqual('biz', cache['baz'])74 # This must kick out 'foo' because it was the last accessed75 cache['nub'] = 'in'76 self.assertFalse('foo' in cache)77 def test_cleanup(self):78 """Test that we can use a cleanup function."""79 cleanup_called = []80 def cleanup_func(key, val):81 cleanup_called.append((key, val))82 cache = lru_cache.LRUCache(max_cache=2, after_cleanup_count=2)83 cache.add('baz', '1', cleanup=cleanup_func)84 cache.add('foo', '2', cleanup=cleanup_func)85 cache.add('biz', '3', cleanup=cleanup_func)86 self.assertEqual([('baz', '1')], cleanup_called)87 # 'foo' is now most recent, so final cleanup will call it last88 cache['foo']89 cache.clear()90 self.assertEqual([('baz', '1'), ('biz', '3'), ('foo', '2')],91 cleanup_called)92 def test_cleanup_on_replace(self):93 """Replacing an object should cleanup the old value."""94 cleanup_called = []95 def cleanup_func(key, val):96 cleanup_called.append((key, val))97 cache = lru_cache.LRUCache(max_cache=2)98 cache.add(1, 10, cleanup=cleanup_func)99 cache.add(2, 20, cleanup=cleanup_func)100 cache.add(2, 25, cleanup=cleanup_func)101 self.assertEqual([(2, 20)], cleanup_called)102 self.assertEqual(25, cache[2])103 # Even __setitem__ should make sure cleanup() is called104 cache[2] = 26105 self.assertEqual([(2, 20), (2, 25)], cleanup_called)106 def test_len(self):107 cache = lru_cache.LRUCache(max_cache=10, after_cleanup_count=10)108 cache[1] = 10109 cache[2] = 20110 cache[3] = 30111 cache[4] = 40112 self.assertEqual(4, len(cache))113 cache[5] = 50114 cache[6] = 60115 cache[7] = 70116 cache[8] = 80117 self.assertEqual(8, len(cache))118 cache[1] = 15 # replacement119 self.assertEqual(8, len(cache))120 cache[9] = 90121 cache[10] = 100122 cache[11] = 110123 # We hit the max124 self.assertEqual(10, len(cache))125 self.assertEqual([11, 10, 9, 1, 8, 7, 6, 5, 4, 3],126 [n.key for n in cache._walk_lru()])127 def test_cleanup_shrinks_to_after_clean_count(self):128 cache = lru_cache.LRUCache(max_cache=5, 
after_cleanup_count=3)129 cache.add(1, 10)130 cache.add(2, 20)131 cache.add(3, 25)132 cache.add(4, 30)133 cache.add(5, 35)134 self.assertEqual(5, len(cache))135 # This will bump us over the max, which causes us to shrink down to136 # after_cleanup_cache size137 cache.add(6, 40)138 self.assertEqual(3, len(cache))139 def test_after_cleanup_larger_than_max(self):140 cache = lru_cache.LRUCache(max_cache=5, after_cleanup_count=10)141 self.assertEqual(5, cache._after_cleanup_count)142 def test_after_cleanup_none(self):143 cache = lru_cache.LRUCache(max_cache=5, after_cleanup_count=None)144 # By default _after_cleanup_size is 80% of the normal size145 self.assertEqual(4, cache._after_cleanup_count)146 def test_cleanup_2(self):147 cache = lru_cache.LRUCache(max_cache=5, after_cleanup_count=2)148 # Add these in order149 cache.add(1, 10)150 cache.add(2, 20)151 cache.add(3, 25)152 cache.add(4, 30)153 cache.add(5, 35)154 self.assertEqual(5, len(cache))155 # Force a compaction156 cache.cleanup()157 self.assertEqual(2, len(cache))158 def test_preserve_last_access_order(self):159 cache = lru_cache.LRUCache(max_cache=5)160 # Add these in order161 cache.add(1, 10)162 cache.add(2, 20)163 cache.add(3, 25)164 cache.add(4, 30)165 cache.add(5, 35)166 self.assertEqual([5, 4, 3, 2, 1], [n.key for n in cache._walk_lru()])167 # Now access some randomly168 cache[2]169 cache[5]170 cache[3]171 cache[2]172 self.assertEqual([2, 3, 5, 4, 1], [n.key for n in cache._walk_lru()])173 def test_get(self):174 cache = lru_cache.LRUCache(max_cache=5)175 cache.add(1, 10)176 cache.add(2, 20)177 self.assertEqual(20, cache.get(2))178 self.assertEqual(None, cache.get(3))179 obj = object()180 self.assertTrue(obj is cache.get(3, obj))181 self.assertEqual([2, 1], [n.key for n in cache._walk_lru()])182 self.assertEqual(10, cache.get(1))183 self.assertEqual([1, 2], [n.key for n in cache._walk_lru()])184 def test_keys(self):185 cache = lru_cache.LRUCache(max_cache=5, after_cleanup_count=5)186 cache[1] = 2187 
cache[2] = 3188 cache[3] = 4189 self.assertEqual([1, 2, 3], sorted(cache.keys()))190 cache[4] = 5191 cache[5] = 6192 cache[6] = 7193 self.assertEqual([2, 3, 4, 5, 6], sorted(cache.keys()))194 def test_resize_smaller(self):195 cache = lru_cache.LRUCache(max_cache=5, after_cleanup_count=4)196 cache[1] = 2197 cache[2] = 3198 cache[3] = 4199 cache[4] = 5200 cache[5] = 6201 self.assertEqual([1, 2, 3, 4, 5], sorted(cache.keys()))202 cache[6] = 7203 self.assertEqual([3, 4, 5, 6], sorted(cache.keys()))204 # Now resize to something smaller, which triggers a cleanup205 cache.resize(max_cache=3, after_cleanup_count=2)206 self.assertEqual([5, 6], sorted(cache.keys()))207 # Adding something will use the new size208 cache[7] = 8209 self.assertEqual([5, 6, 7], sorted(cache.keys()))210 cache[8] = 9211 self.assertEqual([7, 8], sorted(cache.keys()))212 def test_resize_larger(self):213 cache = lru_cache.LRUCache(max_cache=5, after_cleanup_count=4)214 cache[1] = 2215 cache[2] = 3216 cache[3] = 4217 cache[4] = 5218 cache[5] = 6219 self.assertEqual([1, 2, 3, 4, 5], sorted(cache.keys()))220 cache[6] = 7221 self.assertEqual([3, 4, 5, 6], sorted(cache.keys()))222 cache.resize(max_cache=8, after_cleanup_count=6)223 self.assertEqual([3, 4, 5, 6], sorted(cache.keys()))224 cache[7] = 8225 cache[8] = 9226 cache[9] = 10227 cache[10] = 11228 self.assertEqual([3, 4, 5, 6, 7, 8, 9, 10], sorted(cache.keys()))229 cache[11] = 12 # triggers cleanup back to new after_cleanup_count230 self.assertEqual([6, 7, 8, 9, 10, 11], sorted(cache.keys()))231class TestLRUSizeCache(TestCase):232 def test_basic_init(self):233 cache = lru_cache.LRUSizeCache()234 self.assertEqual(2048, cache._max_cache)235 self.assertEqual(int(cache._max_size*0.8), cache._after_cleanup_size)236 self.assertEqual(0, cache._value_size)237 def test_add__null_key(self):238 cache = lru_cache.LRUSizeCache()239 self.assertRaises(ValueError, cache.add, lru_cache._null_key, 1)240 def test_add_tracks_size(self):241 cache = 
lru_cache.LRUSizeCache()242 self.assertEqual(0, cache._value_size)243 cache.add('my key', 'my value text')244 self.assertEqual(13, cache._value_size)245 def test_remove_tracks_size(self):246 cache = lru_cache.LRUSizeCache()247 self.assertEqual(0, cache._value_size)248 cache.add('my key', 'my value text')249 self.assertEqual(13, cache._value_size)250 node = cache._cache['my key']251 cache._remove_node(node)252 self.assertEqual(0, cache._value_size)253 def test_no_add_over_size(self):254 """Adding a large value may not be cached at all."""255 cache = lru_cache.LRUSizeCache(max_size=10, after_cleanup_size=5)256 self.assertEqual(0, cache._value_size)257 self.assertEqual({}, cache.items())258 cache.add('test', 'key')259 self.assertEqual(3, cache._value_size)260 self.assertEqual({'test': 'key'}, cache.items())261 cache.add('test2', 'key that is too big')262 self.assertEqual(3, cache._value_size)263 self.assertEqual({'test':'key'}, cache.items())264 # If we would add a key, only to cleanup and remove all cached entries,265 # then obviously that value should not be stored266 cache.add('test3', 'bigkey')267 self.assertEqual(3, cache._value_size)268 self.assertEqual({'test':'key'}, cache.items())269 cache.add('test4', 'bikey')270 self.assertEqual(3, cache._value_size)271 self.assertEqual({'test':'key'}, cache.items())272 def test_no_add_over_size_cleanup(self):273 """If a large value is not cached, we will call cleanup right away."""274 cleanup_calls = []275 def cleanup(key, value):276 cleanup_calls.append((key, value))277 cache = lru_cache.LRUSizeCache(max_size=10, after_cleanup_size=5)278 self.assertEqual(0, cache._value_size)279 self.assertEqual({}, cache.items())280 cache.add('test', 'key that is too big', cleanup=cleanup)281 # key was not added282 self.assertEqual(0, cache._value_size)283 self.assertEqual({}, cache.items())284 # and cleanup was called285 self.assertEqual([('test', 'key that is too big')], cleanup_calls)286 def 
test_adding_clears_cache_based_on_size(self):287 """The cache is cleared in LRU order until small enough"""288 cache = lru_cache.LRUSizeCache(max_size=20)289 cache.add('key1', 'value') # 5 chars290 cache.add('key2', 'value2') # 6 chars291 cache.add('key3', 'value23') # 7 chars292 self.assertEqual(5+6+7, cache._value_size)293 cache['key2'] # reference key2 so it gets a newer reference time294 cache.add('key4', 'value234') # 8 chars, over limit295 # We have to remove 2 keys to get back under limit296 self.assertEqual(6+8, cache._value_size)297 self.assertEqual({'key2':'value2', 'key4':'value234'},298 cache.items())299 def test_adding_clears_to_after_cleanup_size(self):300 cache = lru_cache.LRUSizeCache(max_size=20, after_cleanup_size=10)301 cache.add('key1', 'value') # 5 chars302 cache.add('key2', 'value2') # 6 chars303 cache.add('key3', 'value23') # 7 chars304 self.assertEqual(5+6+7, cache._value_size)305 cache['key2'] # reference key2 so it gets a newer reference time306 cache.add('key4', 'value234') # 8 chars, over limit307 # We have to remove 3 keys to get back under limit308 self.assertEqual(8, cache._value_size)309 self.assertEqual({'key4':'value234'}, cache.items())310 def test_custom_sizes(self):311 def size_of_list(lst):312 return sum(len(x) for x in lst)313 cache = lru_cache.LRUSizeCache(max_size=20, after_cleanup_size=10,314 compute_size=size_of_list)315 cache.add('key1', ['val', 'ue']) # 5 chars316 cache.add('key2', ['val', 'ue2']) # 6 chars317 cache.add('key3', ['val', 'ue23']) # 7 chars318 self.assertEqual(5+6+7, cache._value_size)319 cache['key2'] # reference key2 so it gets a newer reference time320 cache.add('key4', ['value', '234']) # 8 chars, over limit321 # We have to remove 3 keys to get back under limit322 self.assertEqual(8, cache._value_size)323 self.assertEqual({'key4':['value', '234']}, cache.items())324 def test_cleanup(self):325 cache = lru_cache.LRUSizeCache(max_size=20, after_cleanup_size=10)326 # Add these in order327 cache.add('key1', 
'value') # 5 chars328 cache.add('key2', 'value2') # 6 chars329 cache.add('key3', 'value23') # 7 chars330 self.assertEqual(5+6+7, cache._value_size)331 cache.cleanup()332 # Only the most recent fits after cleaning up333 self.assertEqual(7, cache._value_size)334 def test_keys(self):335 cache = lru_cache.LRUSizeCache(max_size=10)336 cache[1] = 'a'337 cache[2] = 'b'338 cache[3] = 'cdef'339 self.assertEqual([1, 2, 3], sorted(cache.keys()))340 def test_resize_smaller(self):341 cache = lru_cache.LRUSizeCache(max_size=10, after_cleanup_size=9)342 cache[1] = 'abc'343 cache[2] = 'def'344 cache[3] = 'ghi'345 cache[4] = 'jkl'...
lfcmetrics.py
Source:lfcmetrics.py
1##############################################################################2#3# NAME: lfcmetrics.py4#5# FACILITY: SAM (Service Availability Monitoring)6#7# COPYRIGHT:8# Copyright (c) 2009, Members of the EGEE Collaboration.9# http://www.eu-egee.org/partners/10# Licensed under the Apache License, Version 2.0.11# http://www.apache.org/licenses/LICENSE-2.012# This software is provided "as is", without warranties13# or conditions of any kind, either express or implied.14#15# DESCRIPTION:16#17# LFC metrics.18#19# AUTHORS: Konstantin Skaburskas, CERN20#21# CREATED: 24-Aug-201022#23# NOTES:24#25# MODIFIED:26#27##############################################################################28"""29LFC metrics.30LFC metrics.31Konstantin Skaburskas <konstantin.skaburskas@cern.ch>, CERN32SAM (Service Availability Monitoring)33"""34import re35import os36import sys37import time38import signal39import commands40try:41 from gridmon import probe42 from gridmon import utils as samutils43 from threadpool import ThreadPool44 import lfc2 as lfc45except ImportError as e:46 print("UNKNOWN: Error loading modules : %s" % (e))47 sys.exit(3)48class LFCMetrics(probe.MetricGatherer):49 ns = 'org.sam'50 # Cleanup metric51 cleanup_timeout = 60*552 cleanup_file_ttl = 15*24*360053 cleanup_files_max = sys.maxint54 cleanup_nthreads = 555 cleanup_file_names = ['sam-choose-LFC',56 'sam-nag-choose',57 'sft-lcg-rm-cr',58 'sam-lcg-rm-cr',59 'SRM-put']60 def __init__(self, tuples, lfctype):61 probe.MetricGatherer.__init__(self, tuples, lfctype)62 self.cleanup_dir = '/grid/%s/SAM' % self.voName63 self.usage = """ Metrics specific parameters:64%s.LFC-Cleanup65--cleanup-timeout <sec> Cleanup timeout. (Default: %i sec)66--cleanup-dir <dir> Directory to clean. (Default: %s)67--cleanup-file-ttl <hrs> Time for a file to stay on LFC. (Default: %i hrs)68--cleanup-files-max <num> Number of files to delete at most before timeout69 kicks in. (Default: %i)70--cleanup-threads <num> Number of cleanup threads. 
(Default: %i)71""" % (self.ns,72 self.cleanup_timeout,73 self.cleanup_dir,74 self.cleanup_file_ttl/3600,75 self.cleanup_files_max,76 self.cleanup_nthreads)77 self.probeinfo = { 'probeName' : self.ns+'.LFC-probe',78 'probeVersion' : '0.1',79 'serviceVersion' : '>= 0.1.1'}80 self._metrics = {81 'Cleanup' : {82 'metricDescription' : "Clean test area on LFC",83 'metricLocality' : 'remote',84 'metricType' : 'status',85 'metricVersion' : '0.1',86 'cmdLineOptions' : ['cleanup-timeout=',87 'cleanup-dir=',88 'cleanup-file-ttl=',89 'cleanup-files-max=',90 'cleanup-threads=',91 ],92 'metricChildren' : []93 }94 }95 self.set_metrics(self._metrics)96 self.parse_cmd_args(tuples)97 self.make_workdir()98 self.cleanup_persist_lfns_fn = self.workdir_metric+'/lfns.db'99 def parse_args(self, opts):100 for o,v in opts:101 if o == '--cleanup-timeout':102 self.cleanup_timeout = int(v)103 elif o == '--cleanup-dir':104 self.cleanup_dir = v105 elif o == '--cleanup-file-ttl':106 self.cleanup_file_ttl = int(v)107 elif o == '--cleanup-files-max':108 self.cleanup_files_max = int(v)109 elif o == '--cleanup-threads':110 self.cleanup_nthreads = int(v)111 else:112 pass113 def metricCleanup(self):114 """Cleanup of test area on LFC.115 NOTE:116 Can't use lfc_delfilesbypattern as we need to take into account dates.117 pattern = '(%s)*' % '|'.join(self.cleanup_file_names)118 results = lfc.lfc_delfilesbypattern(self.cleanup_dir, pattern, 0)119 """120 # XXXX: lfc.S_ISDIR is not in lfc2 module. 
Use this magic number.121 LFC_MAGIC_FILEMODE = 33204122 status = 'OK'123 os.environ['LFC_HOST'] = self.hostName124 def persist_lfns(data):125 import pickle126 fp = open(self.cleanup_persist_lfns_fn, 'w')127 pickle.dump(data, fp)128 fp.close()129 def load_lfns():130 import pickle131 try:132 fp = open(self.cleanup_persist_lfns_fn)133 lfns = pickle.load(fp)134 fp.close()135 return lfns136 except Exception:137 return []138 def del_lfn_file(lfn):139 cmd = 'lcg-del -a %s' % lfn140 rc, out = commands.getstatusoutput(cmd)141 if rc == 0:142 return True143 else:144 if re.search('Permission denied', out, re.M):145 return False146 elif re.search('No such file or directory', out, re.M):147 return False148 else:149 'surl=$(lcg-lr lfn:$1) guid=$(lcg-lg $surl) lcg-uf $guid $surl'150 cmd = 'lcg-lr %s' % lfn151 rc, surls = commands.getstatusoutput(cmd)152 if rc == 0:153 for surl in surls.strip('\n').split('\n'):154 cmd = 'lcg-lg %s' % surl155 rc, guid = commands.getstatusoutput(cmd)156 if rc != 0:157 return False158 else:159 cmd = 'lcg-uf %s %s' % (guid, surl)160 rc, _ = commands.getstatusoutput(cmd)161 if rc != 0:162 return False163 else:164 return False165 return True166 def del_lfn_dir(dirn):167 cmd = 'lfc-rm -f -r %s' % dirn168 try:169 rc, _ = commands.getstatusoutput(cmd)170 except Exception as e:171 return False172 if rc == 0:173 return True174 else:175 return False176 T_ttl = self.cleanup_file_ttl177 dirname = self.cleanup_dir178 fnames_re = re.compile('|'.join(self.cleanup_file_names))179 dir_pt = lfc.lfc_opendir(dirname)180 if not dir_pt:181 self.prints("Couldn't open %s" % dirname)182 return 'UNKNOWN'183 global pool, tasks184 tasks = []185 pool = ThreadPool(self.cleanup_nthreads)186 def sighandler(a, b):187 global pool, tasks188 self.printd('caught signal: %s' % samutils.time_now())189 tasks = pool.dropAll()190 while 1:191 if pool.getThreadCount() == 0 and pool.getTasksCount() == 0:192 break193 time.sleep(.5)194 signal.signal(signal.SIGALRM, sighandler)195 
signal.alarm(self.cleanup_timeout)196 self.printd('set timeout for %i sec' % self.cleanup_timeout)197 i_tot = 0198 i_match_name = 0199 i_match_all = 0200 global i_deleted201 i_deleted = 0202 def task_callback(val):203 global i_deleted204 i_deleted += val and 1 or 0205 t_start = time.time()206 t_stop = t_start + self.cleanup_timeout207 self.printd('initialised: %s' % samutils.time_now())208 lfns_loaded = load_lfns()209 if lfns_loaded:210 self.printd("loaded %i lfns from previous run" % len(lfns_loaded))211 i_tot += len(lfns_loaded)212 for i,lfn in enumerate(lfns_loaded[:]):213 if lfn.startswith('lfn'):214 pool.enqueueTask(del_lfn_file, lfn, task_callback)215 else:216 pool.enqueueTask(del_lfn_dir, lfn, task_callback)217 lfns_loaded[i] = 'lfn:%s' % lfn218 # first, clear 2/3 of the cache, then proceed further219 lfns_len_stop = len(lfns_loaded) / 3220 while pool.getTasksCount() > lfns_len_stop and time.time() < t_stop:221 time.sleep(2)222 while i_deleted <= self.cleanup_files_max and time.time() < t_stop:223 entry = lfc.lfc_readdirx(dir_pt)224 if not entry:225 self.printd("no more files to delete")226 break227 self.printdvm("file: %s" % entry.d_name)228 if fnames_re.match(entry.d_name):229 e_fullname = '%s/%s' % (dirname, entry.d_name)230 lfn = 'lfn:%s' % e_fullname231 if lfn in lfns_loaded:232 self.printdvm("already in local cache %s" % entry.d_name)233 continue234 if entry.mtime < time.time() - T_ttl:235 self.printdvm("match name&time: %s" % entry.d_name)236 i_match_all += 1237 if entry.filemode == LFC_MAGIC_FILEMODE:238 # assume this is a file239 pool.enqueueTask(del_lfn_file, lfn, task_callback)240 else:241 # this is a directory242 pool.enqueueTask(del_lfn_dir, e_fullname, task_callback)243 else:244 self.printdvm("match name: %s" % entry.d_name)245 i_match_name += 1246 else:247 self.printdvm("not matched: %s" % entry.d_name)248 i_tot += 1249 self.printd('finished processing: %s' % samutils.time_now())250 if len(tasks) == 0:251 tasks = pool.delTasks()252 
pool.joinAll(waitForTasks=False, waitForThreads=False)253 self.printd('done with cleanup for now: time spent %i sec' % \254 (time.time() - t_start))255 if len(tasks) > 0:256 self.printd('persisting %i unprocessed lfns' % len(tasks))257 persist_lfns([x[1] for x in tasks[:]])258 else:259 try:260 os.unlink(self.cleanup_persist_lfns_fn)261 except Exception: pass262 self.printd("total %i" % i_tot)263 self.printd("match name&time %i" % i_match_all)264 self.printd("match name %i" % i_match_name)265 self.printd("deleted %i" % i_deleted)266 self.prints('total %i, match name&time %i, match name %i, deleted %i' % \267 (i_tot, i_match_all, i_match_name, i_deleted,))...
test_async_case.py
Source:test_async_case.py
...27 'asyncSetUp',28 'test',29 'asyncTearDown'])30 events.append('tearDown')31 async def on_cleanup(self):32 self.assertEqual(events, ['setUp',33 'asyncSetUp',34 'test',35 'asyncTearDown',36 'tearDown'])37 events.append('cleanup')38 test = Test("test_func")39 test.run()40 self.assertEqual(events, ['setUp',41 'asyncSetUp',42 'test',43 'asyncTearDown',44 'tearDown',45 'cleanup'])46 def test_exception_in_setup(self):47 events = []48 class Test(unittest.IsolatedAsyncioTestCase):49 async def asyncSetUp(self):50 events.append('asyncSetUp')51 raise Exception()52 async def test_func(self):53 events.append('test')54 self.addAsyncCleanup(self.on_cleanup)55 async def asyncTearDown(self):56 events.append('asyncTearDown')57 async def on_cleanup(self):58 events.append('cleanup')59 test = Test("test_func")60 test.run()61 self.assertEqual(events, ['asyncSetUp'])62 def test_exception_in_test(self):63 events = []64 class Test(unittest.IsolatedAsyncioTestCase):65 async def asyncSetUp(self):66 events.append('asyncSetUp')67 async def test_func(self):68 events.append('test')69 raise Exception()70 self.addAsyncCleanup(self.on_cleanup)71 async def asyncTearDown(self):72 events.append('asyncTearDown')73 async def on_cleanup(self):74 events.append('cleanup')75 test = Test("test_func")76 test.run()77 self.assertEqual(events, ['asyncSetUp', 'test', 'asyncTearDown'])78 def test_exception_in_test_after_adding_cleanup(self):79 events = []80 class Test(unittest.IsolatedAsyncioTestCase):81 async def asyncSetUp(self):82 events.append('asyncSetUp')83 async def test_func(self):84 events.append('test')85 self.addAsyncCleanup(self.on_cleanup)86 raise Exception()87 async def asyncTearDown(self):88 events.append('asyncTearDown')89 async def on_cleanup(self):90 events.append('cleanup')91 test = Test("test_func")92 test.run()93 self.assertEqual(events, ['asyncSetUp', 'test', 'asyncTearDown', 'cleanup'])94 def test_exception_in_tear_down(self):95 events = []96 class 
Test(unittest.IsolatedAsyncioTestCase):97 async def asyncSetUp(self):98 events.append('asyncSetUp')99 async def test_func(self):100 events.append('test')101 self.addAsyncCleanup(self.on_cleanup)102 async def asyncTearDown(self):103 events.append('asyncTearDown')104 raise Exception()105 async def on_cleanup(self):106 events.append('cleanup')107 test = Test("test_func")108 test.run()109 self.assertEqual(events, ['asyncSetUp', 'test', 'asyncTearDown', 'cleanup'])110 def test_exception_in_tear_clean_up(self):111 events = []112 class Test(unittest.IsolatedAsyncioTestCase):113 async def asyncSetUp(self):114 events.append('asyncSetUp')115 async def test_func(self):116 events.append('test')117 self.addAsyncCleanup(self.on_cleanup)118 async def asyncTearDown(self):119 events.append('asyncTearDown')120 async def on_cleanup(self):121 events.append('cleanup')122 raise Exception()123 test = Test("test_func")124 test.run()125 self.assertEqual(events, ['asyncSetUp', 'test', 'asyncTearDown', 'cleanup'])126 def test_cleanups_interleave_order(self):127 events = []128 class Test(unittest.IsolatedAsyncioTestCase):129 async def test_func(self):130 self.addAsyncCleanup(self.on_sync_cleanup, 1)131 self.addAsyncCleanup(self.on_async_cleanup, 2)132 self.addAsyncCleanup(self.on_sync_cleanup, 3)133 self.addAsyncCleanup(self.on_async_cleanup, 4)134 async def on_sync_cleanup(self, val):135 events.append(f'sync_cleanup {val}')136 async def on_async_cleanup(self, val):137 events.append(f'async_cleanup {val}')138 test = Test("test_func")139 test.run()140 self.assertEqual(events, ['async_cleanup 4',141 'sync_cleanup 3',142 'async_cleanup 2',143 'sync_cleanup 1'])144if __name__ == "__main__":...
exc_util_unittest.py
Source:exc_util_unittest.py
1# Copyright 2019 The Chromium Authors. All rights reserved.2# Use of this source code is governed by a BSD-style license that can be3# found in the LICENSE file.4import re5import sys6import unittest7from py_utils import exc_util8class FakeConnectionError(Exception):9 pass10class FakeDisconnectionError(Exception):11 pass12class FakeProcessingError(Exception):13 pass14class FakeCleanupError(Exception):15 pass16class FaultyClient(object):17 def __init__(self, *args):18 self.failures = set(args)19 self.called = set()20 def Connect(self):21 self.called.add('Connect')22 if FakeConnectionError in self.failures:23 raise FakeConnectionError('Oops!')24 def Process(self):25 self.called.add('Process')26 if FakeProcessingError in self.failures:27 raise FakeProcessingError('Oops!')28 @exc_util.BestEffort29 def Disconnect(self):30 self.called.add('Disconnect')31 if FakeDisconnectionError in self.failures:32 raise FakeDisconnectionError('Oops!')33 @exc_util.BestEffort34 def Cleanup(self):35 self.called.add('Cleanup')36 if FakeCleanupError in self.failures:37 raise FakeCleanupError('Oops!')38class ReraiseTests(unittest.TestCase):39 def assertLogMatches(self, pattern):40 self.assertRegexpMatches(41 sys.stderr.getvalue(), pattern) # pylint: disable=no-member42 def assertLogNotMatches(self, pattern):43 self.assertNotRegexpMatches(44 sys.stderr.getvalue(), pattern) # pylint: disable=no-member45 def testTryRaisesExceptRaises(self):46 client = FaultyClient(FakeConnectionError, FakeDisconnectionError)47 # The connection error reaches the top level, while the disconnection48 # error is logged.49 with self.assertRaises(FakeConnectionError):50 try:51 client.Connect()52 except:53 client.Disconnect()54 raise55 self.assertLogMatches(re.compile(56 r'While handling a FakeConnectionError, .* was also raised:\n'57 r'Traceback \(most recent call last\):\n'58 r'.*\n'59 r'FakeDisconnectionError: Oops!\n', re.DOTALL))60 self.assertItemsEqual(client.called, ['Connect', 'Disconnect'])61 def 
testTryRaisesExceptDoesnt(self):62 client = FaultyClient(FakeConnectionError)63 # The connection error reaches the top level, disconnecting did not raise64 # an exception (so nothing is logged).65 with self.assertRaises(FakeConnectionError):66 try:67 client.Connect()68 except:69 client.Disconnect()70 raise71 self.assertLogNotMatches('FakeDisconnectionError')72 self.assertItemsEqual(client.called, ['Connect', 'Disconnect'])73 def testTryPassesNoException(self):74 client = FaultyClient(FakeDisconnectionError)75 # If there is no connection error, the except clause is not called (even if76 # it would have raised an exception).77 try:78 client.Connect()79 except:80 client.Disconnect()81 raise82 self.assertLogNotMatches('FakeConnectionError')83 self.assertLogNotMatches('FakeDisconnectionError')84 self.assertItemsEqual(client.called, ['Connect'])85 def testTryRaisesFinallyRaises(self):86 worker = FaultyClient(FakeProcessingError, FakeCleanupError)87 # The processing error reaches the top level, the cleanup error is logged.88 with self.assertRaises(FakeProcessingError):89 try:90 worker.Process()91 except:92 raise # Needed for Cleanup to know if an exception is handled.93 finally:94 worker.Cleanup()95 self.assertLogMatches(re.compile(96 r'While handling a FakeProcessingError, .* was also raised:\n'97 r'Traceback \(most recent call last\):\n'98 r'.*\n'99 r'FakeCleanupError: Oops!\n', re.DOTALL))100 self.assertItemsEqual(worker.called, ['Process', 'Cleanup'])101 def testTryRaisesFinallyDoesnt(self):102 worker = FaultyClient(FakeProcessingError)103 # The processing error reaches the top level, the cleanup code runs fine.104 with self.assertRaises(FakeProcessingError):105 try:106 worker.Process()107 except:108 raise # Needed for Cleanup to know if an exception is handled.109 finally:110 worker.Cleanup()111 self.assertLogNotMatches('FakeProcessingError')112 self.assertLogNotMatches('FakeCleanupError')113 self.assertItemsEqual(worker.called, ['Process', 'Cleanup'])114 def 
testTryPassesFinallyRaises(self):115 worker = FaultyClient(FakeCleanupError)116 # The processing code runs fine, the cleanup code raises an exception117 # which reaches the top level.118 with self.assertRaises(FakeCleanupError):119 try:120 worker.Process()121 except:122 raise # Needed for Cleanup to know if an exception is handled.123 finally:124 worker.Cleanup()125 self.assertLogNotMatches('FakeProcessingError')126 self.assertLogNotMatches('FakeCleanupError')127 self.assertItemsEqual(worker.called, ['Process', 'Cleanup'])128 def testTryRaisesExceptRaisesFinallyRaises(self):129 worker = FaultyClient(130 FakeProcessingError, FakeDisconnectionError, FakeCleanupError)131 # Chaining try-except-finally works fine. Only the processing error reaches132 # the top level; the other two are logged.133 with self.assertRaises(FakeProcessingError):134 try:135 worker.Process()136 except:137 worker.Disconnect()138 raise139 finally:140 worker.Cleanup()141 self.assertLogMatches('FakeDisconnectionError')142 self.assertLogMatches('FakeCleanupError')...
stress_test.py
Source:stress_test.py
"""
This script concurrently builds and migrates instances. This can be useful when
troubleshooting race-conditions in virt-layer code.
Expects:
    novarc to be sourced in the environment
Helper Script for Xen Dom0:
    # cat /tmp/destroy_cache_vdis
    #!/bin/bash
    xe vdi-list | grep "Glance Image" -C1 | grep "^uuid" | awk '{print $5}' |
        xargs -n1 -I{} xe vdi-destroy uuid={}
"""
import argparse
import contextlib
import multiprocessing
import subprocess
import sys
import time

DOM0_CLEANUP_SCRIPT = "/tmp/destroy_cache_vdis"


def _warn(message):
    """Write one diagnostic line to stderr.

    Used instead of the ``print >> sys.stderr`` statement so the script runs
    unchanged on both Python 2 and Python 3.
    """
    sys.stderr.write(message + "\n")


def run(cmd):
    """Run *cmd* through the shell and warn on stderr if it exits non-zero.

    A failing command is reported but not raised, so callers always continue.
    NOTE(review): *cmd* is interpolated from command-line arguments and passed
    to a shell; only use this script with trusted arguments.
    """
    ret = subprocess.call(cmd, shell=True)
    if ret != 0:
        _warn("Command exited non-zero: %s" % cmd)


@contextlib.contextmanager
def server_built(server_name, image_name, flavor=1, cleanup=True):
    """Boot a server for the duration of the ``with`` body.

    :param server_name: name given to ``nova boot``.
    :param image_name: image passed as ``--image``.
    :param flavor: flavor passed as ``--flavor``.
    :param cleanup: when true, ``nova delete`` the server on exit, even if the
        body raised.
    """
    run("nova boot --image=%s --flavor=%s --poll %s"
        % (image_name, flavor, server_name))
    try:
        yield
    finally:
        if cleanup:
            run("nova delete %s" % server_name)


@contextlib.contextmanager
def snapshot_taken(server_name, snapshot_name, cleanup=True):
    """Snapshot *server_name* for the duration of the ``with`` body.

    :param server_name: server passed to ``nova image-create``.
    :param snapshot_name: name of the image to create.
    :param cleanup: when true, ``nova image-delete`` the snapshot on exit, even
        if the body raised.
    """
    run("nova image-create %s %s --poll" % (server_name, snapshot_name))
    try:
        yield
    finally:
        if cleanup:
            run("nova image-delete %s" % snapshot_name)


def _server_status(server_name):
    """Return the sixth field of the ``nova list`` row matching *server_name*.

    The callers compare this field against status names such as ``ACTIVE``.
    ``universal_newlines=True`` makes the pipe yield text on Python 3 as well;
    without it the value is ``bytes`` and never equals a ``str`` status.
    """
    cmd = "nova list | grep %s | awk '{print $6}'" % server_name
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True,
                            universal_newlines=True)
    stdout, _ = proc.communicate()
    return stdout.strip()


def migrate_server(server_name):
    """Migrate *server_name* and confirm the resize.

    :returns: ``True`` if the server reached ``VERIFY_RESIZE`` and the resize
        was confirmed, ``False`` otherwise.
    """
    run("nova migrate %s --poll" % server_name)
    if _server_status(server_name).upper() != 'VERIFY_RESIZE':
        # The original message said "failed to rebuild" (copy-paste slip).
        _warn("Server %s failed to migrate" % server_name)
        return False
    # Confirm the resize
    run("nova resize-confirm %s" % server_name)
    return True


def test_migrate(context):
    """Boot a server and migrate it twice (A -> B, then B -> A).

    :param context: ``(count, args)`` tuple; *count* names the server and
        *args* is the parsed command line.
    :returns: ``True`` only if both migrations succeeded.
    """
    count, args = context
    server_name = "server%d" % count
    with server_built(server_name, args.image, cleanup=args.cleanup):
        # Migrate A -> B
        if not migrate_server(server_name):
            return False
        # Migrate B -> A
        return migrate_server(server_name)


def rebuild_server(server_name, snapshot_name):
    """Rebuild *server_name* from *snapshot_name*.

    :returns: ``True`` if the server reports ``ACTIVE`` afterwards.
    """
    run("nova rebuild %s %s --poll" % (server_name, snapshot_name))
    if _server_status(server_name) != 'ACTIVE':
        _warn("Server %s failed to rebuild" % server_name)
        return False
    return True


def test_rebuild(context):
    """Boot a server, snapshot it, and rebuild the server from the snapshot.

    :param context: ``(count, args)`` tuple; *count* names the server and
        snapshot, *args* is the parsed command line.
    :returns: the result of :func:`rebuild_server`.
    """
    count, args = context
    server_name = "server%d" % count
    snapshot_name = "snap%d" % count
    cleanup = args.cleanup
    with server_built(server_name, args.image, cleanup=cleanup):
        with snapshot_taken(server_name, snapshot_name, cleanup=cleanup):
            return rebuild_server(server_name, snapshot_name)


def _parse_args():
    """Parse the command line and return the resulting namespace."""
    parser = argparse.ArgumentParser(
        description='Test Nova for Race Conditions.')
    parser.add_argument('tests', metavar='TESTS', type=str, nargs='*',
                        default=['rebuild', 'migrate'],
                        help='tests to run: [rebuild|migrate]')
    parser.add_argument('-i', '--image', help="image to build from",
                        required=True)
    parser.add_argument('-n', '--num-runs', type=int, help="number of runs",
                        default=1)
    parser.add_argument('-c', '--concurrency', type=int, default=5,
                        help="number of concurrent processes")
    parser.add_argument('--no-cleanup', action='store_false', dest="cleanup",
                        default=True)
    parser.add_argument('-d', '--dom0-ips',
                        help="IP of dom0's to run cleanup script")
    return parser.parse_args()


def main():
    """Run the requested tests concurrently and exit 0 on success, 1 otherwise."""
    args = _parse_args()
    dom0_ips = args.dom0_ips.split(',') if args.dom0_ips else []

    # Resolve every requested test up front so a mistyped name fails before
    # any server has been booted.
    test_funcs = []
    for test in args.tests:
        test_func = globals().get("test_%s" % test)
        if not test_func:
            _warn("test '%s' not found" % test)
            sys.exit(1)
        test_funcs.append(test_func)

    start_time = time.time()
    contexts = [(x, args) for x in range(args.num_runs)]
    pool = multiprocessing.Pool(processes=args.concurrency)
    results = []
    try:
        for test_func in test_funcs:
            try:
                results += pool.map(test_func, contexts)
            finally:
                # Run the dom0 cleanup after each test, even if it raised.
                if args.cleanup:
                    for dom0_ip in dom0_ips:
                        run("ssh root@%s %s" % (dom0_ip, DOM0_CLEANUP_SCRIPT))
    finally:
        # Release the worker processes instead of leaving them to interpreter
        # shutdown.
        pool.close()
        pool.join()

    success = all(results)
    result = "SUCCESS" if success else "FAILED"
    duration = time.time() - start_time
    sys.stdout.write("%s, finished in %.2f secs\n" % (result, duration))
    sys.exit(0 if success else 1)


if __name__ == "__main__":
    main()
test_context.py
Source:test_context.py
...24 context.AdminCleanup.validate(["a"])25 mock_manager.list_resource_names.assert_called_once_with(26 admin_required=True)27 @mock.patch("%s.manager" % BASE)28 def test_validate_no_such_cleanup(self, mock_manager):29 mock_manager.list_resource_names.return_value = set(["a", "b", "c"])30 self.assertRaises(context.NoSuchCleanupResources,31 context.AdminCleanup.validate, ["a", "d"])32 mock_manager.list_resource_names.assert_called_once_with(33 admin_required=True)34 def test_validate_invalid_config(self):35 self.assertRaises(jsonschema.ValidationError,36 context.AdminCleanup.validate, {})37 @mock.patch("%s.manager.find_resource_managers" % BASE,38 return_value=[mock.MagicMock(), mock.MagicMock()])39 @mock.patch("%s.manager.SeekAndDestroy" % BASE)40 def test_cleanup(self, mock_seek_and_destroy, mock_find_resource_managers):41 ctx = {42 "config": {"admin_cleanup": ["a", "b"]},43 "admin": mock.MagicMock(),44 "users": mock.MagicMock(),45 "task": mock.MagicMock()46 }47 admin_cleanup = context.AdminCleanup(ctx)48 admin_cleanup.setup()49 admin_cleanup.cleanup()50 mock_find_resource_managers.assert_called_once_with(["a", "b"], True)51 mock_seek_and_destroy.assert_has_calls([52 mock.call(53 mock_find_resource_managers.return_value[0],54 ctx["admin"],55 ctx["users"]),56 mock.call().exterminate(),57 mock.call(58 mock_find_resource_managers.return_value[1],59 ctx["admin"],60 ctx["users"]),61 mock.call().exterminate()62 ])63class UserCleanupTestCase(test.TestCase):64 @mock.patch("%s.manager" % BASE)65 def test_validate(self, mock_manager):66 mock_manager.list_resource_names.return_value = set(["a", "b", "c"])67 context.UserCleanup.validate(["a"])68 mock_manager.list_resource_names.assert_called_once_with(69 admin_required=False)70 @mock.patch("%s.manager" % BASE)71 def test_validate_no_such_cleanup(self, mock_manager):72 mock_manager.list_resource_names.return_value = set(["a", "b", "c"])73 self.assertRaises(context.NoSuchCleanupResources,74 context.UserCleanup.validate, ["a", 
"b", "d"])75 mock_manager.list_resource_names.assert_called_once_with(76 admin_required=False)77 def test_validate_invalid_config(self):78 self.assertRaises(jsonschema.ValidationError,79 context.UserCleanup.validate, {})80 @mock.patch("%s.manager.find_resource_managers" % BASE,81 return_value=[mock.MagicMock(), mock.MagicMock()])82 @mock.patch("%s.manager.SeekAndDestroy" % BASE)83 def test_cleanup(self, mock_seek_and_destroy, mock_find_resource_managers):84 ctx = {85 "config": {"cleanup": ["a", "b"]},86 "users": mock.MagicMock(),87 "task": mock.MagicMock()88 }89 admin_cleanup = context.UserCleanup(ctx)90 admin_cleanup.setup()91 admin_cleanup.cleanup()92 mock_find_resource_managers.assert_called_once_with(["a", "b"], False)93 mock_seek_and_destroy.assert_has_calls([94 mock.call(95 mock_find_resource_managers.return_value[0],96 None, ctx["users"]),97 mock.call().exterminate(),98 mock.call(99 mock_find_resource_managers.return_value[1],100 None, ctx["users"]),101 mock.call().exterminate()...
context.py
Source:context.py
...48 LOG.info(_("Couldn't find cleanup resource managers: %s")49 % missing)50 raise NoSuchCleanupResources(missing)51 @rutils.log_task_wrapper(LOG.info, _("admin resources cleanup"))52 def cleanup(self):53 manager.cleanup(names=self.config,54 admin_required=True,55 admin=self.context["admin"],56 users=self.context.get("users", []))57# NOTE(amaretskiy): Set maximum order to run this last58@context.configure(name="cleanup", order=sys.maxsize, hidden=True)59class UserCleanup(CleanupMixin, context.Context):60 """Context class for user resources cleanup."""61 @classmethod62 def validate(cls, config, non_hidden=False):63 super(UserCleanup, cls).validate(config, non_hidden)64 missing = set(config)65 missing -= manager.list_resource_names(admin_required=False)66 missing = ", ".join(missing)67 if missing:68 LOG.info(_("Couldn't find cleanup resource managers: %s")69 % missing)70 raise NoSuchCleanupResources(missing)71 @rutils.log_task_wrapper(LOG.info, _("user resources cleanup"))72 def cleanup(self):73 manager.cleanup(names=self.config,74 admin_required=False,...
Using AI Code Generation
1const { Pact } = require('@pact-foundation/pact');2const { somethingLike } = require('@pact-foundation/pact/dsl/matchers');3const { like } = require('@pact-foundation/pact/dsl/matchers');4const { eachLike } = require('@pact-foundation/pact/dsl/matchers');5const { term } = require('@pact-foundation/pact/dsl/matchers');6const { arrayContaining } = require('@pact-foundation/pact/dsl/matchers');7const { eachLike } = require('@pact-foundation/pact/dsl/matchers');8const { somethingLike } = require('@pact-foundation/pact/dsl/matchers');9const { like } = require('@pact-foundation/pact/dsl/matchers');10const { term } = require('@pact-foundation/pact/dsl/matchers');11const { eachLike } = require('@pact-foundation/pact/dsl/matchers');12const { somethingLike } = require('@pact-foundation/pact/dsl/matchers');13const { like } = require('@pact-foundation/pact/dsl/matchers');14const { term } = require('@pact-foundation/pact/dsl/matchers');15const { eachLike } = require('@pact-foundation/pact/dsl/matchers');16const { somethingLike } = require('@pact-foundation/pact/dsl/matchers');17const { like } = require('@pact-foundation/pact/dsl/matchers');18const { term } = require('@pact-foundation/pact/dsl/matchers');19const { eachLike } = require('@pact-foundation/pact/dsl/matchers');20const { somethingLike } = require('@pact-foundation/pact/dsl/matchers');21const { like } = require('@pact-foundation/pact/dsl/matchers');22const { term } = require('@pact-foundation/pact/dsl/matchers');23const { eachLike } = require('@pact-foundation/pact/dsl/matchers');24const { somethingLike } = require('@pact-foundation/pact/dsl/matchers');25const { like } = require('@pact-foundation/pact/dsl/matchers');26const { term } = require('@pact
Using AI Code Generation
1const pact = require("@pact-foundation/pact");2const { Verifier } = require("@pact-foundation/pact");3const { Matchers } = require("@pact-foundation/pact");4const { like, term, iso8601DateTimeWithMillis } = Matchers;5const { somethingLike } = Matchers;6describe("Pact Verification", () => {7 it("should validate the expectations of Test Consumer", () => {8 let opts = {9 {10 },11 };12 return new Verifier(opts)13 .verifyProvider()14 .then((output) => {15 console.log("Pact Verification Complete!");16 console.log(output);17 });18 });19});20{ providerApplicationVersion: null,21 [ { success: true,22 pactResponse: { status: 200, headers: [Object], body: [Object] },
Using AI Code Generation
// Ask pact-node to remove all of the servers it is managing.
//
// The original listing declared `const pact` twice in the same scope, which
// is a SyntaxError, and then repeated the identical call; one call is enough.
const pact = require('@pact-foundation/pact-node');

pact
  .removeAllServers()
  .then(function () {
    console.log('all servers removed');
  })
  .catch(function (err) {
    // Report a failed teardown instead of leaving an unhandled rejection.
    console.error('failed to remove servers:', err);
  });
Using AI Code Generation
// Call Pact.removeAll on the `pacts` directory under the current working
// directory and log the outcome.
// NOTE(review): `Pact.removeAll` is reproduced from the listing as written;
// confirm it exists in the installed pact-node version.
const path = require('path');
const { Pact } = require('@pact-foundation/pact-node');

const pactsDir = path.resolve(process.cwd(), 'pacts');

function reportDeleted() {
  console.log('Pact contract files have been deleted');
}

function reportError(e) {
  console.log('Error:', e);
}

Pact.removeAll({ pactFilesOrDirs: [pactsDir] })
  .then(reportDeleted)
  .catch(reportError);
Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!