Best Python code snippet using autotest_python
task_manager_unittest.py
Source:task_manager_unittest.py
1# Copyright 2016 The Chromium Authors. All rights reserved.2# Use of this source code is governed by a BSD-style license that can be3# found in the LICENSE file.4import argparse5import contextlib6import errno7import os8import re9import shutil10import StringIO11import sys12import tempfile13import unittest14import common_util15import task_manager16_GOLDEN_GRAPHVIZ = """digraph graphname {17 n0 [label="0: b", color=black, shape=ellipse];18 n1 [label="1: a", color=black, shape=ellipse];19 n2 [label="2: c", color=black, shape=ellipse];20 n0 -> n2;21 n1 -> n2;22 n3 [label="3: d", color=black, shape=ellipse];23 n2 -> n3;24 n4 [label="4: f", color=black, shape=box];25 n3 -> n4;26 n5 [label="e", color=blue, shape=ellipse];27 n5 -> n4;28}\n"""29@contextlib.contextmanager30def EatStdoutAndStderr():31 """Overrides sys.std{out,err} to intercept write calls."""32 sys.stdout.flush()33 sys.stderr.flush()34 original_stdout = sys.stdout35 original_stderr = sys.stderr36 try:37 sys.stdout = StringIO.StringIO()38 sys.stderr = StringIO.StringIO()39 yield40 finally:41 sys.stdout = original_stdout42 sys.stderr = original_stderr43class TestException(Exception):44 pass45class TaskManagerTestCase(unittest.TestCase):46 def setUp(self):47 self.output_directory = tempfile.mkdtemp()48 def tearDown(self):49 shutil.rmtree(self.output_directory)50 def OutputPath(self, file_path):51 return os.path.join(self.output_directory, file_path)52 def TouchOutputFile(self, file_path):53 with open(self.OutputPath(file_path), 'w') as output:54 output.write(file_path + '\n')55class TaskTest(TaskManagerTestCase):56 def testTaskExecution(self):57 def Recipe():58 Recipe.counter += 159 Recipe.counter = 060 task = task_manager.Task('hello.json', 'what/ever/hello.json', [], Recipe)61 self.assertFalse(task._is_done)62 self.assertEqual(0, Recipe.counter)63 task.Execute()64 self.assertEqual(1, Recipe.counter)65 task.Execute()66 self.assertEqual(1, Recipe.counter)67 def testTaskExecutionWithUnexecutedDeps(self):68 def RecipeA():69 self.fail()70 def RecipeB():71 RecipeB.counter += 172 RecipeB.counter = 073 a = task_manager.Task('hello.json', 'out/hello.json', [], RecipeA)74 b = task_manager.Task('hello.json', 'out/hello.json', [a], RecipeB)75 self.assertEqual(0, RecipeB.counter)76 b.Execute()77 self.assertEqual(1, RecipeB.counter)78class BuilderTest(TaskManagerTestCase):79 def testRegisterTask(self):80 builder = task_manager.Builder(self.output_directory, None)81 @builder.RegisterTask('hello.txt')82 def TaskA():83 TaskA.executed = True84 TaskA.executed = False85 self.assertEqual(os.path.join(self.output_directory, 'hello.txt'),86 TaskA.path)87 self.assertFalse(TaskA.executed)88 TaskA.Execute()89 self.assertTrue(TaskA.executed)90 def testRegisterDuplicateTask(self):91 builder = task_manager.Builder(self.output_directory, None)92 @builder.RegisterTask('hello.txt')93 def TaskA():94 pass95 del TaskA # unused96 with self.assertRaises(task_manager.TaskError):97 @builder.RegisterTask('hello.txt')98 def TaskB():99 pass100 del TaskB # unused101 def testTaskMerging(self):102 builder = task_manager.Builder(self.output_directory, None)103 @builder.RegisterTask('hello.txt')104 def TaskA():105 pass106 @builder.RegisterTask('hello.txt', merge=True)107 def TaskB():108 pass109 self.assertEqual(TaskA, TaskB)110 def testOutputSubdirectory(self):111 builder = task_manager.Builder(self.output_directory, 'subdir')112 @builder.RegisterTask('world.txt')113 def TaskA():114 pass115 del TaskA # unused116 self.assertIn('subdir/world.txt', builder._tasks)117 self.assertNotIn('subdir/subdir/world.txt', builder._tasks)118 self.assertNotIn('world.txt', builder._tasks)119 @builder.RegisterTask('subdir/world.txt')120 def TaskB():121 pass122 del TaskB # unused123 self.assertIn('subdir/subdir/world.txt', builder._tasks)124 self.assertNotIn('world.txt', builder._tasks)125class GenerateScenarioTest(TaskManagerTestCase):126 def testParents(self):127 builder = task_manager.Builder(self.output_directory, None)128 @builder.RegisterTask('a')129 def TaskA():130 pass131 @builder.RegisterTask('b', dependencies=[TaskA])132 def TaskB():133 pass134 @builder.RegisterTask('c', dependencies=[TaskB])135 def TaskC():136 pass137 scenario = task_manager.GenerateScenario([TaskA, TaskB, TaskC], set())138 self.assertListEqual([TaskA, TaskB, TaskC], scenario)139 scenario = task_manager.GenerateScenario([TaskB], set())140 self.assertListEqual([TaskA, TaskB], scenario)141 scenario = task_manager.GenerateScenario([TaskC], set())142 self.assertListEqual([TaskA, TaskB, TaskC], scenario)143 scenario = task_manager.GenerateScenario([TaskC, TaskB], set())144 self.assertListEqual([TaskA, TaskB, TaskC], scenario)145 def testFreezing(self):146 builder = task_manager.Builder(self.output_directory, None)147 @builder.RegisterTask('a')148 def TaskA():149 pass150 @builder.RegisterTask('b', dependencies=[TaskA])151 def TaskB():152 pass153 @builder.RegisterTask('c')154 def TaskC():155 pass156 @builder.RegisterTask('d', dependencies=[TaskB, TaskC])157 def TaskD():158 pass159 # assert no exception raised.160 task_manager.GenerateScenario([TaskB], set([TaskC]))161 with self.assertRaises(task_manager.TaskError):162 task_manager.GenerateScenario([TaskD], set([TaskA]))163 self.TouchOutputFile('a')164 scenario = task_manager.GenerateScenario([TaskD], set([TaskA]))165 self.assertListEqual([TaskB, TaskC, TaskD], scenario)166 self.TouchOutputFile('b')167 scenario = task_manager.GenerateScenario([TaskD], set([TaskB]))168 self.assertListEqual([TaskC, TaskD], scenario)169 def testCycleError(self):170 builder = task_manager.Builder(self.output_directory, None)171 @builder.RegisterTask('a')172 def TaskA():173 pass174 @builder.RegisterTask('b', dependencies=[TaskA])175 def TaskB():176 pass177 @builder.RegisterTask('c', dependencies=[TaskB])178 def TaskC():179 pass180 @builder.RegisterTask('d', dependencies=[TaskC])181 def TaskD():182 pass183 TaskA._dependencies.append(TaskC)184 with self.assertRaises(task_manager.TaskError):185 task_manager.GenerateScenario([TaskD], set())186 def testCollisionError(self):187 builder_a = task_manager.Builder(self.output_directory, None)188 builder_b = task_manager.Builder(self.output_directory, None)189 @builder_a.RegisterTask('a')190 def TaskA():191 pass192 @builder_b.RegisterTask('a')193 def TaskB():194 pass195 with self.assertRaises(task_manager.TaskError):196 task_manager.GenerateScenario([TaskA, TaskB], set())197 def testGenerateDependentSetPerTask(self):198 builder = task_manager.Builder(self.output_directory, None)199 @builder.RegisterTask('a')200 def TaskA():201 pass202 @builder.RegisterTask('b')203 def TaskB():204 pass205 @builder.RegisterTask('c', dependencies=[TaskA, TaskB])206 def TaskC():207 pass208 @builder.RegisterTask('d', dependencies=[TaskA])209 def TaskD():210 pass211 def RunSubTest(expected, scenario, task):212 self.assertEqual(213 expected, task_manager.GenerateDependentSetPerTask(scenario)[task])214 RunSubTest(set([]), [TaskA], TaskA)215 RunSubTest(set([]), [TaskA, TaskB], TaskA)216 RunSubTest(set([TaskC]), [TaskA, TaskB, TaskC], TaskA)217 RunSubTest(set([TaskC, TaskD]), [TaskA, TaskB, TaskC, TaskD], TaskA)218 RunSubTest(set([]), [TaskA, TaskD], TaskD)219 def testGraphVizOutput(self):220 builder = task_manager.Builder(self.output_directory, None)221 @builder.RegisterTask('a')222 def TaskA():223 pass224 @builder.RegisterTask('b')225 def TaskB():226 pass227 @builder.RegisterTask('c', dependencies=[TaskB, TaskA])228 def TaskC():229 pass230 @builder.RegisterTask('d', dependencies=[TaskC])231 def TaskD():232 pass233 @builder.RegisterTask('e')234 def TaskE():235 pass236 @builder.RegisterTask('f', dependencies=[TaskD, TaskE])237 def TaskF():238 pass239 self.TouchOutputFile('e')240 scenario = task_manager.GenerateScenario([TaskF], set([TaskE]))241 output = StringIO.StringIO()242 task_manager.OutputGraphViz(scenario, [TaskF], output)243 self.assertEqual(_GOLDEN_GRAPHVIZ, output.getvalue())244 def testListResumingTasksToFreeze(self):245 TaskManagerTestCase.setUp(self)246 builder = task_manager.Builder(self.output_directory, None)247 @builder.RegisterTask('a')248 def TaskA():249 pass250 @builder.RegisterTask('b')251 def TaskB():252 pass253 @builder.RegisterTask('c', dependencies=[TaskA, TaskB])254 def TaskC():255 pass256 @builder.RegisterTask('d', dependencies=[TaskA])257 def TaskD():258 pass259 @builder.RegisterTask('e', dependencies=[TaskC])260 def TaskE():261 pass262 @builder.RegisterTask('f', dependencies=[TaskC])263 def TaskF():264 pass265 for k in 'abcdef':266 self.TouchOutputFile(k)267 def RunSubTest(268 final_tasks, initial_frozen_tasks, skipped_tasks, reference):269 scenario = task_manager.GenerateScenario(270 final_tasks, initial_frozen_tasks)271 resume_frozen_tasks = task_manager.ListResumingTasksToFreeze(272 scenario, final_tasks, skipped_tasks)273 self.assertEqual(reference, resume_frozen_tasks)274 new_scenario = \275 task_manager.GenerateScenario(final_tasks, resume_frozen_tasks)276 self.assertEqual(skipped_tasks, set(new_scenario))277 RunSubTest([TaskA], set([]), set([TaskA]), [])278 RunSubTest([TaskD], set([]), set([TaskA, TaskD]), [])279 RunSubTest([TaskD], set([]), set([TaskD]), [TaskA])280 RunSubTest([TaskE, TaskF], set([TaskA]), set([TaskB, TaskC, TaskE, TaskF]),281 [TaskA])282 RunSubTest([TaskE, TaskF], set([TaskA]), set([TaskC, TaskE, TaskF]),283 [TaskA, TaskB])284 RunSubTest([TaskE, TaskF], set([TaskA]), set([TaskE, TaskF]), [TaskC])285 RunSubTest([TaskE, TaskF], set([TaskA]), set([TaskF]), [TaskE, TaskC])286 RunSubTest([TaskD, TaskE, TaskF], set([]), set([TaskD, TaskF]),287 [TaskA, TaskE, TaskC])288class CommandLineControlledExecutionTest(TaskManagerTestCase):289 def setUp(self):290 TaskManagerTestCase.setUp(self)291 self.with_raise_exception_tasks = False292 self.task_execution_history = None293 def Execute(self, command_line_args):294 self.task_execution_history = []295 builder = task_manager.Builder(self.output_directory, None)296 @builder.RegisterTask('a')297 def TaskA():298 self.task_execution_history.append(TaskA.name)299 @builder.RegisterTask('b')300 def TaskB():301 self.task_execution_history.append(TaskB.name)302 @builder.RegisterTask('c', dependencies=[TaskA, TaskB])303 def TaskC():304 self.task_execution_history.append(TaskC.name)305 @builder.RegisterTask('d', dependencies=[TaskA])306 def TaskD():307 self.task_execution_history.append(TaskD.name)308 @builder.RegisterTask('e', dependencies=[TaskC])309 def TaskE():310 self.task_execution_history.append(TaskE.name)311 @builder.RegisterTask('raise_exception', dependencies=[TaskD])312 def RaiseExceptionTask():313 self.task_execution_history.append(RaiseExceptionTask.name)314 raise TestException('Expected error.')315 @builder.RegisterTask('raise_keyboard_interrupt', dependencies=[TaskD])316 def RaiseKeyboardInterruptTask():317 self.task_execution_history.append(RaiseKeyboardInterruptTask.name)318 raise KeyboardInterrupt319 @builder.RegisterTask('sudden_death', dependencies=[TaskD])320 def SimulateKillTask():321 self.task_execution_history.append(SimulateKillTask.name)322 raise MemoryError323 @builder.RegisterTask('timeout_error', dependencies=[TaskD])324 def SimulateTimeoutError():325 self.task_execution_history.append(SimulateTimeoutError.name)326 raise common_util.TimeoutError327 @builder.RegisterTask('errno_ENOSPC', dependencies=[TaskD])328 def SimulateENOSPC():329 self.task_execution_history.append(SimulateENOSPC.name)330 raise IOError(errno.ENOSPC, os.strerror(errno.ENOSPC))331 @builder.RegisterTask('errno_EPERM', dependencies=[TaskD])332 def SimulateEPERM():333 self.task_execution_history.append(SimulateEPERM.name)334 raise IOError(errno.EPERM, os.strerror(errno.EPERM))335 default_final_tasks = [TaskD, TaskE]336 if self.with_raise_exception_tasks:337 default_final_tasks.extend([338 RaiseExceptionTask,339 RaiseKeyboardInterruptTask,340 SimulateKillTask,341 SimulateTimeoutError,342 SimulateENOSPC,343 SimulateEPERM])344 task_parser = task_manager.CommandLineParser()345 parser = argparse.ArgumentParser(parents=[task_parser],346 fromfile_prefix_chars=task_manager.FROMFILE_PREFIX_CHARS)347 cmd = ['-o', self.output_directory]348 cmd.extend([i for i in command_line_args])349 args = parser.parse_args(cmd)350 with EatStdoutAndStderr():351 return task_manager.ExecuteWithCommandLine(args, default_final_tasks)352 def ResumeFilePath(self):353 return self.OutputPath(task_manager._TASK_RESUME_ARGUMENTS_FILE)354 def ResumeCmd(self):355 return task_manager.FROMFILE_PREFIX_CHARS + self.ResumeFilePath()356 def testSimple(self):357 self.assertEqual(0, self.Execute([]))358 self.assertListEqual(['a', 'd', 'b', 'c', 'e'], self.task_execution_history)359 def testDryRun(self):360 self.assertEqual(0, self.Execute(['-d']))361 self.assertListEqual([], self.task_execution_history)362 def testRegex(self):363 self.assertEqual(0, self.Execute(['-e', 'b', '-e', 'd']))364 self.assertListEqual(['b', 'a', 'd'], self.task_execution_history)365 self.assertEqual(1, self.Execute(['-e', r'\d']))366 self.assertListEqual([], self.task_execution_history)367 def testFreezing(self):368 self.assertEqual(0, self.Execute(['-f', r'\d']))369 self.assertListEqual(['a', 'd', 'b', 'c', 'e'], self.task_execution_history)370 self.TouchOutputFile('c')371 self.assertEqual(0, self.Execute(['-f', 'c']))372 self.assertListEqual(['a', 'd', 'e'], self.task_execution_history)373 def testDontFreezeUnreachableTasks(self):374 self.TouchOutputFile('c')375 self.assertEqual(0, self.Execute(['-e', 'e', '-f', 'c', '-f', 'd']))376 def testAbortOnFirstError(self):377 ARGS = ['-e', 'exception', '-e', r'^b$']378 self.with_raise_exception_tasks = True379 self.assertEqual(1, self.Execute(ARGS))380 self.assertListEqual(381 ['a', 'd', 'raise_exception'], self.task_execution_history)382 with open(self.ResumeFilePath()) as resume_input:383 self.assertEqual('-f\n^d$', resume_input.read())384 self.TouchOutputFile('d')385 self.assertEqual(1, self.Execute(ARGS + [self.ResumeCmd()]))386 self.assertListEqual(['raise_exception'], self.task_execution_history)387 self.assertEqual(1, self.Execute(ARGS + [self.ResumeCmd()]))388 self.assertListEqual(['raise_exception'], self.task_execution_history)389 self.assertEqual(1, self.Execute(ARGS + [self.ResumeCmd(), '-k']))390 self.assertListEqual(['raise_exception', 'b'], self.task_execution_history)391 def testKeepGoing(self):392 ARGS = ['-k', '-e', 'exception', '-e', r'^b$']393 self.with_raise_exception_tasks = True394 self.assertEqual(1, self.Execute(ARGS))395 self.assertListEqual(396 ['a', 'd', 'raise_exception', 'b'], self.task_execution_history)397 with open(self.ResumeFilePath()) as resume_input:398 self.assertEqual('-f\n^d$\n-f\n^b$', resume_input.read())399 self.TouchOutputFile('d')400 self.TouchOutputFile('b')401 self.assertEqual(1, self.Execute(ARGS + [self.ResumeCmd()]))402 self.assertListEqual(['raise_exception'], self.task_execution_history)403 self.assertEqual(1, self.Execute(ARGS + [self.ResumeCmd()]))404 self.assertListEqual(['raise_exception'], self.task_execution_history)405 def testKeyboardInterrupt(self):406 self.with_raise_exception_tasks = True407 with self.assertRaises(KeyboardInterrupt):408 self.Execute(409 ['-k', '-e', 'raise_keyboard_interrupt', '-e', r'^b$'])410 self.assertListEqual(['a', 'd', 'raise_keyboard_interrupt'],411 self.task_execution_history)412 with open(self.ResumeFilePath()) as resume_input:413 self.assertEqual('-f\n^d$', resume_input.read())414 def testResumeAfterSuddenDeath(self):415 EXPECTED_RESUME_FILE_CONTENT = '-f\n^a$\n-f\n^d$\n'416 ARGS = ['-k', '-e', 'sudden_death', '-e', r'^a$']417 self.with_raise_exception_tasks = True418 with self.assertRaises(MemoryError):419 self.Execute(ARGS)420 self.assertListEqual(421 ['a', 'd', 'sudden_death'], self.task_execution_history)422 with open(self.ResumeFilePath()) as resume_input:423 self.assertEqual(EXPECTED_RESUME_FILE_CONTENT, resume_input.read())424 self.TouchOutputFile('a')425 self.TouchOutputFile('d')426 with self.assertRaises(MemoryError):427 self.Execute(ARGS + [self.ResumeCmd()])428 self.assertListEqual(['sudden_death'], self.task_execution_history)429 with open(self.ResumeFilePath()) as resume_input:430 self.assertEqual(EXPECTED_RESUME_FILE_CONTENT, resume_input.read())431 with self.assertRaises(MemoryError):432 self.Execute(ARGS + [self.ResumeCmd()])433 self.assertListEqual(['sudden_death'], self.task_execution_history)434 with open(self.ResumeFilePath()) as resume_input:435 self.assertEqual(EXPECTED_RESUME_FILE_CONTENT, resume_input.read())436 def testTimeoutError(self):437 self.with_raise_exception_tasks = True438 self.Execute(['-k', '-e', 'timeout_error', '-e', r'^b$'])439 self.assertListEqual(['a', 'd', 'timeout_error', 'b'],440 self.task_execution_history)441 with open(self.ResumeFilePath()) as resume_input:442 self.assertEqual('-f\n^d$\n-f\n^b$', resume_input.read())443 def testENOSPC(self):444 self.with_raise_exception_tasks = True445 with self.assertRaises(IOError):446 self.Execute(['-k', '-e', 'errno_ENOSPC', '-e', r'^a$'])447 self.assertListEqual(448 ['a', 'd', 'errno_ENOSPC'], self.task_execution_history)449 with open(self.ResumeFilePath()) as resume_input:450 self.assertEqual('-f\n^a$\n-f\n^d$\n', resume_input.read())451 def testEPERM(self):452 self.with_raise_exception_tasks = True453 self.Execute(['-k', '-e', 'errno_EPERM', '-e', r'^b$'])454 self.assertListEqual(['a', 'd', 'errno_EPERM', 'b'],455 self.task_execution_history)456 with open(self.ResumeFilePath()) as resume_input:457 self.assertEqual('-f\n^d$\n-f\n^b$', resume_input.read())458 def testImpossibleTasks(self):459 self.assertEqual(1, self.Execute(['-f', r'^a$', '-e', r'^c$']))460 self.assertListEqual([], self.task_execution_history)461 self.assertEqual(0, self.Execute(462 ['-f', r'^a$', '-e', r'^c$', '-e', r'^b$']))463 self.assertListEqual(['b'], self.task_execution_history)464if __name__ == '__main__':...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!