test_processdispatcher_service.py
...
            queued=queued)
        # ensure that procs that request existing engine id run
        self.client.schedule_process("proc2", self.process_definition_id,
            queueing_mode=proc1_queueing_mode, execution_engine_id="engine1")
        self.notifier.wait_for_state("proc2", ProcessState.RUNNING)
        self._wait_assert_pd_dump(self._assert_process_states,
            ProcessState.RUNNING, ["proc2"])
        # ensure that procs that don't specify an engine id run
        self.client.schedule_process("proc3", self.process_definition_id,
            queueing_mode=proc1_queueing_mode)
        self.notifier.wait_for_state("proc3", ProcessState.RUNNING)
        self._wait_assert_pd_dump(self._assert_process_states,
            ProcessState.RUNNING, ["proc3"])
        # now add an engine for proc1 and it should be scheduled
        self.client.node_state("node2", domain_id_from_engine("engine2"),
            InstanceState.RUNNING)
        self._spawn_eeagent("node2", 4)
        self.notifier.wait_for_state("proc1", ProcessState.RUNNING)
        self._wait_assert_pd_dump(self._assert_process_states,
            ProcessState.RUNNING, ["proc1"])
        # now launch another process for engine2. it should be scheduled too
        self.client.schedule_process("proc4", self.process_definition_id,
            queueing_mode=QueueingMode.NEVER, execution_engine_id="engine2")
        self.notifier.wait_for_state("proc4", ProcessState.RUNNING)
        self._wait_assert_pd_dump(self._assert_process_states,
            ProcessState.RUNNING, ["proc4"])

    def test_default_ee(self):
        self.client.node_state("node1", domain_id_from_engine("engine1"),
            InstanceState.RUNNING)
        self._spawn_eeagent("node1", 4)
        self.client.node_state("node2", domain_id_from_engine("engine2"),
            InstanceState.RUNNING)
        self._spawn_eeagent("node2", 4)
        # fill up all 4 slots on engine1 agent and launch one more proc
        for upid in ['p1', 'p2', 'p3', 'p4', 'p5']:
            self.client.schedule_process(upid, self.process_definition_id,
                queueing_mode=QueueingMode.ALWAYS)
        self.notifier.wait_for_state('p1', ProcessState.RUNNING)
        self.notifier.wait_for_state('p2', ProcessState.RUNNING)
        self.notifier.wait_for_state('p3', ProcessState.RUNNING)
        self.notifier.wait_for_state('p4', ProcessState.RUNNING)
        # p5 should be queued since it is not compatible with engine2
        self.notifier.wait_for_state('p5', ProcessState.WAITING)
        # now schedule p6 directly to engine2
        self.client.schedule_process("p6", self.process_definition_id,
            queueing_mode=QueueingMode.ALWAYS, execution_engine_id="engine2")
        self.notifier.wait_for_state('p1', ProcessState.RUNNING)
        # add another eeagent for engine1, p5 should run
        self.client.node_state("node3", domain_id_from_engine("engine1"),
            InstanceState.RUNNING)
        self._spawn_eeagent("node3", 4)
        self.notifier.wait_for_state('p5', ProcessState.RUNNING)

    def test_process_engine_map(self):
        def1 = uuid.uuid4().hex
        self.client.create_definition(def1, "dtype",
            executable={"module": "a.b", "class": "C"},
            name="my_process")
        def2 = uuid.uuid4().hex
        self.client.create_definition(def2, "dtype",
            executable={"module": "a.b", "class": "D"},
            name="my_process")
        def3 = uuid.uuid4().hex
        self.client.create_definition(def3, "dtype",
            executable={"module": "a", "class": "B"},
            name="my_process")
        self.client.node_state("node1", domain_id_from_engine("engine1"),
            InstanceState.RUNNING)
        eeagent1 = self._spawn_eeagent("node1", 4)
        self.client.node_state("node2", domain_id_from_engine("engine2"),
            InstanceState.RUNNING)
        eeagent2 = self._spawn_eeagent("node2", 4)
        self.client.node_state("node3", domain_id_from_engine("engine3"),
            InstanceState.RUNNING)
        eeagent3 = self._spawn_eeagent("node3", 4)
        self.client.schedule_process("proc1", def1)
        self.client.schedule_process("proc2", def2)
        self.client.schedule_process("proc3", def3)
        self.notifier.wait_for_state("proc1", ProcessState.RUNNING)
        self.notifier.wait_for_state("proc2", ProcessState.RUNNING)
        self.notifier.wait_for_state("proc3", ProcessState.RUNNING)
        proc1 = self.client.describe_process("proc1")
        self.assertEqual(proc1['assigned'], eeagent3.name)
        proc2 = self.client.describe_process("proc2")
        self.assertEqual(proc2['assigned'], eeagent2.name)
        proc3 = self.client.describe_process("proc3")
        self.assertEqual(proc3['assigned'], eeagent1.name)

    def test_node_exclusive(self):
        node = "node1"
        domain_id = domain_id_from_engine('engine1')
        node_properties = dict(engine="fedora")
        self.client.node_state(node, domain_id, InstanceState.RUNNING,
            node_properties)
        self._spawn_eeagent(node, 4)
        exclusive_attr = "hamsandwich"
        queued = []
        proc1_queueing_mode = QueueingMode.ALWAYS
        # Process should be scheduled, since no other procs have its
        # exclusive attribute
        self.client.schedule_process("proc1", self.process_definition_id,
            queueing_mode=proc1_queueing_mode,
            node_exclusive=exclusive_attr)
        self.notifier.wait_for_state("proc1", ProcessState.RUNNING)
        self._wait_assert_pd_dump(self._assert_process_states,
            ProcessState.RUNNING, ["proc1"])
        # Process should be queued, because proc1 has the same attribute
        self.client.schedule_process("proc2", self.process_definition_id,
            queueing_mode=proc1_queueing_mode,
            node_exclusive=exclusive_attr)
        queued.append("proc2")
        self._wait_assert_pd_dump(self._assert_process_distribution,
            queued=queued)
        # Now kill the first process, and proc2 should run.
        self.client.terminate_process("proc1")
        queued.remove("proc2")
        self.notifier.wait_for_state("proc2", ProcessState.RUNNING)
        self._wait_assert_pd_dump(self._assert_process_states,
            ProcessState.RUNNING, ["proc2"])
        # Process should be queued, because proc2 has the same attribute
        self.client.schedule_process("proc3", self.process_definition_id,
            queueing_mode=proc1_queueing_mode,
            node_exclusive=exclusive_attr)
        queued.append("proc3")
        self._wait_assert_pd_dump(self._assert_process_distribution,
            queued=queued)
        # Process should be scheduled, since no other procs have its
        # exclusive attribute
        other_exclusive_attr = "hummussandwich"
        self.client.schedule_process("proc4", self.process_definition_id,
            queueing_mode=proc1_queueing_mode,
            node_exclusive=other_exclusive_attr)
        self.notifier.wait_for_state("proc4", ProcessState.RUNNING)
        self._wait_assert_pd_dump(self._assert_process_states,
            ProcessState.RUNNING, ["proc4"])
        # Now that we've started another node, waiting node should start
        node = "node2"
        node_properties = dict(engine="fedora")
        self.client.node_state(node, domain_id, InstanceState.RUNNING,
            node_properties)
        self._spawn_eeagent(node, 4)
        self.notifier.wait_for_state("proc3", ProcessState.RUNNING)
        self._wait_assert_pd_dump(self._assert_process_states,
            ProcessState.RUNNING, ["proc3"])

    def test_node_exclusive_bug(self):
        slots = 2
        node_1 = "node1"
        domain_id = domain_id_from_engine('engine1')
        node_properties = dict(engine="fedora")
        self.client.node_state(node_1, domain_id, InstanceState.RUNNING,
            node_properties)
        self._spawn_eeagent(node_1, slots)
        node_2 = "node2"
        domain_id = domain_id_from_engine('engine1')
        node_properties = dict(engine="fedora")
        self.client.node_state(node_2, domain_id, InstanceState.RUNNING,
            node_properties)
        self._spawn_eeagent(node_2, slots)
        node_3 = "node3"
        domain_id = domain_id_from_engine('engine1')
        node_properties = dict(engine="fedora")
        self.client.node_state(node_3, domain_id, InstanceState.RUNNING,
            node_properties)
        self._spawn_eeagent(node_3, slots)
        node_4 = "node4"
        domain_id = domain_id_from_engine('engine1')
        node_properties = dict(engine="fedora")
        self.client.node_state(node_4, domain_id, InstanceState.RUNNING,
            node_properties)
        self._spawn_eeagent(node_4, slots)
        pydap_xattr = "pydap"
        service_gateway_xattr = "service_gateway"
        queueing_mode = QueueingMode.START_ONLY
        # Process should be scheduled, since no other procs have its
        # exclusive attribute
        pydap_xattr_procs = []
        service_gateway_xattr_procs = []
        proc_1 = "proc_1"
        self.client.schedule_process(proc_1, self.process_definition_id,
            queueing_mode=queueing_mode,
            node_exclusive=pydap_xattr)
        pydap_xattr_procs.append(proc_1)
        proc_2 = "proc_2"
        self.client.schedule_process(proc_2, self.process_definition_id,
            queueing_mode=queueing_mode,
            node_exclusive=service_gateway_xattr)
        pydap_xattr_procs.append(proc_2)
        proc_3 = "proc_3"
        self.client.schedule_process(proc_3, self.process_definition_id,
            queueing_mode=queueing_mode,
            node_exclusive=service_gateway_xattr)
        service_gateway_xattr_procs.append(proc_1)
        proc_4 = "proc_4"
        self.client.schedule_process(proc_4, self.process_definition_id,
            queueing_mode=queueing_mode,
            node_exclusive=pydap_xattr)
        pydap_xattr_procs.append(proc_4)
        proc_5 = "proc_5"
        self.client.schedule_process(proc_5, self.process_definition_id,
            queueing_mode=queueing_mode,
            node_exclusive=service_gateway_xattr)
        service_gateway_xattr_procs.append(proc_5)
        proc_6 = "proc_6"
        self.client.schedule_process(proc_6, self.process_definition_id,
            queueing_mode=queueing_mode,
            node_exclusive=pydap_xattr)
        pydap_xattr_procs.append(proc_6)
        proc_7 = "proc_7"
        self.client.schedule_process(proc_7, self.process_definition_id,
            queueing_mode=queueing_mode,
            node_exclusive=service_gateway_xattr)
        service_gateway_xattr_procs.append(proc_7)
        proc_8 = "proc_8"
        self.client.schedule_process(proc_8, self.process_definition_id,
            queueing_mode=queueing_mode,
            node_exclusive=pydap_xattr)
        pydap_xattr_procs.append(proc_8)
        for proc in (pydap_xattr_procs + service_gateway_xattr_procs):
            self.notifier.wait_for_state(proc, ProcessState.RUNNING)
            self._wait_assert_pd_dump(self._assert_process_states,
                ProcessState.RUNNING, [proc])
        self._wait_assert_pd_dump(self._assert_node_exclusive)
        self.client.terminate_process(proc_8)
        self._wait_assert_pd_dump(self._assert_node_exclusive)

    def test_node_exclusive_multiple_eeagents(self):
        node = "node1"
        domain_id = domain_id_from_engine('engine1')
        node_properties = dict(engine="fedora")
        self.client.node_state(node, domain_id, InstanceState.RUNNING,
            node_properties)
        self._spawn_eeagent(node, 4)
        self._spawn_eeagent(node, 4)
        exclusive_attr = "hamsandwich"
        queued = []
        proc1_queueing_mode = QueueingMode.ALWAYS
        # Process should be scheduled, since no other procs have its
        # exclusive attribute
        self.client.schedule_process("proc1", self.process_definition_id,
            queueing_mode=proc1_queueing_mode,
            node_exclusive=exclusive_attr)
        self.notifier.wait_for_state("proc1", ProcessState.RUNNING)
        self._wait_assert_pd_dump(self._assert_process_states,
            ProcessState.RUNNING, ["proc1"])
        # Process should be queued, because proc1 has the same attribute
        self.client.schedule_process("proc2", self.process_definition_id,
            queueing_mode=proc1_queueing_mode,
            node_exclusive=exclusive_attr)
        queued.append("proc2")
        self._wait_assert_pd_dump(self._assert_process_distribution,
            queued=queued)
        # Now kill the first process, and proc2 should run.
        self.client.terminate_process("proc1")
        queued.remove("proc2")
        self.notifier.wait_for_state("proc2", ProcessState.RUNNING)
        self._wait_assert_pd_dump(self._assert_process_states,
            ProcessState.RUNNING, ["proc2"])
        # Process should be queued, because proc2 has the same attribute
        self.client.schedule_process("proc3", self.process_definition_id,
            queueing_mode=proc1_queueing_mode,
            node_exclusive=exclusive_attr)
        queued.append("proc3")
        self._wait_assert_pd_dump(self._assert_process_distribution,
            queued=queued)
        # Process should be scheduled, since no other procs have its
        # exclusive attribute
        other_exclusive_attr = "hummussandwich"
        self.client.schedule_process("proc4", self.process_definition_id,
            queueing_mode=proc1_queueing_mode,
            node_exclusive=other_exclusive_attr)
        self.notifier.wait_for_state("proc4", ProcessState.RUNNING)
        self._wait_assert_pd_dump(self._assert_process_states,
            ProcessState.RUNNING, ["proc4"])
        # Now that we've started another node, waiting node should start
        node = "node2"
        node_properties = dict(engine="fedora")
        self.client.node_state(node, domain_id, InstanceState.RUNNING,
            node_properties)
        self._spawn_eeagent(node, 4)
        self.notifier.wait_for_state("proc3", ProcessState.RUNNING)
        self._wait_assert_pd_dump(self._assert_process_states,
            ProcessState.RUNNING, ["proc3"])

    def test_queueing(self):
        # submit some processes before there are any resources available
        procs = ["proc1", "proc2", "proc3", "proc4", "proc5"]
        for proc in procs:
            procstate = self.client.schedule_process(proc, self.process_definition_id)
            self.assertEqual(procstate['upid'], proc)
        for proc in procs:
            self.notifier.wait_for_state(proc, ProcessState.WAITING)
        self._wait_assert_pd_dump(self._assert_process_states,
            ProcessState.WAITING, procs)
        # add 2 nodes and a resource that supports 4 processes
        nodes = ["node1", "node2"]
        domain_id = domain_id_from_engine('engine1')
        for node in nodes:
            self.client.node_state(node, domain_id, InstanceState.RUNNING)
        self._spawn_eeagent(nodes[0], 4)
        for proc in procs[:4]:
            self.notifier.wait_for_state(proc, ProcessState.RUNNING)
        self._wait_assert_pd_dump(self._assert_process_states,
            ProcessState.RUNNING, procs[:4])
        for proc in procs[4:]:
            self.notifier.wait_for_state(proc, ProcessState.WAITING)
        self._wait_assert_pd_dump(self._assert_process_states,
            ProcessState.WAITING, procs[4:])
        # stand up a resource on the second node to support the other process
        self._spawn_eeagent(nodes[1], 4)
        # all processes should now be running
        for proc in procs:
            self.notifier.wait_for_state(proc, ProcessState.RUNNING)
        self._wait_assert_pd_dump(self._assert_process_states,
            ProcessState.RUNNING, procs)

    def _assert_process_states(self, dump, expected_state, upids):
        for upid in upids:
            process = dump['processes'][upid]
            assert process['state'] == expected_state, "%s: %s, expected %s!" % (
                upid, process['state'], expected_state)

    def test_node_death(self):
        # set up two nodes with 4 slots each
        nodes = ['node1', 'node2']
        domain_id = domain_id_from_engine('engine1')
        for node in nodes:
            self.client.node_state(node, domain_id, InstanceState.RUNNING)
        for node in nodes:
            self._spawn_eeagent(node, 4)
        # 8 total slots are available, schedule 6 processes
        procs = ['proc' + str(i + 1) for i in range(6)]
        # schedule the first process to never restart. it shouldn't come back.
        self.client.schedule_process(procs[0], self.process_definition_id,
            restart_mode=RestartMode.NEVER)
        # and the second process to restart on abnormal termination. it should
        # come back.
        self.client.schedule_process(procs[1], self.process_definition_id,
            restart_mode=RestartMode.ABNORMAL)
        for proc in procs[2:]:
            self.client.schedule_process(proc, self.process_definition_id)
        self._wait_assert_pd_dump(self._assert_process_distribution,
            node_counts=[4, 2],
            queued_count=0)
        # now kill one node
        log.debug("killing node %s", nodes[0])
        self.client.node_state(nodes[0], domain_id, InstanceState.TERMINATING)
        # 5 processes should be rescheduled. since we have 5 processes and only
        # 4 slots, 1 should be queued
        self._wait_assert_pd_dump(self._assert_process_distribution,
            node_counts=[4],
            queued_count=1)
        # ensure that the correct process was not rescheduled
        self.notifier.wait_for_state(procs[0], ProcessState.FAILED)

    def _assert_process_distribution(self, dump, nodes=None, node_counts=None,
                                     agents=None, agent_counts=None,
                                     queued=None, queued_count=None,
                                     rejected=None, rejected_count=None):
        # Assert the distribution of processes among nodes.
        # node and agent counts are given as sequences of integers which are not
        # specific to a named node. So specifying node_counts=[4, 3] will match
        # as long as you have 4 processes assigned to one node and 3 to another,
        # regardless of the node name
        found_rejected = set()
        found_queued = set()
        found_node = defaultdict(set)
        found_assigned = defaultdict(set)
        for process in dump['processes'].itervalues():
            upid = process['upid']
            assigned = process['assigned']
            if process['state'] == ProcessState.WAITING:
                found_queued.add(upid)
            elif process['state'] == ProcessState.REJECTED:
                found_rejected.add(upid)
            elif process['state'] == ProcessState.RUNNING:
                resource = dump['resources'].get(assigned)
                self.assertIsNotNone(resource)
                node_id = resource['node_id']
                found_node[node_id].add(upid)
                found_assigned[assigned].add(upid)
        print "Queued: %s\nRejected: %s\n" % (queued, rejected)
        print "Found Queued: %s\nFound Rejected: %s\n" % (found_queued, found_rejected)
        if queued is not None:
            self.assertEqual(set(queued), found_queued)
        if queued_count is not None:
            self.assertEqual(len(found_queued), queued_count)
        if rejected is not None:
            self.assertEqual(set(rejected), found_rejected)
        if rejected_count is not None:
            self.assertEqual(len(found_rejected), rejected_count)
        if agents is not None:
            self.assertEqual(set(agents.keys()), set(found_assigned.keys()))
            for ee_id, processes in found_assigned.iteritems():
                self.assertEqual(set(agents[ee_id]), processes)
        if agent_counts is not None:
            assigned_lengths = [len(s) for s in found_assigned.itervalues()]
            # omit zero counts
            agent_counts = [count for count in agent_counts if count != 0]
            # print "%s =?= %s" % (agent_counts, assigned_lengths)
            self.assertEqual(sorted(assigned_lengths), sorted(agent_counts))
        if nodes is not None:
            self.assertEqual(set(nodes.keys()), set(found_node.keys()))
            for node_id, processes in found_node.iteritems():
                self.assertEqual(set(nodes[node_id]), processes)
        if node_counts is not None:
            node_lengths = [len(s) for s in found_node.itervalues()]
            # omit zero counts
            node_counts = [count for count in node_counts if count != 0]
            self.assertEqual(sorted(node_lengths), sorted(node_counts))

    def _assert_node_exclusive(self, dump):
        """assert that processes are distributed in a way consistent
        with the node exclusive properties of those processes
        """
        exclusive_dist = {}
        for proc_id, proc in dump['processes'].iteritems():
            if proc['state'] == '700-TERMINATED':
                continue
            assigned = proc.get('assigned')
            assert assigned is not None, proc
            node_exclusive = proc.get('node_exclusive')
            assert node_exclusive is not None
            if exclusive_dist.get(assigned) is None:
                exclusive_dist[assigned] = []
            exclusive_dist[assigned].append(node_exclusive)
            exclusive_dist[assigned].sort()
        for node, exclusives in exclusive_dist.iteritems():
            assert len(exclusives) == len(set(exclusives))
        exclusive_dist_nodes = {}
        exclusive_dist_resources = {}
        for node_id, node in dump['nodes'].iteritems():
            exclusive_dist_nodes[node_id] = node['node_exclusive']
            exclusive_dist_nodes[node_id].sort()
            for resource in node['resources']:
                exclusive_dist_resources[resource] = node['node_exclusive']
                exclusive_dist_resources[resource].sort()
        for node, exclusives in exclusive_dist_nodes.iteritems():
            assert len(exclusives) == len(set(exclusives))
        print "nodes: %s" % exclusive_dist_nodes
        print "resources: %s" % exclusive_dist_resources
        print "proc: %s" % exclusive_dist
        assert exclusive_dist == exclusive_dist_resources, "%s != %s" % (
            exclusive_dist, exclusive_dist_resources)
        return exclusive_dist

    def test_constraints(self):
        nodes = ['node1', 'node2']
        domain_id = domain_id_from_engine('engine1')
        node1_properties = dict(hat_type="fedora")
        node2_properties = dict(hat_type="bowler")
        self.client.node_state(nodes[0], domain_id, InstanceState.RUNNING,
            node1_properties)
        self._spawn_eeagent(nodes[0], 4)
        proc1_constraints = dict(hat_type="fedora")
        proc2_constraints = dict(hat_type="bowler")
        self.client.schedule_process("proc1", self.process_definition_id,
            constraints=proc1_constraints)
        self.client.schedule_process("proc2", self.process_definition_id,
            constraints=proc2_constraints)
        # proc1 should be running on the node/agent, proc2 queued
        self._wait_assert_pd_dump(self._assert_process_distribution,
            nodes=dict(node1=["proc1"]),
            queued=["proc2"])
        # launch another eeagent that supports proc2's engine_type
        self.client.node_state(nodes[1], domain_id, InstanceState.RUNNING,
            node2_properties)
        self._spawn_eeagent(nodes[1], 4)
        self._wait_assert_pd_dump(self._assert_process_distribution,
            nodes=dict(node1=["proc1"],
                       node2=["proc2"]),
            queued=[])

    def test_queue_mode(self):
        constraints = dict(hat_type="fedora")
        queued = []
        rejected = []
        # Test QueueingMode.NEVER
        proc1_queueing_mode = QueueingMode.NEVER
        self.client.schedule_process("proc1", self.process_definition_id,
            constraints=constraints, queueing_mode=proc1_queueing_mode)
        # proc1 should be rejected
        rejected.append("proc1")
        self._wait_assert_pd_dump(self._assert_process_distribution,
            rejected=rejected)
        # Test QueueingMode.ALWAYS
        proc2_queueing_mode = QueueingMode.ALWAYS
        self.client.schedule_process("proc2", self.process_definition_id,
            constraints=constraints, queueing_mode=proc2_queueing_mode)
        # proc2 should be queued
        queued.append("proc2")
        self._wait_assert_pd_dump(self._assert_process_distribution,
            queued=queued)
        # Test QueueingMode.START_ONLY
        proc3_queueing_mode = QueueingMode.START_ONLY
        proc3_restart_mode = RestartMode.ALWAYS
        self.client.schedule_process("proc3", self.process_definition_id,
            constraints=constraints, queueing_mode=proc3_queueing_mode,
            restart_mode=proc3_restart_mode)
        # proc3 should be queued, since it's START_ONLY
        queued.append("proc3")
        self._wait_assert_pd_dump(self._assert_process_distribution,
            queued=queued)
        node = "node1"
        domain_id = domain_id_from_engine('engine1')
        node_properties = dict(hat_type="fedora")
        self.client.node_state(node, domain_id, InstanceState.RUNNING,
            node_properties)
        self._spawn_eeagent(node, 4)
        # we created a node, so it should now run
        self.notifier.wait_for_state("proc3", ProcessState.RUNNING)
        self._wait_assert_pd_dump(self._assert_process_states,
            ProcessState.RUNNING, ["proc3"])
        log.debug("killing node %s", node)
        self._kill_all_eeagents()
        self.client.node_state(node, domain_id, InstanceState.TERMINATING)
        # proc3 should now be rejected, because it's START_ONLY
        queued.remove("proc3")
        rejected.append("proc3")
        self._wait_assert_pd_dump(self._assert_process_distribution,
            rejected=rejected)
        # Test QueueingMode.RESTART_ONLY
        # First test that it's rejected if it doesn't start right away
        proc4_queueing_mode = QueueingMode.RESTART_ONLY
        proc4_restart_mode = RestartMode.ALWAYS
        self.client.schedule_process("proc4", self.process_definition_id,
            constraints=constraints, queueing_mode=proc4_queueing_mode,
            restart_mode=proc4_restart_mode)
        # proc4 should be rejected, since it's RESTART_ONLY
        rejected.append("proc4")
        self._wait_assert_pd_dump(self._assert_process_distribution,
            rejected=rejected)
        # Second test that if a proc starts, it'll get queued after it fails
        proc5_queueing_mode = QueueingMode.RESTART_ONLY
        proc5_restart_mode = RestartMode.ALWAYS
        # Start a node
        self.client.node_state(node, domain_id, InstanceState.RUNNING,
            node_properties)
        self._spawn_eeagent(node, 4)
        self.client.schedule_process("proc5", self.process_definition_id,
            constraints=constraints, queueing_mode=proc5_queueing_mode,
            restart_mode=proc5_restart_mode)
        self.notifier.wait_for_state("proc5", ProcessState.RUNNING)
        self._wait_assert_pd_dump(self._assert_process_states,
            ProcessState.RUNNING, ["proc5"])
        log.debug("killing node %s", node)
        self.client.node_state(node, domain_id, InstanceState.TERMINATING)
        self._kill_all_eeagents()
        # proc5 should be queued, since it's RESTART_ONLY
        queued.append("proc5")
        self._wait_assert_pd_dump(self._assert_process_distribution,
            queued=queued)

    def test_restart_mode_never(self):
        constraints = dict(hat_type="fedora")
        # Start a node
        node = "node1"
        domain_id = domain_id_from_engine('engine1')
        node_properties = dict(hat_type="fedora")
        self.client.node_state(node, domain_id, InstanceState.RUNNING,
            node_properties)
        eeagent = self._spawn_eeagent(node, 4)
        # Test RestartMode.NEVER
        proc1_queueing_mode = QueueingMode.ALWAYS
        proc1_restart_mode = RestartMode.NEVER
        self.client.schedule_process("proc1", self.process_definition_id,
            constraints=constraints, queueing_mode=proc1_queueing_mode,
            restart_mode=proc1_restart_mode)
        self.notifier.wait_for_state("proc1", ProcessState.RUNNING)
        self._wait_assert_pd_dump(self._assert_process_states,
            ProcessState.RUNNING, ["proc1"])
        eeagent.fail_process("proc1")
        self.notifier.wait_for_state("proc1", ProcessState.FAILED)
        self._wait_assert_pd_dump(self._assert_process_states,
            ProcessState.FAILED, ["proc1"])

    def test_restart_mode_always(self):
        constraints = dict(hat_type="fedora")
        queued = []
        # Start a node
        node = "node1"
        domain_id = domain_id_from_engine('engine1')
        node_properties = dict(hat_type="fedora")
        self.client.node_state(node, domain_id, InstanceState.RUNNING,
            node_properties)
        eeagent = self._spawn_eeagent(node, 4)
        # Test RestartMode.ALWAYS
        proc2_queueing_mode = QueueingMode.ALWAYS
        proc2_restart_mode = RestartMode.ALWAYS
        self.client.schedule_process("proc2", self.process_definition_id,
            constraints=constraints, queueing_mode=proc2_queueing_mode,
            restart_mode=proc2_restart_mode,
            configuration={'process': {'minimum_time_between_starts': 0.1}})
        self.notifier.wait_for_state("proc2", ProcessState.RUNNING)
        self._wait_assert_pd_dump(self._assert_process_states,
            ProcessState.RUNNING, ["proc2"])
        eeagent.exit_process("proc2")
        self.notifier.wait_for_state("proc2", ProcessState.RUNNING)
        self._wait_assert_pd_dump(self._assert_process_states,
            ProcessState.RUNNING, ["proc2"])
        eeagent.fail_process("proc2")
        self.notifier.wait_for_state("proc2", ProcessState.RUNNING)
        self._wait_assert_pd_dump(self._assert_process_states,
            ProcessState.RUNNING, ["proc2"])
        log.debug("killing node %s", node)
        self.client.node_state(node, domain_id, InstanceState.TERMINATING)
        self._kill_all_eeagents()
        # proc2 should be queued, since there are no more resources
        queued.append("proc2")
        self._wait_assert_pd_dump(self._assert_process_distribution,
            queued=queued)

    def test_restart_mode_abnormal(self):
        constraints = dict(hat_type="fedora")
        queued = []
        # Start a node
        node = "node1"
        domain_id = domain_id_from_engine('engine1')
        node_properties = dict(hat_type="fedora")
        self.client.node_state(node, domain_id, InstanceState.RUNNING,
            node_properties)
        eeagent = self._spawn_eeagent(node, 4)
        # Test RestartMode.ABNORMAL
        proc2_queueing_mode = QueueingMode.ALWAYS
        proc2_restart_mode = RestartMode.ABNORMAL
        self.client.schedule_process("proc2", self.process_definition_id,
            constraints=constraints, queueing_mode=proc2_queueing_mode,
            restart_mode=proc2_restart_mode)
        self.notifier.wait_for_state("proc2", ProcessState.RUNNING)
        self._wait_assert_pd_dump(self._assert_process_states,
            ProcessState.RUNNING, ["proc2"])
        eeagent.fail_process("proc2")
        # This can be very slow on buildbot, hence the long timeout
        self.notifier.wait_for_state("proc2", ProcessState.RUNNING, 60)
        self._wait_assert_pd_dump(self._assert_process_states,
            ProcessState.RUNNING, ["proc2"])
        log.debug("killing node %s", node)
        self.client.node_state(node, domain_id, InstanceState.TERMINATING)
        self._kill_all_eeagents()
        # proc2 should be queued, since there are no more resources
        queued.append("proc2")
        self._wait_assert_pd_dump(self._assert_process_distribution,
            queued=queued)
        self.client.node_state(node, domain_id, InstanceState.RUNNING,
            node_properties)
        eeagent = self._spawn_eeagent(node, 4)
        self.client.schedule_process("proc1", self.process_definition_id,
            constraints=constraints, queueing_mode=proc2_queueing_mode,
            restart_mode=proc2_restart_mode)
        self.notifier.wait_for_state("proc1", ProcessState.RUNNING)
        eeagent.exit_process("proc1")
        self.notifier.wait_for_state("proc1", ProcessState.EXITED)
        self._wait_assert_pd_dump(self._assert_process_states,
            ProcessState.EXITED, ["proc1"])

    def test_start_count(self):
        nodes = ['node1']
        domain_id = domain_id_from_engine('engine1')
        node1_properties = dict(hat_type="fedora")
        self.client.node_state(nodes[0], domain_id, InstanceState.RUNNING,
            node1_properties)
        self._spawn_eeagent(nodes[0], 4)
        proc1_constraints = dict(hat_type="fedora")
        self.client.schedule_process("proc1", self.process_definition_id,
            constraints=proc1_constraints)
        # proc1 should be running on the node/agent
        self._wait_assert_pd_dump(self._assert_process_distribution,
            nodes=dict(node1=["proc1"]))
        proc = self.store.get_process(None, "proc1")
        self.assertEqual(proc.starts, 1)
        time.sleep(2)
        self.client.restart_process("proc1")
        # proc1 should be running on the node/agent
        self._wait_assert_pd_dump(self._assert_process_distribution,
            nodes=dict(node1=["proc1"]))
        proc = self.store.get_process(None, "proc1")
        self.assertEqual(proc.starts, 2)

    def test_minimum_time_between_starts(self):
        constraints = dict(hat_type="fedora")
        # Start a node
        node = "node1"
        domain_id = domain_id_from_engine('engine1')
        node_properties = dict(hat_type="fedora")
        self.client.node_state(node, domain_id, InstanceState.RUNNING,
            node_properties)
        eeagent = self._spawn_eeagent(node, 4)
        # Test RestartMode.ALWAYS
        queueing_mode = QueueingMode.ALWAYS
        restart_mode = RestartMode.ALWAYS
        default_time_to_throttle = 2
        time_to_throttle = 10
        self.client.schedule_process("proc1", self.process_definition_id,
            constraints=constraints, queueing_mode=queueing_mode,
            restart_mode=restart_mode)
        self.client.schedule_process("proc2", self.process_definition_id,
            constraints=constraints, queueing_mode=queueing_mode,
            restart_mode=restart_mode,
            configuration=minimum_time_between_starts_config(time_to_throttle))
        # Processes should start once without delay
        self.notifier.wait_for_state("proc1", ProcessState.RUNNING)
        self._wait_assert_pd_dump(self._assert_process_states,
            ProcessState.RUNNING, ["proc1"])
        self.notifier.wait_for_state("proc2", ProcessState.RUNNING)
        self._wait_assert_pd_dump(self._assert_process_states,
            ProcessState.RUNNING, ["proc2"])
        # Processes should be restarted once without delay
        eeagent.exit_process("proc1")
        eeagent.exit_process("proc2")
        self.notifier.wait_for_state("proc1", ProcessState.RUNNING)
        self._wait_assert_pd_dump(self._assert_process_states,
            ProcessState.RUNNING, ["proc1"])
        self.notifier.wait_for_state("proc2", ProcessState.RUNNING)
        self._wait_assert_pd_dump(self._assert_process_states,
            ProcessState.RUNNING, ["proc2"])
        # The second time proc1 should be throttled for 2s (the default), and
        # proc2 should be throttled for the configured 10s
        eeagent.exit_process("proc1")
        eeagent.exit_process("proc2")
        self.notifier.wait_for_state("proc1", ProcessState.WAITING)
        self._wait_assert_pd_dump(self._assert_process_states,
            ProcessState.WAITING, ["proc1"])
        self.notifier.wait_for_state("proc2", ProcessState.WAITING)
        self._wait_assert_pd_dump(self._assert_process_states,
            ProcessState.WAITING, ["proc2"])
        # After waiting a few seconds, proc1 should be restarted
        time.sleep(default_time_to_throttle + 1)
        self.notifier.wait_for_state("proc1", ProcessState.RUNNING)
        self._wait_assert_pd_dump(self._assert_process_states,
            ProcessState.RUNNING, ["proc1"])
        self.notifier.wait_for_state("proc2", ProcessState.WAITING)
        self._wait_assert_pd_dump(self._assert_process_states,
            ProcessState.WAITING, ["proc2"])
        # After a few more secs, proc2 should be restarted as well
        time.sleep(time_to_throttle - (default_time_to_throttle + 1) + 1)
        self.notifier.wait_for_state("proc2", ProcessState.RUNNING)
        self._wait_assert_pd_dump(self._assert_process_states,
            ProcessState.RUNNING, ["proc2"])

    def test_describe(self):
        self.client.schedule_process("proc1", self.process_definition_id)
        processes = self.client.describe_processes()
        self.assertEqual(len(processes), 1)
        self.assertEqual(processes[0]['upid'], "proc1")
        proc1 = self.client.describe_process("proc1")
        self.assertEqual(proc1['upid'], "proc1")
        self.client.schedule_process("proc2", self.process_definition_id)
        processes = self.client.describe_processes()
        self.assertEqual(len(processes), 2)
        if processes[0]['upid'] == "proc1":
            self.assertEqual(processes[1]['upid'], "proc2")
        elif processes[0]['upid'] == "proc2":
            self.assertEqual(processes[1]['upid'], "proc1")
        else:
            self.fail()
        proc1 = self.client.describe_process("proc1")
        self.assertEqual(proc1['upid'], "proc1")
        proc2 = self.client.describe_process("proc2")
        self.assertEqual(proc2['upid'], "proc2")

    def test_process_exited(self):
        node = "node1"
        domain_id = domain_id_from_engine('engine1')
        self.client.node_state(node, domain_id, InstanceState.RUNNING)
        self._spawn_eeagent(node, 1)
        proc = "proc1"
        self.client.schedule_process(proc, self.process_definition_id)
        self._wait_assert_pd_dump(self._assert_process_states,
            ProcessState.RUNNING, [proc])
        agent = self._get_eeagent_for_process(proc)
        agent.exit_process(proc)
        self._wait_assert_pd_dump(self._assert_process_states,
            ProcessState.EXITED, [proc])
        self.notifier.wait_for_state(proc, ProcessState.EXITED)

    def test_neediness(self, process_count=20, node_count=5):
        procs = ["proc" + str(i) for i in range(process_count)]
        for proc in procs:
            procstate = self.client.schedule_process(proc,
                self.process_definition_id)
            self.assertEqual(procstate['upid'], proc)
        self._wait_assert_pd_dump(self._assert_process_states,
            ProcessState.WAITING, procs)
        for i in range(3):
            # retry this a few times to avoid a race between processes
            # hitting WAITING state and the needs being registered
            try:
                self.epum_client.assert_needs(range(node_count + 1),
                    domain_id_from_engine("engine1"))
                break
            except AssertionError:
                time.sleep(0.01)
        self.epum_client.clear()
        # now provide nodes and resources, processes should start
        nodes = ["node" + str(i) for i in range(node_count)]
        domain_id = domain_id_from_engine('engine1')
        for node in nodes:
            self.client.node_state(node, domain_id, InstanceState.RUNNING)
        for node in nodes:
            self._spawn_eeagent(node, 4)
        self._wait_assert_pd_dump(self._assert_process_states,
            ProcessState.RUNNING, procs)
        # now kill all processes in a random order
        killlist = list(procs)
        random.shuffle(killlist)
        for proc in killlist:
            self.client.terminate_process(proc)
        self._wait_assert_pd_dump(self._assert_process_states,
            ProcessState.TERMINATED, procs)
        for i in range(3):
            # retry this a few times to avoid a race between processes
            # hitting WAITING state and the needs being registered
            try:
                self.epum_client.assert_needs(range(node_count + 1),
                    domain_id_from_engine("engine1"))
                break
            except AssertionError:
                time.sleep(0.01)

    def test_definitions(self):
        self.client.create_definition("d1", "t1", "notepad.exe")
        d1 = self.client.describe_definition("d1")
        self.assertEqual(d1['definition_id'], "d1")
        self.assertEqual(d1['definition_type'], "t1")
        self.assertEqual(d1['executable'], "notepad.exe")
        self.client.update_definition("d1", "t1", "notepad2.exe")
        d1 = self.client.describe_definition("d1")
        self.assertEqual(d1['executable'], "notepad2.exe")
        d_list = self.client.list_definitions()
        self.assertIn("d1", d_list)
        self.client.remove_definition("d1")

    def test_reschedule_process(self):
        node = "node1"
        domain_id = domain_id_from_engine('engine1')
        self.client.node_state(node, domain_id, InstanceState.RUNNING)
        self._spawn_eeagent(node, 1)
        proc = "proc1"
        # start a process that is never restarted automatically.
        self.client.create_process(proc, self.process_definition_id)
        self.client.schedule_process(proc, restart_mode=RestartMode.NEVER)
        self._wait_assert_pd_dump(self._assert_process_states,
            ProcessState.RUNNING, [proc])
        agent = self._get_eeagent_for_process(proc)
        agent.exit_process(proc)
        self._wait_assert_pd_dump(self._assert_process_states,
            ProcessState.EXITED, [proc])
        self.notifier.wait_for_state(proc, ProcessState.EXITED)
        record = self.client.schedule_process(proc)
        self.assertEqual(record['state'], ProcessState.REQUESTED)
        self.notifier.wait_for_state(proc, ProcessState.RUNNING)
        # now fail the process. it should still be restartable.
        agent.fail_process(proc)

    def test_create_schedule(self):
        node = "node1"
        domain_id = domain_id_from_engine('engine1')
        self.client.node_state(node, domain_id, InstanceState.RUNNING)
        self._spawn_eeagent(node, 1)
        proc = "proc1"
        # create a process. it should be UNSCHEDULED until we schedule it
        self.client.create_process(proc, self.process_definition_id)
        self._wait_assert_pd_dump(self._assert_process_states,
            ProcessState.UNSCHEDULED, [proc])
        # creating again is harmless
        self.client.create_process(proc, self.process_definition_id)
        # now schedule it
        self.client.schedule_process(proc)
        self._wait_assert_pd_dump(self._assert_process_states,
            ProcessState.RUNNING, [proc])
        self.notifier.wait_for_state(proc, ProcessState.RUNNING)
        # scheduling again is harmless
        self.client.schedule_process(proc)

    def test_restart_system_boot(self):
        # set up some state in the PD before restart
        self.client.node_state("node1", domain_id_from_engine("engine1"),
            InstanceState.RUNNING)
        self._spawn_eeagent("node1", 4)
        procs = [('p1', RestartMode.ABNORMAL, None),
                 ('p2', RestartMode.ALWAYS, None),
                 ('p3', RestartMode.NEVER, None),
                 ('p4', RestartMode.ALWAYS, nosystemrestart_process_config()),
                 ('p5', RestartMode.ABNORMAL, None),
                 ('p6', RestartMode.ABNORMAL, nosystemrestart_process_config())]
        # fill up all 4 slots on engine1 agent and launch 2 more procs
        for upid, restart_mode, config in procs:
            self.client.schedule_process(upid, self.process_definition_id,
                queueing_mode=QueueingMode.ALWAYS, restart_mode=restart_mode,
                configuration=config)
        self.notifier.wait_for_state('p1', ProcessState.RUNNING)
        self.notifier.wait_for_state('p2', ProcessState.RUNNING)
        self.notifier.wait_for_state('p3', ProcessState.RUNNING)
        self.notifier.wait_for_state('p4', ProcessState.RUNNING)
        self.notifier.wait_for_state('p5', ProcessState.WAITING)
        self.notifier.wait_for_state('p6', ProcessState.WAITING)
        # now kill PD and eeagents. come back in system restart mode.
        self.stop_pd()
        self._kill_all_eeagents()
        self.store.initialize()
        self.store.set_system_boot(True)
        self.store.shutdown()
        self.start_pd()
        self.store.wait_initialized(timeout=20)
        # some processes should come back pending. others should fail out
        # due to their restart mode flag.
        self.notifier.wait_for_state('p1', ProcessState.UNSCHEDULED_PENDING)
        self.notifier.wait_for_state('p2', ProcessState.UNSCHEDULED_PENDING)
        self.notifier.wait_for_state('p3', ProcessState.TERMINATED)
        self.notifier.wait_for_state('p4', ProcessState.TERMINATED)
        self.notifier.wait_for_state('p5', ProcessState.UNSCHEDULED_PENDING)
        self.notifier.wait_for_state('p6', ProcessState.TERMINATED)
        # add resources back
        self.client.node_state("node1", domain_id_from_engine("engine1"),
            InstanceState.RUNNING)
        self._spawn_eeagent("node1", 4)
        # now launch a new process to make sure scheduling still works during
        # system boot mode
        self.client.schedule_process("p7", self.process_definition_id)
        # and restart a couple of the dead procs. one FAILED and one UNSCHEDULED_PENDING
        self.client.schedule_process("p1")
        self.client.schedule_process("p4")
        self.notifier.wait_for_state('p1', ProcessState.RUNNING)
        self.notifier.wait_for_state('p4', ProcessState.RUNNING)
        self.notifier.wait_for_state('p7', ProcessState.RUNNING)
        # finally, end system boot mode. the remaining 2 U-P procs should be scheduled
        self.client.set_system_boot(False)
        self._wait_assert_pd_dump(self._assert_process_distribution,
            node_counts=[4],
            queued_count=1)
        # one process will end up queued. doesn't matter which
        p2 = self.client.describe_process("p2")
        p5 = self.client.describe_process("p5")
        states = set([p2['state'], p5['state']])
        self.assertEqual(states, set([ProcessState.RUNNING, ProcessState.WAITING]))

    def test_missing_ee(self):
        """test_missing_ee
        Ensure that the PD kills lingering processes on eeagents after they've been
        evacuated.
        """
        # create some fake nodes and tell PD about them
        node_1 = "node1"
        domain_id = domain_id_from_engine("engine4")
        self.client.node_state(node_1, domain_id, InstanceState.RUNNING)
        # PD knows about this node but hasn't gotten a heartbeat yet
        # spawn the eeagents and tell them all to heartbeat
        eeagent_1 = self._spawn_eeagent(node_1, 1)

        def assert_all_resources(state):
            eeagent_nodes = set()
            for resource in state['resources'].itervalues():
                eeagent_nodes.add(resource['node_id'])
            self.assertEqual(set([node_1]), eeagent_nodes)

        self._wait_assert_pd_dump(assert_all_resources)
        time_to_throttle = 0
        self.client.schedule_process("p1", self.process_definition_id,
            execution_engine_id="engine4",
            configuration=minimum_time_between_starts_config(time_to_throttle))
        # Send a heartbeat to show the process is RUNNING, then wait for doctor
        # to mark the eeagent missing
        time.sleep(1)
        eeagent_1.send_heartbeat()
        self.notifier.wait_for_state('p1', ProcessState.RUNNING, timeout=30)
        self.notifier.wait_for_state('p1', ProcessState.WAITING, timeout=30)
        # Check that the process is still 'Running' on the eeagent, even though
        # the PD has since marked it failed
        eeagent_process = eeagent_1._get_process_with_upid('p1')
        self.assertEqual(eeagent_process['u_pid'], 'p1')
        self.assertEqual(eeagent_process['state'], ProcessState.RUNNING)
        self.assertEqual(eeagent_process['round'], 0)
        # Now send another heartbeat to start getting procs again
        eeagent_1.send_heartbeat()
        self.notifier.wait_for_state('p1', ProcessState.RUNNING, timeout=30)
        eeagent_process = eeagent_1._get_process_with_upid('p1')
        self.assertEqual(eeagent_process['u_pid'], 'p1')
        self.assertEqual(eeagent_process['state'], ProcessState.RUNNING)
        self.assertEqual(eeagent_process['round'], 1)
        # The PD should now have rescheduled the proc, and terminated the
        # lingering process
        self.assertEqual(len(eeagent_1.history), 1)
        terminated_history = eeagent_1.history[0]
        self.assertEqual(terminated_history['u_pid'], 'p1')
        self.assertEqual(terminated_history['state'], ProcessState.TERMINATED)
        self.assertEqual(terminated_history['round'], 0)

    def test_matchmaker_msg_retry(self):
        node = "node1"
        domain_id = domain_id_from_engine('engine1')
        self.client.node_state(node, domain_id, InstanceState.RUNNING)
        self._spawn_eeagent(node, 1)
        # sneak in and shorten retry time
        self.pd.matchmaker.process_launcher.retry_seconds = 0.5
        proc1 = "proc1"
        proc2 = "proc2"
        with patch.object(self.pd.matchmaker.process_launcher, "resource_client") as mock_resource_client:
            # first process request goes to mock, not real client but no error
            self.client.schedule_process(proc1, self.process_definition_id)
            # second process hits an error but that should not cause problems
            mock_resource_client.launch_process.side_effect = Exception("boom!")
            self.client.schedule_process(proc2, self.process_definition_id)
            self._wait_assert_pd_dump(self._assert_process_states,
                ProcessState.ASSIGNED, [proc1, proc2])
            self.assertEqual(mock_resource_client.launch_process.call_count, 2)
        # now resource client works again. those messages should be retried
        self.notifier.wait_for_state(proc1, ProcessState.RUNNING)
        self.notifier.wait_for_state(proc2, ProcessState.RUNNING)
        self._wait_assert_pd_dump(self._assert_process_states,
            ProcessState.RUNNING, [proc1, proc2])


class ProcessDispatcherServiceZooKeeperTests(ProcessDispatcherServiceTests, ZooKeeperTestMixin):
    # this runs all of the ProcessDispatcherService tests with a ZK store

    def setup_store(self):
        self.setup_zookeeper(base_path_prefix="/processdispatcher_service_tests_")
        store = ProcessDispatcherZooKeeperStore(self.zk_hosts,
            self.zk_base_path, use_gevent=self.use_gevent)
        store.initialize()
        return store

    def teardown_store(self):
        if self.store:
            self.store.shutdown()
        self.teardown_zookeeper()
...
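Note: most assertions in the tests above go through the _wait_assert_pd_dump helper, which lives in the test base class and is not part of this excerpt. A minimal sketch of the polling pattern these tests rely on is shown below; the dump call, timeout, and interval are assumptions for illustration, not the project's actual helper.

    import time

    def _wait_assert_pd_dump(self, assert_fn, *args, **kwargs):
        """Hypothetical sketch: poll the Process Dispatcher state dump until
        assert_fn(dump, *args, **kwargs) stops raising AssertionError, or
        re-raise after a timeout. The real helper may differ in its dump call
        and timing."""
        deadline = time.time() + 30        # assumed overall timeout in seconds
        while True:
            dump = self.client.dump()      # assumed client call returning the PD state dump
            try:
                assert_fn(dump, *args, **kwargs)
                return
            except AssertionError:
                if time.time() >= deadline:
                    raise
                time.sleep(0.1)            # assumed polling interval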
test_update.py
...
        return "test_update_stateless_job_spec.yaml"
    return "test_stateless_job_spec_k8s.yaml"


def test__create_update(stateless_job, in_place):
    stateless_job.create()
    stateless_job.wait_for_state(goal_state="RUNNING")
    old_pod_infos = stateless_job.query_pods()
    old_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()
    update = StatelessUpdate(
        stateless_job, updated_job_file=UPDATE_STATELESS_JOB_SPEC
    )
    update.create(in_place=in_place)
    update.wait_for_state(goal_state="SUCCEEDED")
    new_pod_infos = stateless_job.query_pods()
    new_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()
    assert_pod_id_changed(old_pod_infos, new_pod_infos)
    assert_pod_spec_changed(old_instance_zero_spec, new_instance_zero_spec)


def test__create_update_add_instances(stateless_job, in_place):
    stateless_job.create()
    stateless_job.wait_for_state(goal_state="RUNNING")
    old_pod_infos = stateless_job.query_pods()
    update = StatelessUpdate(
        stateless_job, updated_job_file=UPDATE_STATELESS_JOB_ADD_INSTANCES_SPEC
    )
    update.create(in_place=in_place)
    update.wait_for_state(goal_state="SUCCEEDED")
    new_pod_infos = stateless_job.query_pods()
    assert len(old_pod_infos) == 3
    assert len(new_pod_infos) == 5


def test__create_update_update_and_add_instances(stateless_job, in_place):
    stateless_job.create()
    stateless_job.wait_for_state(goal_state="RUNNING")
    old_pod_infos = stateless_job.query_pods()
    old_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()
    update = StatelessUpdate(
        stateless_job,
        updated_job_file=UPDATE_STATELESS_JOB_UPDATE_AND_ADD_INSTANCES_SPEC,
    )
    update.create(in_place=in_place)
    update.wait_for_state(goal_state="SUCCEEDED")
    new_pod_infos = stateless_job.query_pods()
    new_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()
    assert len(old_pod_infos) == 3
    assert len(new_pod_infos) == 5
    assert_pod_id_changed(old_pod_infos, new_pod_infos)
    assert_pod_spec_changed(old_instance_zero_spec, new_instance_zero_spec)


def test__create_update_update_start_paused(stateless_job, in_place):
    stateless_job.create()
    stateless_job.wait_for_state(goal_state="RUNNING")
    old_pod_infos = stateless_job.query_pods()
    old_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()
    update = StatelessUpdate(
        stateless_job,
        updated_job_file=UPDATE_STATELESS_JOB_UPDATE_AND_ADD_INSTANCES_SPEC,
        start_paused=True,
    )
    update.create(in_place=in_place)
    update.wait_for_state(goal_state="PAUSED")
    update.resume()
    update.wait_for_state(goal_state="SUCCEEDED")
    new_pod_infos = stateless_job.query_pods()
    new_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()
    assert len(old_pod_infos) == 3
    assert len(new_pod_infos) == 5
    assert_pod_id_changed(old_pod_infos, new_pod_infos)
    assert_pod_spec_changed(old_instance_zero_spec, new_instance_zero_spec)


def test__create_update_with_batch_size(stateless_job, in_place):
    stateless_job.create()
    stateless_job.wait_for_state(goal_state="RUNNING")
    old_pod_infos = stateless_job.query_pods()
    old_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()
    update = StatelessUpdate(
        stateless_job, updated_job_file=UPDATE_STATELESS_JOB_SPEC, batch_size=1
    )
    update.create(in_place=in_place)
    update.wait_for_state(goal_state="SUCCEEDED")
    new_pod_infos = stateless_job.query_pods()
    new_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()
    assert_pod_id_changed(old_pod_infos, new_pod_infos)
    assert_pod_spec_changed(old_instance_zero_spec, new_instance_zero_spec)


def test__create_update_add_instances_with_batch_size(stateless_job, in_place):
    stateless_job.create()
    stateless_job.wait_for_state(goal_state="RUNNING")
    old_pod_infos = stateless_job.query_pods()
    update = StatelessUpdate(
        stateless_job,
        updated_job_file=UPDATE_STATELESS_JOB_ADD_INSTANCES_SPEC,
        batch_size=1,
    )
    update.create(in_place=in_place)
    update.wait_for_state(goal_state="SUCCEEDED")
    new_pod_infos = stateless_job.query_pods()
    assert len(old_pod_infos) == 3
    assert len(new_pod_infos) == 5


def test__create_update_update_and_add_instances_with_batch(stateless_job, in_place):
    stateless_job.create()
    stateless_job.wait_for_state(goal_state="RUNNING")
    old_pod_infos = stateless_job.query_pods()
    old_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()
    update = StatelessUpdate(
        stateless_job,
        updated_job_file=UPDATE_STATELESS_JOB_UPDATE_AND_ADD_INSTANCES_SPEC,
        batch_size=1,
    )
    update.create(in_place=in_place)
    update.wait_for_state(goal_state="SUCCEEDED")
    new_pod_infos = stateless_job.query_pods()
    new_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()
    assert len(old_pod_infos) == 3
    assert len(new_pod_infos) == 5
    assert_pod_id_changed(old_pod_infos, new_pod_infos)
    assert_pod_spec_changed(old_instance_zero_spec, new_instance_zero_spec)


def test__create_update_bad_version(stateless_job, in_place):
    stateless_job.create()
    stateless_job.wait_for_state(goal_state="RUNNING")
    update = StatelessUpdate(
        stateless_job,
        updated_job_file=UPDATE_STATELESS_JOB_UPDATE_AND_ADD_INSTANCES_SPEC,
        batch_size=1,
    )
    try:
        update.create(entity_version="1-2-3", in_place=in_place)
    except grpc.RpcError as e:
        assert e.code() == grpc.StatusCode.ABORTED
        assert INVALID_ENTITY_VERSION_ERR_MESSAGE in e.details()
        return
    raise Exception("entity version mismatch error not received")


def test__pause_update_bad_version(stateless_job, in_place):
    stateless_job.create()
    stateless_job.wait_for_state(goal_state="RUNNING")
    update = StatelessUpdate(
        stateless_job,
        updated_job_file=UPDATE_STATELESS_JOB_UPDATE_AND_ADD_INSTANCES_SPEC,
        batch_size=1,
    )
    update.create(in_place=in_place)
    try:
        update.pause(entity_version="1-2-3")
    except grpc.RpcError as e:
        assert e.code() == grpc.StatusCode.ABORTED
        assert INVALID_ENTITY_VERSION_ERR_MESSAGE in e.details()
        return
    raise Exception("entity version mismatch error not received")


def test__resume_update_bad_version(stateless_job, in_place):
    stateless_job.create()
    stateless_job.wait_for_state(goal_state="RUNNING")
    update = StatelessUpdate(
        stateless_job,
        updated_job_file=UPDATE_STATELESS_JOB_UPDATE_AND_ADD_INSTANCES_SPEC,
        start_paused=True,
        batch_size=1,
    )
    update.create(in_place=in_place)
    try:
        update.resume(entity_version="1-2-3")
    except grpc.RpcError as e:
        assert e.code() == grpc.StatusCode.ABORTED
        assert INVALID_ENTITY_VERSION_ERR_MESSAGE in e.details()
        return
    raise Exception("entity version mismatch error not received")


def test__abort_update_bad_version(stateless_job, in_place):
    stateless_job.create()
    stateless_job.wait_for_state(goal_state="RUNNING")
    update = StatelessUpdate(
        stateless_job,
        updated_job_file=UPDATE_STATELESS_JOB_UPDATE_AND_ADD_INSTANCES_SPEC,
        batch_size=1,
    )
    update.create(in_place=in_place)
    try:
        update.abort(entity_version="1-2-3")
    except grpc.RpcError as e:
        assert e.code() == grpc.StatusCode.ABORTED
        assert INVALID_ENTITY_VERSION_ERR_MESSAGE in e.details()
        return
    raise Exception("entity version mismatch error not received")


def test__create_update_stopped_job(stateless_job, in_place):
    stateless_job.create()
    stateless_job.wait_for_state(goal_state="RUNNING")
    old_pod_infos = stateless_job.query_pods()
    old_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()
    old_pod_states = set()
    for pod_info in old_pod_infos:
        old_pod_states.add(pod_info.spec.pod_name.value)
    stateless_job.stop()
    stateless_job.wait_for_state(goal_state="KILLED")
    update = StatelessUpdate(
        stateless_job,
        updated_job_file=UPDATE_STATELESS_JOB_UPDATE_AND_ADD_INSTANCES_SPEC,
        batch_size=1,
    )
    update.create(in_place=in_place)
    stateless_job.start()
    update.wait_for_state(goal_state="SUCCEEDED")
    stateless_job.wait_for_state(goal_state="RUNNING")
    new_pod_infos = stateless_job.query_pods()
    new_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()
    assert len(old_pod_infos) == 3
    assert len(new_pod_infos) == 5
    assert_pod_id_changed(old_pod_infos, new_pod_infos)
    assert_pod_spec_changed(old_instance_zero_spec, new_instance_zero_spec)
    # Only new instances should be RUNNING
    for pod_info in new_pod_infos:
        if pod_info.spec.pod_name.value in old_pod_states:
            assert pod_info.status.state == pod_pb2.POD_STATE_KILLED
        else:
            assert pod_info.status.state == pod_pb2.POD_STATE_RUNNING


def test__create_update_stopped_tasks(stateless_job, in_place):
    stateless_job.create()
    stateless_job.wait_for_state(goal_state="RUNNING")
    old_pod_infos = stateless_job.query_pods()
    old_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()
    stateless_job.stop()
    update = StatelessUpdate(
        stateless_job,
        updated_job_file=UPDATE_STATELESS_JOB_UPDATE_AND_ADD_INSTANCES_SPEC,
        batch_size=1,
    )
    update.create(in_place=in_place)
    update.wait_for_state(goal_state="SUCCEEDED")
    stateless_job.wait_for_state(goal_state="KILLED")
    stateless_job.start()
    stateless_job.wait_for_state(goal_state="RUNNING")
    new_pod_infos = stateless_job.query_pods()
    new_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()
    assert len(old_pod_infos) == 3
    assert len(new_pod_infos) == 5
    assert_pod_id_changed(old_pod_infos, new_pod_infos)
    assert_pod_spec_changed(old_instance_zero_spec, new_instance_zero_spec)


def test__create_multiple_consecutive_updates(stateless_job, in_place):
    stateless_job.create()
    stateless_job.wait_for_state(goal_state="RUNNING")
    old_pod_infos = stateless_job.query_pods()
    old_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()
    update1 = StatelessUpdate(
        stateless_job, updated_job_file=UPDATE_STATELESS_JOB_ADD_INSTANCES_SPEC
    )
    update1.create(in_place=in_place)
    update2 = StatelessUpdate(
        stateless_job,
        updated_job_file=UPDATE_STATELESS_JOB_UPDATE_AND_ADD_INSTANCES_SPEC,
        batch_size=1,
    )
    update2.create(in_place=in_place)
    update2.wait_for_state(goal_state="SUCCEEDED")
    new_pod_infos = stateless_job.query_pods()
    new_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()
    assert len(old_pod_infos) == 3
    assert len(new_pod_infos) == 5
    assert_pod_id_changed(old_pod_infos, new_pod_infos)
    assert_pod_spec_changed(old_instance_zero_spec, new_instance_zero_spec)


def test__abort_update(stateless_job, in_place):
    stateless_job.create()
    stateless_job.wait_for_state(goal_state="RUNNING")
    update = StatelessUpdate(
        stateless_job,
        updated_job_file=UPDATE_STATELESS_JOB_UPDATE_AND_ADD_INSTANCES_SPEC,
        batch_size=1,
    )
    update.create(in_place=in_place)
    update.wait_for_state(goal_state="ROLLING_FORWARD")
    update.abort()
    update.wait_for_state(goal_state="ABORTED")


def test__update_reduce_instances(stateless_job, in_place):
    stateless_job.create()
    stateless_job.wait_for_state(goal_state="RUNNING")
    old_pod_infos = stateless_job.query_pods()
    assert len(old_pod_infos) == 3
    # first increase instances
    update = StatelessUpdate(
        stateless_job, updated_job_file=UPDATE_STATELESS_JOB_ADD_INSTANCES_SPEC
    )
    update.create()
    update.wait_for_state(goal_state="SUCCEEDED")
    new_pod_infos = stateless_job.query_pods()
    assert len(new_pod_infos) == 5
    # now reduce instances
    update = StatelessUpdate(
        stateless_job,
        updated_job_file=UPDATE_STATELESS_JOB_UPDATE_REDUCE_INSTANCES_SPEC,
    )
    update.create(in_place=in_place)
    update.wait_for_state(goal_state="SUCCEEDED")
    new_pod_infos = stateless_job.query_pods()
    assert len(new_pod_infos) == 3
    # now increase back again
    update = StatelessUpdate(
        stateless_job, updated_job_file=UPDATE_STATELESS_JOB_ADD_INSTANCES_SPEC
    )
    update.create()
    update.wait_for_state(goal_state="SUCCEEDED")
    new_pod_infos = stateless_job.query_pods()
    assert len(new_pod_infos) == 5


def test__update_reduce_instances_stopped_tasks(stateless_job, in_place):
    stateless_job.create()
    stateless_job.wait_for_state(goal_state="RUNNING")
    old_pod_infos = stateless_job.query_pods()
    assert len(old_pod_infos) == 3
    # first increase instances
    update = StatelessUpdate(
        stateless_job, updated_job_file=UPDATE_STATELESS_JOB_ADD_INSTANCES_SPEC
    )
    update.create(in_place=in_place)
    update.wait_for_state(goal_state="SUCCEEDED")
    new_pod_infos = stateless_job.query_pods()
    assert len(new_pod_infos) == 5
    # now stop last 2 tasks
    ranges = task_pb2.InstanceRange(to=5)
    setattr(ranges, "from", 3)
    stateless_job.stop(ranges=[ranges])
    # now reduce instance count
    update = StatelessUpdate(
        stateless_job,
        updated_job_file=UPDATE_STATELESS_JOB_UPDATE_REDUCE_INSTANCES_SPEC,
    )
    update.create(in_place=in_place)
    update.wait_for_state(goal_state="SUCCEEDED")
    new_pod_infos = stateless_job.query_pods()
    assert len(new_pod_infos) == 3


# test__create_update_with_bad_config tests creating an update with bad config
# without rollback
def test__create_update_with_bad_config(stateless_job, in_place):
    stateless_job.create()
    stateless_job.wait_for_state(goal_state="RUNNING")
    old_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()
    update = StatelessUpdate(
        stateless_job,
        updated_job_file=UPDATE_STATELESS_JOB_BAD_SPEC,
        max_failure_instances=3,
        max_instance_attempts=1,
    )
    update.create(in_place=in_place)
    update.wait_for_state(goal_state="FAILED", failed_state="SUCCEEDED")
    new_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()
    assert_pod_spec_changed(old_instance_zero_spec, new_instance_zero_spec)
    for pod_info in stateless_job.query_pods():
        assert pod_info.status.state == pod_pb2.POD_STATE_FAILED


# test__create_update_add_instances_with_bad_config
# tests creating an update with bad config and more instances
# without rollback
def test__create_update_add_instances_with_bad_config(stateless_job, in_place):
    stateless_job.create()
    stateless_job.wait_for_state(goal_state="RUNNING")
    job_spec_dump = load_test_config(UPDATE_STATELESS_JOB_BAD_SPEC)
    updated_job_spec = JobSpec()
    json_format.ParseDict(job_spec_dump, updated_job_spec)
    updated_job_spec.instance_count = stateless_job.job_spec.instance_count + 3
    update = StatelessUpdate(
        stateless_job,
        batch_size=1,
        updated_job_spec=updated_job_spec,
        max_failure_instances=1,
        max_instance_attempts=1,
    )
    update.create(in_place=in_place)
    update.wait_for_state(goal_state="FAILED", failed_state="SUCCEEDED")
    # only one instance should be added
    assert (
        len(stateless_job.query_pods())
        == stateless_job.job_spec.instance_count + 1
    )


# test__create_update_reduce_instances_with_bad_config
# tests creating an update with bad config and fewer instances
# without rollback
def test__create_update_reduce_instances_with_bad_config(stateless_job, in_place):
    stateless_job.create()
    stateless_job.wait_for_state(goal_state="RUNNING")
    old_pod_infos = stateless_job.query_pods()
    job_spec_dump = load_test_config(UPDATE_STATELESS_JOB_BAD_SPEC)
    updated_job_spec = JobSpec()
    json_format.ParseDict(job_spec_dump, updated_job_spec)
    updated_job_spec.instance_count = stateless_job.job_spec.instance_count - 1
    update = StatelessUpdate(
        stateless_job,
        updated_job_spec=updated_job_spec,
        batch_size=1,
        max_failure_instances=1,
        max_instance_attempts=1,
    )
    update.create(in_place=in_place)
    update.wait_for_state(goal_state="FAILED", failed_state="SUCCEEDED")
    new_pod_infos = stateless_job.query_pods()
    assert len(old_pod_infos) == len(new_pod_infos)


# test__create_update_with_failed_health_check
# tests an update fails even if the new task state is RUNNING,
# as long as the health check fails
def test__create_update_with_failed_health_check(stateless_job, in_place):
    stateless_job.create()
    stateless_job.wait_for_state(goal_state="RUNNING")
    update = StatelessUpdate(
        stateless_job,
        updated_job_file=UPDATE_STATELESS_JOB_BAD_HEALTH_CHECK_SPEC,
        max_failure_instances=1,
        max_instance_attempts=1,
    )
    update.create(in_place=in_place)
    update.wait_for_state(goal_state="FAILED", failed_state="SUCCEEDED")


# test__create_update_to_disable_health_check tests an update which
# disables healthCheck
def test__create_update_to_disable_health_check(in_place):
    job = StatelessJob(
        job_file=UPDATE_STATELESS_JOB_WITH_HEALTH_CHECK_SPEC,
        config=IntegrationTestConfig(
            max_retry_attempts=100,
            pool_file='test_stateless_respool.yaml',
        ),
    )
    job.create()
    job.wait_for_state(goal_state="RUNNING")
    job.job_spec.default_spec.containers[0].liveness_check.enabled = False
    update = StatelessUpdate(
        job,
        updated_job_spec=job.job_spec,
        max_failure_instances=1,
        max_instance_attempts=1,
    )
    update.create(in_place=in_place)
    update.wait_for_state(goal_state="SUCCEEDED")


# test__create_update_to_enable_health_check tests an update which
# enables healthCheck
def test__create_update_to_enable_health_check(in_place):
    job = StatelessJob(
        job_file=UPDATE_STATELESS_JOB_WITH_HEALTH_CHECK_SPEC,
        config=IntegrationTestConfig(
            max_retry_attempts=100,
            pool_file='test_stateless_respool.yaml',
        ),
    )
    job.job_spec.default_spec.containers[0].liveness_check.enabled = False
    job.create()
    job.wait_for_state(goal_state="RUNNING")
    job.job_spec.default_spec.containers[0].liveness_check.enabled = True
    update = StatelessUpdate(
        job,
        updated_job_spec=job.job_spec,
        max_failure_instances=1,
        max_instance_attempts=1,
    )
    update.create(in_place=in_place)
    update.wait_for_state(goal_state="SUCCEEDED")


# test__create_update_to_unset_health_check tests an update to unset
# health check config
def test__create_update_to_unset_health_check(in_place):
    job = StatelessJob(
        job_file=UPDATE_STATELESS_JOB_WITH_HEALTH_CHECK_SPEC,
        config=IntegrationTestConfig(
            max_retry_attempts=100,
            pool_file='test_stateless_respool.yaml',
        ),
    )
    job.create()
    job.wait_for_state(goal_state="RUNNING")
    update = StatelessUpdate(
        job,
        updated_job_file=UPDATE_STATELESS_JOB_SPEC,
        max_failure_instances=1,
        max_instance_attempts=1,
    )
    update.create(in_place=in_place)
    update.wait_for_state(goal_state="SUCCEEDED")


# test__create_update_to_set_health_check tests an update to set
# health check config for a job without health check set
def test__create_update_to_set_health_check(in_place):
    job = StatelessJob(
        job_file=UPDATE_STATELESS_JOB_SPEC,
        config=IntegrationTestConfig(
            max_retry_attempts=100,
            pool_file='test_stateless_respool.yaml',
        ),
    )
    job.create()
    job.wait_for_state(goal_state="RUNNING")
    update = StatelessUpdate(
        job,
        updated_job_file=UPDATE_STATELESS_JOB_WITH_HEALTH_CHECK_SPEC,
        max_failure_instances=1,
        max_instance_attempts=1,
    )
    update.create(in_place=in_place)
    update.wait_for_state(goal_state="SUCCEEDED")


# test__create_update_to_change_health_check_config tests an update which
# changes healthCheck
def test__create_update_to_change_health_check_config(in_place):
    job = StatelessJob(
        job_file=UPDATE_STATELESS_JOB_WITH_HEALTH_CHECK_SPEC,
        config=IntegrationTestConfig(
            max_retry_attempts=100,
            pool_file='test_stateless_respool.yaml',
        ),
    )
    job.job_spec.default_spec.containers[0].liveness_check.enabled = False
    job.create()
    job.wait_for_state(goal_state="RUNNING")
    job.job_spec.default_spec.containers[0].liveness_check.initial_interval_secs = 2
    update = StatelessUpdate(
        job,
        updated_job_spec=job.job_spec,
        max_failure_instances=1,
        max_instance_attempts=1,
    )
    update.create(in_place=in_place)
    update.wait_for_state(goal_state="SUCCEEDED")


# test__auto_rollback_update_with_bad_config tests creating an update with bad config
# with rollback
def test__auto_rollback_update_with_bad_config(stateless_job, in_place):
    stateless_job.create()
    stateless_job.wait_for_state(goal_state="RUNNING")
    old_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()
    update = StatelessUpdate(
        stateless_job,
        updated_job_file=UPDATE_STATELESS_JOB_BAD_SPEC,
        roll_back_on_failure=True,
        max_failure_instances=1,
        max_instance_attempts=1,
    )
    update.create(in_place=in_place)
    update.wait_for_state(goal_state="ROLLED_BACK")
    new_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()
    assert_pod_spec_equal(old_instance_zero_spec, new_instance_zero_spec)


# test__auto_rollback_update_add_instances_with_bad_config
# tests creating an update with bad config and more instances
# with rollback
def test__auto_rollback_update_add_instances_with_bad_config(stateless_job, in_place):
    stateless_job.create()
    stateless_job.wait_for_state(goal_state="RUNNING")
    old_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()
    job_spec_dump = load_test_config(UPDATE_STATELESS_JOB_BAD_SPEC)
    updated_job_spec = JobSpec()
...
json_format.ParseDict(job_spec_dump, updated_job_spec)597 updated_job_spec.instance_count = stateless_job.job_spec.instance_count + 3598 update = StatelessUpdate(599 stateless_job,600 updated_job_spec=updated_job_spec,601 roll_back_on_failure=True,602 max_failure_instances=1,603 max_instance_attempts=1,604 )605 update.create(in_place=in_place)606 update.wait_for_state(goal_state="ROLLED_BACK")607 new_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()608 # no instance should be added609 assert (610 len(stateless_job.query_pods())611 == stateless_job.job_spec.instance_count612 )613 assert_pod_spec_equal(old_instance_zero_spec, new_instance_zero_spec)614# test__auto_rollback_update_reduce_instances_with_bad_config615# tests creating an update with bad config and fewer instances616# with rollback617def test__auto_rollback_update_reduce_instances_with_bad_config(stateless_job, in_place):618 stateless_job.create()619 stateless_job.wait_for_state(goal_state="RUNNING")620 old_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()621 job_spec_dump = load_test_config(UPDATE_STATELESS_JOB_BAD_SPEC)622 updated_job_spec = JobSpec()623 json_format.ParseDict(job_spec_dump, updated_job_spec)624 updated_job_spec.instance_count = stateless_job.job_spec.instance_count - 1625 update = StatelessUpdate(626 stateless_job,627 updated_job_spec=updated_job_spec,628 roll_back_on_failure=True,629 max_failure_instances=1,630 max_instance_attempts=1,631 )632 update.create(in_place=in_place)633 update.wait_for_state(goal_state="ROLLED_BACK")634 new_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()635 # no instance should be removed636 assert (637 len(stateless_job.query_pods())638 == stateless_job.job_spec.instance_count639 )640 assert_pod_spec_equal(old_instance_zero_spec, new_instance_zero_spec)641# test__auto_rollback_update_with_failed_health_check642# tests an update fails even if the new task state is RUNNING,643# as long as the health check fails644def test__auto_rollback_update_with_failed_health_check(stateless_job, in_place):645 stateless_job.create()646 stateless_job.wait_for_state(goal_state="RUNNING")647 old_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()648 update = StatelessUpdate(649 stateless_job,650 updated_job_file=UPDATE_STATELESS_JOB_BAD_HEALTH_CHECK_SPEC,651 roll_back_on_failure=True,652 max_failure_instances=1,653 max_instance_attempts=1,654 )655 update.create(in_place=in_place)656 update.wait_for_state(goal_state="ROLLED_BACK")657 new_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()658 assert_pod_spec_equal(old_instance_zero_spec, new_instance_zero_spec)659# test__pause_resume_initialized_update test pause and resume660# an update in initialized state661def test__pause_resume_initialized_update(stateless_job, in_place):662 stateless_job.create()663 stateless_job.wait_for_state(goal_state="RUNNING")664 old_pod_infos = stateless_job.query_pods()665 old_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()666 update = StatelessUpdate(667 stateless_job, batch_size=1, updated_job_file=UPDATE_STATELESS_JOB_SPEC668 )669 update.create(in_place=in_place)670 # immediately pause the update, so the update may still be INITIALIZED671 update.pause()672 update.wait_for_state(goal_state="PAUSED")673 update.resume()674 update.wait_for_state(goal_state="SUCCEEDED")675 new_pod_infos = stateless_job.query_pods()676 new_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()677 assert_pod_id_changed(old_pod_infos, new_pod_infos)678 
    assert_pod_spec_changed(old_instance_zero_spec, new_instance_zero_spec)


# test__pause_resume__update tests pause and resume of an update
def test__pause_resume__update(stateless_job, in_place):
    stateless_job.create()
    stateless_job.wait_for_state(goal_state="RUNNING")
    old_pod_infos = stateless_job.query_pods()
    old_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()
    update = StatelessUpdate(
        stateless_job, batch_size=1, updated_job_file=UPDATE_STATELESS_JOB_SPEC
    )
    update.create(in_place=in_place)
    # sleep for 1 sec so update can begin to roll forward
    time.sleep(1)
    update.pause()
    update.wait_for_state(goal_state="PAUSED")
    update.resume()
    update.wait_for_state(goal_state="SUCCEEDED")
    new_pod_infos = stateless_job.query_pods()
    new_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()
    assert_pod_id_changed(old_pod_infos, new_pod_infos)
    assert_pod_spec_changed(old_instance_zero_spec, new_instance_zero_spec)


# test_manual_rollback manually rolls back a running update when
# the instance count is reduced in the rollback.
# Note that manual rollback in peloton is just updating to the
# previous job configuration
def test_manual_rollback_reduce_instances(stateless_job, in_place):
    stateless_job.create()
    stateless_job.wait_for_state(goal_state="RUNNING")
    old_pod_infos = stateless_job.query_pods()
    old_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()
    update = StatelessUpdate(
        stateless_job, updated_job_file=UPDATE_STATELESS_JOB_ADD_INSTANCES_SPEC
    )
    update.create(in_place=in_place)
    # manually rollback the update
    update2 = StatelessUpdate(
        stateless_job,
        updated_job_file=UPDATE_STATELESS_JOB_UPDATE_REDUCE_INSTANCES_SPEC,
    )
    update2.create(in_place=in_place)
    update2.wait_for_state(goal_state="SUCCEEDED")
    new_pod_infos = stateless_job.query_pods()
    assert len(old_pod_infos) == len(new_pod_infos)
    new_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()
    assert_pod_spec_equal(old_instance_zero_spec, new_instance_zero_spec)


# test_manual_rollback manually rolls back a running update when
# the instance count is increased in the rollback
def test_manual_rollback_increase_instances(stateless_job, in_place):
    stateless_job.create()
    stateless_job.wait_for_state(goal_state="RUNNING")
    update = StatelessUpdate(
        stateless_job, updated_job_file=UPDATE_STATELESS_JOB_ADD_INSTANCES_SPEC
    )
    update.create(in_place=in_place)
    update.wait_for_state(goal_state="SUCCEEDED")
    old_pod_infos = stateless_job.query_pods()
    old_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()
    # reduce instance count and then roll it back
    update2 = StatelessUpdate(
        stateless_job,
        updated_job_file=UPDATE_STATELESS_JOB_UPDATE_REDUCE_INSTANCES_SPEC,
    )
    update2.create(in_place=in_place)
    update3 = StatelessUpdate(
        stateless_job, updated_job_file=UPDATE_STATELESS_JOB_ADD_INSTANCES_SPEC
    )
    update3.create(in_place=in_place)
    update3.wait_for_state(goal_state="SUCCEEDED")
    new_pod_infos = stateless_job.query_pods()
    assert len(old_pod_infos) == len(new_pod_infos)
    new_instance_zero_spec = stateless_job.get_pod(0).get_pod_spec()
    assert_pod_spec_equal(old_instance_zero_spec, new_instance_zero_spec)


# test_auto_rollback_reduce_instances
# rolls back a failed update when
# the instance count is reduced in the rollback.
def test_auto_rollback_reduce_instances(stateless_job, in_place):
    stateless_job.create()
stateless_job.wait_for_state(goal_state="RUNNING")756 job_spec_dump = load_test_config(757 UPDATE_STATELESS_JOB_BAD_HEALTH_CHECK_SPEC758 )759 updated_job_spec = JobSpec()760 json_format.ParseDict(job_spec_dump, updated_job_spec)761 # increase the instance count762 updated_job_spec.instance_count = stateless_job.job_spec.instance_count + 3763 update = StatelessUpdate(764 stateless_job,765 updated_job_spec=updated_job_spec,766 roll_back_on_failure=True,767 max_instance_attempts=1,768 max_failure_instances=1,769 batch_size=1,770 )771 update.create(in_place=in_place)772 update.wait_for_state(goal_state="ROLLED_BACK")773 assert (774 len(stateless_job.query_pods())775 == stateless_job.job_spec.instance_count776 )777# test_update_create_failure_invalid_spec tests the778# update create failure due to invalid spec in request779def test_update_create_failure_invalid_spec(stateless_job, in_place):780 stateless_job.create()781 stateless_job.wait_for_state(goal_state="RUNNING")782 update = StatelessUpdate(783 stateless_job, updated_job_file=UPDATE_STATELESS_JOB_INVALID_SPEC784 )785 try:786 update.create(in_place=in_place)787 except grpc.RpcError as e:788 assert e.code() == grpc.StatusCode.INVALID_ARGUMENT789 return790 raise Exception("job spec validation error not received")791# test_update_killed_job tests updating a killed job.792# The job should be updated but still remain in killed state793def test_update_killed_job(in_place):794 job = StatelessJob(job_file=UPDATE_STATELESS_JOB_ADD_INSTANCES_SPEC)795 job.create()796 job.wait_for_state(goal_state="RUNNING")797 job.stop()798 job.wait_for_state(goal_state="KILLED")799 update = StatelessUpdate(800 job, updated_job_file=UPDATE_STATELESS_JOB_UPDATE_REDUCE_INSTANCES_SPEC801 )802 update.create(in_place=in_place)803 update.wait_for_state(goal_state="SUCCEEDED")804 assert job.get_spec().instance_count == 3805 assert job.get_status().state == stateless_pb2.JOB_STATE_KILLED806# test_start_job_with_active_update tests807# starting a job with an active update808def test_start_job_with_active_update(stateless_job, in_place):809 stateless_job.create()810 stateless_job.wait_for_state(goal_state="RUNNING")811 assert len(stateless_job.query_pods()) == 3812 stateless_job.stop()813 update = StatelessUpdate(814 stateless_job,815 updated_job_file=UPDATE_STATELESS_JOB_UPDATE_AND_ADD_INSTANCES_SPEC,816 batch_size=1,817 )818 update.create(in_place=in_place)819 stateless_job.start()820 update.wait_for_state(goal_state="SUCCEEDED")821 stateless_job.wait_for_all_pods_running()822 assert len(stateless_job.query_pods()) == 5823# test_stop_running_job_with_active_update_add_instances tests824# stopping a running job with an active update(add instances)825def test_stop_running_job_with_active_update_add_instances(stateless_job, in_place):826 stateless_job.create()827 stateless_job.wait_for_state(goal_state="RUNNING")828 assert len(stateless_job.query_pods()) == 3829 update = StatelessUpdate(830 stateless_job,831 updated_job_file=UPDATE_STATELESS_JOB_UPDATE_AND_ADD_INSTANCES_SPEC,832 batch_size=1,833 )834 update.create(in_place=in_place)835 update.wait_for_state(goal_state="ROLLING_FORWARD")836 stateless_job.stop()837 update.wait_for_state(goal_state="SUCCEEDED")838 assert stateless_job.get_spec().instance_count == 5839# test_stop_running_job_with_active_update_remove_instances tests840# stopping a running job with an active update(remove instances)841def test_stop_running_job_with_active_update_remove_instances(in_place):842 stateless_job = StatelessJob(843 
job_file=UPDATE_STATELESS_JOB_ADD_INSTANCES_SPEC844 )845 stateless_job.create()846 stateless_job.wait_for_state(goal_state="RUNNING")847 assert len(stateless_job.query_pods()) == 5848 update = StatelessUpdate(849 stateless_job,850 updated_job_file=UPDATE_STATELESS_JOB_UPDATE_REDUCE_INSTANCES_SPEC,851 batch_size=1,852 )853 update.create(in_place=in_place)854 update.wait_for_state(goal_state="ROLLING_FORWARD")855 stateless_job.stop()856 update.wait_for_state(goal_state="SUCCEEDED")857 assert stateless_job.get_spec().instance_count == 3858# test_stop_running_job_with_active_update_same_instance_count tests stopping859# a running job with an active update that doesn't change instance count860def test_stop_running_job_with_active_update_same_instance_count(861 stateless_job,862 in_place863):864 stateless_job.create()865 stateless_job.wait_for_state(goal_state="RUNNING")866 stateless_job.job_spec.default_spec.containers[867 0868 ].command.value = "sleep 100"869 update = StatelessUpdate(870 stateless_job,871 updated_job_spec=stateless_job.job_spec,872 max_failure_instances=1,873 max_instance_attempts=1,874 )875 update.create(in_place=in_place)876 stateless_job.stop()877 update.wait_for_state(goal_state="SUCCEEDED")878 assert stateless_job.get_spec().instance_count == 3879 assert (880 stateless_job.get_spec().default_spec.containers[0].command.value881 == "sleep 100"882 )883# test__create_update_before_job_fully_created creates an update884# right after a job is created. It tests the case that job can be885# updated before it is fully created886def test__create_update_before_job_fully_created(stateless_job, in_place):887 stateless_job.create()888 update = StatelessUpdate(889 stateless_job, updated_job_file=UPDATE_STATELESS_JOB_SPEC890 )891 update.create(in_place=in_place)892 update.wait_for_state(goal_state="SUCCEEDED")893 assert (894 stateless_job.get_spec().default_spec.containers[0].command.value895 == "while :; do echo updated; sleep 10; done"896 )897# test__in_place_update_success_rate tests that in-place update898# should succeed when every daemon is in healthy state.899# It starts a job with 30 instances, and start the in-place update900# without batch size, then it tests if any pod is running on unexpected901# host.902# TODO: Re-enable k8s when it stops being flaky.903# @pytest.mark.k8s904def test__in_place_update_success_rate(stateless_job):905 stateless_job.job_spec.instance_count = 30906 stateless_job.create()907 stateless_job.wait_for_all_pods_running()908 old_pod_infos = stateless_job.query_pods()909 job_spec_dump = load_test_config(update_stateless_job_spec())910 updated_job_spec = JobSpec()911 json_format.ParseDict(job_spec_dump, updated_job_spec)912 updated_job_spec.instance_count = 30913 if minicluster_type() == "k8s":914 updated_job_spec.default_spec.containers[0].resource.mem_limit_mb = 0.1915 update = StatelessUpdate(stateless_job,916 updated_job_spec=updated_job_spec,917 batch_size=0)918 update.create(in_place=True)919 update.wait_for_state(goal_state='SUCCEEDED')920 new_pod_infos = stateless_job.query_pods()921 old_pod_dict = {}922 new_pod_dict = {}923 for old_pod_info in old_pod_infos:924 split_index = old_pod_info.status.pod_id.value.rfind('-')925 pod_name = old_pod_info.status.pod_id.value[:split_index]926 old_pod_dict[pod_name] = old_pod_info.status.host927 for new_pod_info in new_pod_infos:928 split_index = new_pod_info.status.pod_id.value.rfind('-')929 pod_name = new_pod_info.status.pod_id.value[:split_index]930 new_pod_dict[pod_name] = new_pod_info.status.host931 count 
= 0932 for pod_name, pod_id in old_pod_dict.items():933 if new_pod_dict[pod_name] != old_pod_dict[pod_name]:934 log.info("%s, prev:%s, cur:%s", pod_name,935 old_pod_dict[pod_name], new_pod_dict[pod_name])936 count = count + 1937 log.info("total mismatch: %d", count)938 assert count == 0939# test__in_place_kill_job_release_host tests the case of killing940# an ongoing in-place update would release hosts, so the second941# update can get completed942def test__in_place_kill_job_release_host():943 job1 = StatelessJob(944 job_file="test_stateless_job_spec.yaml",945 )946 job1.create()947 job1.wait_for_state(goal_state="RUNNING")948 job2 = StatelessJob(949 job_file="test_stateless_job_spec.yaml",950 )951 job2.create()952 job2.wait_for_state(goal_state="RUNNING")953 update1 = StatelessUpdate(job1,954 updated_job_file=UPDATE_STATELESS_JOB_SPEC,955 batch_size=0)956 update1.create(in_place=True)957 # stop the update958 job1.stop()959 update2 = StatelessUpdate(job2,960 updated_job_file=UPDATE_STATELESS_JOB_SPEC,961 batch_size=0)962 update2.create()963 # both updates should complete964 update1.wait_for_state(goal_state="SUCCEEDED")965 update2.wait_for_state(goal_state="SUCCEEDED")966@pytest.mark.skip(reason="flaky test")967def test__in_place_update_host_maintenance(stateless_job, maintenance):968 # add enough instances so each host should have some tasks running969 stateless_job.job_spec.instance_count = 9970 # need extra retry attempts, since in-place update would need more time971 # to process given agent is put in maintenance mode972 stateless_job.config = IntegrationTestConfig(973 max_retry_attempts=300,974 pool_file='test_stateless_respool.yaml',975 ),976 stateless_job.create()977 stateless_job.wait_for_all_pods_running()978 job_spec_dump = load_test_config(UPDATE_STATELESS_JOB_SPEC)979 updated_job_spec = JobSpec()980 json_format.ParseDict(job_spec_dump, updated_job_spec)981 updated_job_spec.instance_count = 9982 update = StatelessUpdate(stateless_job,983 updated_job_spec=updated_job_spec,984 batch_size=0)985 update.create(in_place=True)986 # Pick a host that is UP and start maintenance on it987 test_host = get_host_in_state(host_pb2.HOST_STATE_UP)988 resp = maintenance["start"]([test_host])989 assert resp990 wait_for_host_state(test_host, host_pb2.HOST_STATE_DOWN)991 update.wait_for_state(goal_state="SUCCEEDED")992def test__update_with_sla_aware_host_maintenance(stateless_job, maintenance):993 """994 1. Create a stateless job with 3 instances.995 2. Create a job update to update the instance job with instance count 2,996 add host-limit-1 constraint and define sla with maximum_unavailable_instances=1997 3. Start host maintenance on one of the hosts998 4. 
The host should transition to DOWN and the update workflow should SUCCEED999 """1000 stateless_job.create()1001 stateless_job.wait_for_all_pods_running()1002 job_spec_dump = load_test_config('test_stateless_job_spec_sla.yaml')1003 updated_job_spec = JobSpec()1004 json_format.ParseDict(job_spec_dump, updated_job_spec)1005 updated_job_spec.instance_count = 21006 update = StatelessUpdate(stateless_job,1007 updated_job_spec=updated_job_spec,1008 batch_size=1)1009 update.create()1010 # Pick a host that is UP and start maintenance on it1011 test_host = get_host_in_state(host_pb2.HOST_STATE_UP)1012 resp = maintenance["start"]([test_host])1013 assert resp1014 update.wait_for_state(goal_state="SUCCEEDED")1015 wait_for_host_state(test_host, host_pb2.HOST_STATE_DOWN)1016def test__update_with_host_maintenance_and_agent_down(stateless_job, maintenance):1017 """1018 1. Create a large stateless job (that take up more than two-thirds of1019 the cluster resources) with MaximumUnavailableInstances=2.1020 2. Start host maintenance on one of the hosts (say A) having pods of the job.1021 MaximumUnavailableInstances=2 ensures that not more than 2 pods are1022 unavailable due to host maintenance at a time.1023 3. Take down another host which has pods running on it. This will TASK_LOST1024 to be sent for all pods on the host after 75 seconds.1025 4. Start an update to modify the instance spec of one of the pods.1026 5. Since TASK_LOST would cause the job SLA to be violated, instances on the1027 host A should not be killed once LOST event is received. Verify that1028 host A does not transition to DOWN.1029 """1030 stateless_job.job_spec.instance_count = 301031 stateless_job.job_spec.default_spec.containers[0].resource.cpu_limit = 0.31032 stateless_job.job_spec.sla.maximum_unavailable_instances = 21033 stateless_job.create()1034 stateless_job.wait_for_all_pods_running()1035 hosts = [h.hostname for h in query_hosts([]).host_infos]1036 host_to_task_count = get_host_to_task_count(hosts, stateless_job)1037 sorted_hosts = [t[0] for t in sorted(1038 host_to_task_count.items(), key=operator.itemgetter(1), reverse=True)]1039 # Pick a host that has pods running on it to start maintenance on it.1040 test_host = sorted_hosts[0]1041 # pick another host which has pods of the job to take down1042 host_container = get_container([sorted_hosts[1]])1043 try:1044 host_container.stop()1045 maintenance["start"]([test_host])1046 stateless_job.job_spec.instance_spec[10].containers.extend(1047 [pod_pb2.ContainerSpec(resource=pod_pb2.ResourceSpec(disk_limit_mb=20))])1048 update = StatelessUpdate(stateless_job,1049 updated_job_spec=stateless_job.job_spec,1050 batch_size=0)1051 update.create()1052 update.wait_for_state(goal_state="SUCCEEDED")1053 stateless_job.stop()1054 wait_for_host_state(test_host, host_pb2.HOST_STATE_DOWN)1055 assert False, 'Host should not transition to DOWN'1056 except:1057 assert is_host_in_state(test_host, host_pb2.HOST_STATE_DRAINING)1058 pass1059 finally:1060 host_container.start()1061def test__update_with_host_maintenance__bad_config(stateless_job, maintenance):1062 """1063 1. Create a stateless job with 6 instances. Wait for all instances to reach1064 RUNNING state. This means that there is at least one host with 2 or more1065 instances on it1066 2. Start a bad job update with max failure tolerance of 1 and auto-rollback1067 disabled.1068 3. Start host maintenance on one of the hosts (say host A).1069 4. Wait for the update to fail. There should be 2 instances unavailable.1070 5. 
       Since 2 instances are already unavailable and
       maximum_unavailable_instances=1, host maintenance should not proceed.
       Verify that host A doesn't transition to DOWN.
    """
    stateless_job.job_spec.sla.maximum_unavailable_instances = 1
    stateless_job.job_spec.instance_count = 6
    stateless_job.create()
    stateless_job.wait_for_all_pods_running()
    hosts = [h.hostname for h in query_hosts([]).host_infos]
    host_to_task_count = get_host_to_task_count(hosts, stateless_job)
    sorted_hosts = [t[0] for t in sorted(
        host_to_task_count.items(), key=operator.itemgetter(1), reverse=True)]
    job_spec_dump = load_test_config(UPDATE_STATELESS_JOB_BAD_SPEC)
    updated_job_spec = JobSpec()
    json_format.ParseDict(job_spec_dump, updated_job_spec)
    updated_job_spec.instance_count = 6
    updated_job_spec.sla.maximum_unavailable_instances = 1
    update = StatelessUpdate(
        stateless_job,
        updated_job_spec=updated_job_spec,
        max_failure_instances=1,
        max_instance_attempts=1,
        batch_size=2,
    )
    update.create()
    # Pick a host that has pods running on it to start maintenance on it.
    test_host = sorted_hosts[0]
    maintenance["start"]([test_host])
    update.wait_for_state(goal_state="FAILED", failed_state="SUCCEEDED")
    try:
        wait_for_host_state(test_host, host_pb2.HOST_STATE_DOWN)
    except Exception:
        # The host never reached DOWN within the timeout, as expected;
        # it should still be draining.
        assert is_host_in_state(test_host, host_pb2.HOST_STATE_DRAINING)
    else:
        raise AssertionError("Host should not transition to DOWN")


# test__create_update_update_job_config tests that updating job-level config
# does not trigger a task restart
def test__create_update_update_job_config(stateless_job):
    stateless_job.create()
    stateless_job.wait_for_all_pods_running()
    old_pod_infos = stateless_job.query_pods()
    update = StatelessUpdate(
        stateless_job, updated_job_file=UPDATE_STATELESS_JOB_JOB_CONFIG_UPDATE_SPEC
    )
    update.create()
    update.wait_for_state(goal_state="SUCCEEDED")
    new_pod_infos = stateless_job.query_pods()...
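The assertions used throughout this file (assert_pod_id_changed, assert_pod_spec_changed, assert_pod_spec_equal) come from a helper module that is not part of this excerpt. The sketch below shows roughly what they need to check, inferred from how the tests use them; the attribute path status.pod_id.value and the use of MessageToDict for comparing protobuf pod specs are assumptions, not the project's actual implementation.

from google.protobuf.json_format import MessageToDict


def _pod_run_ids(pod_infos):
    # Collect the per-run pod IDs reported by query_pods(); a restarted
    # instance gets a new pod ID even though its pod name stays the same.
    return {info.status.pod_id.value for info in pod_infos}


def assert_pod_id_changed(old_pod_infos, new_pod_infos):
    # No pod run observed before the update should still be present afterwards.
    assert not (_pod_run_ids(old_pod_infos) & _pod_run_ids(new_pod_infos))


def assert_pod_spec_changed(old_pod_spec, new_pod_spec):
    # The update is expected to have produced a different pod spec.
    assert MessageToDict(old_pod_spec) != MessageToDict(new_pod_spec)


def assert_pod_spec_equal(old_pod_spec, new_pod_spec):
    # Used after rollbacks: the spec should be back to what it was.
    assert MessageToDict(old_pod_spec) == MessageToDict(new_pod_spec)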
test_provisioner_service.py
Source:test_provisioner_service.py
...137 launch_id = _new_id()138 node_ids = [_new_id()]139 client.provision(launch_id, node_ids, deployable_type,140 'fake-site1', caller="asterix")141 ok = notifier.wait_for_state(InstanceState.FAILED, node_ids)142 self.assertTrue(ok)143 self.assertTrue(notifier.assure_record_count(1))144 self.assertStoreNodeRecords(InstanceState.FAILED, *node_ids)145 self.assertStoreLaunchRecord(InstanceState.FAILED, launch_id)146 def test_provision_with_vars(self):147 client = self.client148 caller = 'asterix'149 deployable_type = 'empty-with-vars'150 launch_id = _new_id()151 node_ids = [_new_id()]152 vars = {'image_id': 'fake-image'}153 client.provision(launch_id, node_ids, deployable_type,154 'fake-site1', vars=vars, caller=caller)155 self.notifier.wait_for_state(InstanceState.PENDING, node_ids,156 before=self.provisioner.leader._force_cycle)157 self.assertStoreNodeRecords(InstanceState.PENDING, *node_ids)158 def test_provision_with_missing_vars(self):159 client = self.client160 notifier = self.notifier161 caller = 'asterix'162 deployable_type = 'empty-with-vars'163 launch_id = _new_id()164 node_ids = [_new_id()]165 vars = {'foo': 'bar'}166 client.provision(launch_id, node_ids, deployable_type,167 'fake-site1', vars=vars, caller=caller)168 ok = notifier.wait_for_state(InstanceState.FAILED, node_ids)169 self.assertTrue(ok)170 self.assertTrue(notifier.assure_record_count(1))171 self.assertStoreNodeRecords(InstanceState.FAILED, *node_ids)172 self.assertStoreLaunchRecord(InstanceState.FAILED, launch_id)173 def test_provision_broker_error(self):174 client = self.client175 notifier = self.notifier176 deployable_type = 'empty'177 launch_id = _new_id()178 self.context_client.create_error = BrokerError("fake failure")179 node_ids = [_new_id()]180 client.provision(launch_id, node_ids, deployable_type,181 'fake-site1', caller="asterix")182 ok = notifier.wait_for_state(InstanceState.FAILED, node_ids)183 self.assertTrue(ok)184 self.assertTrue(notifier.assure_record_count(1))185 self.assertStoreNodeRecords(InstanceState.FAILED, *node_ids)186 self.assertStoreLaunchRecord(InstanceState.FAILED, launch_id)187 def test_dump_state(self):188 running_launch, running_nodes = make_launch_and_nodes(_new_id(), 10, InstanceState.RUNNING)189 self.store.add_launch(running_launch)190 for node in running_nodes:191 self.store.add_node(node)192 pending_launch, pending_nodes = make_launch_and_nodes(_new_id(), 3, InstanceState.PENDING)193 self.store.add_launch(pending_launch)194 for node in pending_nodes:195 self.store.add_node(node)196 running_node_ids = [node['node_id'] for node in running_nodes]197 pending_node_ids = [node['node_id'] for node in pending_nodes]198 all_node_ids = running_node_ids + pending_node_ids199 self.client.dump_state(running_node_ids)200 ok = self.notifier.wait_for_state(InstanceState.RUNNING, nodes=running_node_ids)201 self.assertTrue(ok)202 self.assertEqual(len(self.notifier.nodes), len(running_nodes))203 self.client.dump_state(pending_node_ids)204 ok = self.notifier.wait_for_state(InstanceState.PENDING, nodes=pending_node_ids)205 self.assertTrue(ok)206 self.assertEqual(len(self.notifier.nodes), len(all_node_ids))207 # we should have not gotten any dupe records yet208 self.assertTrue(self.notifier.assure_record_count(1))209 # empty dump request should dump nothing210 self.client.dump_state([])211 self.assertTrue(self.notifier.assure_record_count(1))212 def test_terminate(self):213 node_ids = []214 for _ in range(10):215 node_id = _new_id()216 node_ids.append(node_id)217 self.client.provision(_new_id(), 
[node_id], "empty",218 site="fake-site1", caller="asterix")219 self.notifier.wait_for_state(InstanceState.PENDING, node_ids,220 before=self.provisioner.leader._force_cycle)221 for node_id in node_ids:222 node = self.store.get_node(node_id)223 self.driver.set_node_running(node['iaas_id'])224 self.notifier.wait_for_state(InstanceState.STARTED, node_ids,225 before=self.provisioner.leader._force_cycle)226 # terminate half of the nodes then the rest227 first_five = node_ids[:5]228 last_five = node_ids[5:]229 self.client.terminate_nodes(first_five, caller="asterix")230 ok = self.notifier.wait_for_state(InstanceState.TERMINATED, nodes=first_five)231 self.assertTrue(ok)232 self.client.terminate_nodes(last_five, caller="asterix")233 ok = self.notifier.wait_for_state(InstanceState.TERMINATED, nodes=last_five)234 self.assertTrue(ok)235 self.assertEqual(set(node_ids), set(self.notifier.nodes))236 # should be REQUESTED, PENDING, STARTED, TERMINATING and TERMINATED records for each node237 self.assertTrue(self.notifier.assure_record_count(5))238 self.assertEqual(len(self.driver.destroyed),239 len(node_ids))240 def test_terminate_unknown(self):241 instance_id = _new_id()242 self.client.terminate_nodes([instance_id])243 ok = self.notifier.wait_for_state(InstanceState.TERMINATED, nodes=[instance_id])244 self.assertTrue(ok)245 def test_launch_allocation(self):246 node_id = _new_id()247 self.client.provision(_new_id(), [node_id], "empty",248 site="fake-site1", caller="asterix")249 self.notifier.wait_for_state(InstanceState.PENDING, [node_id],250 before=self.provisioner.leader._force_cycle)251 self.assertStoreNodeRecords(InstanceState.PENDING)252 self.assertEqual(len(self.driver.created), 1)253 libcloud_node = self.driver.created[0]254 self.assertEqual(libcloud_node.size.id, "m1.small")255 def test_launch_many_terminate_all(self):256 all_node_ids = []257 # after the terminate_all, provision requests should be REJECTED258 rejected_node_ids = []259 for _ in range(100):260 node_id = _new_id()261 all_node_ids.append(node_id)262 self.client.provision(_new_id(), [node_id], "empty",263 site="fake-site1", caller="asterix")264 self.notifier.wait_for_state(InstanceState.PENDING, all_node_ids,265 before=self.provisioner.leader._force_cycle)266 self.assertStoreNodeRecords(InstanceState.PENDING, *all_node_ids)267 for node_id in all_node_ids:268 node = self.store.get_node(node_id)269 self.driver.set_node_running(node['iaas_id'])270 self.notifier.wait_for_state(InstanceState.STARTED, all_node_ids,271 before=self.provisioner.leader._force_cycle)272 self.assertStoreNodeRecords(InstanceState.STARTED, *all_node_ids)273 log.debug("Expecting %d nodes to be terminated", len(all_node_ids))274 self.assertIs(self.client.terminate_all(), False)275 # future requests should be rejected276 for _ in range(5):277 node_id = _new_id()278 rejected_node_ids.append(node_id)279 self.client.provision(_new_id(), [node_id], "empty",280 site="fake-site1", caller="asterix")281 self.notifier.wait_for_state(InstanceState.TERMINATED, all_node_ids,282 before=self.provisioner.leader._force_cycle, timeout=240)283 self.assertStoreNodeRecords(InstanceState.TERMINATED, *all_node_ids)284 self.notifier.wait_for_state(InstanceState.REJECTED, rejected_node_ids, timeout=240)285 self.assertStoreNodeRecords(InstanceState.REJECTED, *rejected_node_ids)286 self.assertEqual(len(self.driver.destroyed),287 len(all_node_ids))288 self.assertIs(self.client.terminate_all(), True)289 # now re-enable290 self.client.enable()291 node_id = _new_id()292 log.debug("Launching node %s 
which should be accepted", node_id)293 self.client.provision(_new_id(), [node_id], "empty",294 site="fake-site1", caller="asterix")295 self.notifier.wait_for_state(InstanceState.PENDING, [node_id],296 before=self.provisioner.leader._force_cycle, timeout=60)297 self.assertStoreNodeRecords(InstanceState.PENDING, node_id)298 def test_describe(self):299 node_ids = []300 for _ in range(3):301 launch_id = _new_id()302 running_launch, running_nodes = make_launch_and_nodes(launch_id, 1,303 InstanceState.RUNNING,304 site="fake-site1", caller=self.default_user)305 self.store.add_launch(running_launch)306 for node in running_nodes:307 self.store.add_node(node)308 node_ids.append(running_nodes[0]['node_id'])309 log.debug("requestin")310 all_nodes = self.client.describe_nodes()311 self.assertEqual(len(all_nodes), len(node_ids))312 one_node = self.client.describe_nodes([node_ids[0]])313 self.assertEqual(len(one_node), 1)314 self.assertEqual(one_node[0]['node_id'], node_ids[0])315 def test_multiuser(self):316 """Test that nodes started by one user can't be modified by317 another user318 """319 permitted_user = "asterix"320 disallowed_user = "cacaphonix"321 client = self.client322 deployable_type = 'empty'323 launch_id = _new_id()324 node_ids = [_new_id()]325 vars = {'image_id': 'fake-image'}326 client.provision(launch_id, node_ids, deployable_type,327 'fake-site1', vars=vars, caller=permitted_user)328 self.notifier.wait_for_state(InstanceState.PENDING, node_ids,329 before=self.provisioner.leader._force_cycle)330 self.assertStoreNodeRecords(InstanceState.PENDING, *node_ids)331 # Test describe332 permitted_nodes = client.describe_nodes(caller=permitted_user)333 self.assertEqual(len(permitted_nodes), len(node_ids))334 disallowed_nodes = client.describe_nodes(caller=disallowed_user)335 self.assertEqual(len(disallowed_nodes), 0)336 # Test terminate337 client.terminate_nodes(node_ids, caller=disallowed_user)338 terminate_timed_out = False339 try:340 self.notifier.wait_for_state(InstanceState.TERMINATED, node_ids,341 before=self.provisioner.leader._force_cycle, timeout=2)342 except Exception:343 terminate_timed_out = True344 self.assertTrue(terminate_timed_out,345 msg="Terminate worked with non-matching user")346 client.terminate_nodes(node_ids, caller=permitted_user)347 self.notifier.wait_for_state(InstanceState.TERMINATED, node_ids,348 before=self.provisioner.leader._force_cycle, timeout=2)349 self.assertStoreNodeRecords(InstanceState.TERMINATED, *node_ids)350 def test_record_reaper(self):351 launch_id1 = _new_id()352 launch_id2 = _new_id()353 now = time.time()354 node1 = make_node(launch_id1, InstanceState.TERMINATED, caller=self.default_user,355 state_changes=[(InstanceState.TERMINATED, now - self.record_reaping_max_age - 1)])356 node2 = make_node(launch_id1, InstanceState.FAILED, caller=self.default_user,357 state_changes=[(InstanceState.FAILED, now - self.record_reaping_max_age - 1)])358 node3 = make_node(launch_id1, InstanceState.REJECTED, caller=self.default_user,359 state_changes=[(InstanceState.REJECTED, now - self.record_reaping_max_age - 1)])360 nodes1 = [node1, node2, node3]361 launch1 = make_launch(launch_id1, InstanceState.RUNNING, nodes1, caller=self.default_user)362 node4 = make_node(launch_id2, InstanceState.RUNNING, caller=self.default_user,363 state_changes=[(InstanceState.RUNNING, now - self.record_reaping_max_age - 1)])364 node5 = make_node(launch_id2, InstanceState.TERMINATED, caller=self.default_user,365 state_changes=[(InstanceState.TERMINATED, now - self.record_reaping_max_age - 
1)])366 nodes2 = [node4, node5]367 launch2 = make_launch(launch_id2, InstanceState.RUNNING, nodes2, caller=self.default_user)368 self.store.add_launch(launch1)369 for node in nodes1:370 self.store.add_node(node)371 self.store.add_launch(launch2)372 for node in nodes2:373 self.store.add_node(node)374 # Wait a second for record to get written375 time.sleep(1)376 # Force a record reaping cycle377 self.provisioner.leader._force_record_reaping()378 # Check that the first launch is completely removed379 node_ids1 = map(lambda x: x['node_id'], nodes1)380 self.assertNoStoreNodeRecords(*node_ids1)381 self.assertNoStoreLaunchRecord(launch_id1)382 # Check that the second launch is still here but with only the running node383 self.assertStoreNodeRecords(InstanceState.RUNNING, node4['node_id'])384 self.assertStoreLaunchRecord(InstanceState.RUNNING, launch_id2)385class ProvisionerServiceNoContextualizationTest(BaseProvisionerServiceTests):386 def setUp(self):387 self.notifier = FakeProvisionerNotifier()388 self.context_client = None389 self.store = self.setup_store()390 self.driver = FakeNodeDriver()391 self.driver.initialize()392 self.spawn_procs()393 self.load_dtrs()394 def test_launch_no_context(self):395 all_node_ids = []396 for _ in range(10):397 node_id = _new_id()398 all_node_ids.append(node_id)399 self.client.provision(_new_id(), [node_id], "empty",400 site="fake-site1", caller="asterix")401 self.notifier.wait_for_state(InstanceState.PENDING, all_node_ids,402 before=self.provisioner.leader._force_cycle)403 self.assertStoreNodeRecords(InstanceState.PENDING, *all_node_ids)404 for node_id in all_node_ids:405 node = self.store.get_node(node_id)406 self.driver.set_node_running(node['iaas_id'])407 self.notifier.wait_for_state(InstanceState.RUNNING, all_node_ids,408 before=self.provisioner.leader._force_cycle)409 self.assertStoreNodeRecords(InstanceState.RUNNING, *all_node_ids)410class ProvisionerZooKeeperServiceTest(ProvisionerServiceTest, ZooKeeperTestMixin):411 # this runs all of the ProvisionerServiceTest tests wih a ZK store412 def setup_store(self):413 self.setup_zookeeper(base_path_prefix="/provisioner_service_tests_")414 store = ProvisionerZooKeeperStore(self.zk_hosts, self.zk_base_path, use_gevent=self.use_gevent)415 store.initialize()416 return store417 def teardown_store(self):...
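wait_for_state on the FakeProvisionerNotifier is the backbone of these provisioner tests, but its implementation is not shown in this excerpt. A minimal sketch of the polling pattern it implies follows, assuming the notifier tracks the latest state per node and that the optional before callable (usually self.provisioner.leader._force_cycle in these tests) is invoked ahead of each poll to push the provisioner through a cycle. The real helper may differ.

import time


class FakeNotifierSketch(object):
    """Hypothetical stand-in that records the latest state seen per node."""

    def __init__(self):
        self.nodes = {}  # node_id -> latest state string

    def wait_for_state(self, state, nodes, before=None, timeout=30):
        # Poll until every requested node has reached `state`, optionally
        # forcing a provisioner cycle before each check; return False on timeout.
        deadline = time.time() + timeout
        while time.time() < deadline:
            if before is not None:
                before()
            if all(self.nodes.get(node_id) == state for node_id in nodes):
                return True
            time.sleep(0.5)
        return False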
test_v2v_migrations_single_vcenter.py
Source:test_v2v_migrations_single_vcenter.py
...73 infra_map=mapping_data_vm_obj_mini.infra_mapping_data.get("name"),74 vm_list=mapping_data_vm_obj_mini.vm_list,75 target_provider=provider76 )77 assert migration_plan.wait_for_state("Started")78 assert migration_plan.wait_for_state("In_Progress")79 assert migration_plan.wait_for_state("Completed")80 assert migration_plan.wait_for_state("Successful")81 # check power state on migrated VM82 migrated_vm = get_migrated_vm(src_vm, provider)83 assert power_state in migrated_vm.mgmt.state84 # check tags85 collection = provider.appliance.provider_based_collection(provider)86 vm_obj = collection.instantiate(migrated_vm.name, provider)87 owner_tag = None88 for t in vm_obj.get_tags():89 if tag.display_name in t.display_name:90 owner_tag = t91 assert owner_tag is not None and tag.display_name in owner_tag.display_name92 # If Never is not there, that means retirement is set.93 assert 'Never' not in vm_obj.retirement_date94@pytest.mark.parametrize('source_type, dest_type, template_type',95 [96 ['nfs', 'nfs', [Templates.RHEL7_MINIMAL,97 Templates.RHEL7_MINIMAL,98 Templates.RHEL7_MINIMAL,99 Templates.RHEL7_MINIMAL]100 ]101 ])102def test_multi_host_multi_vm_migration(request, appliance, provider,103 source_type, dest_type, template_type,104 mapping_data_multiple_vm_obj_single_datastore):105 """106 Polarion:107 assignee: sshveta108 caseimportance: medium109 caseposneg: positive110 testtype: functional111 startsin: 5.10112 casecomponent: V2V113 initialEstimate: 1h114 """115 infrastructure_mapping_collection = appliance.collections.v2v_infra_mappings116 mapping_data = mapping_data_multiple_vm_obj_single_datastore.infra_mapping_data117 mapping = infrastructure_mapping_collection.create(**mapping_data)118 migration_plan_collection = appliance.collections.v2v_migration_plans119 migration_plan = migration_plan_collection.create(120 name=fauxfactory.gen_alphanumeric(start="plan_"),121 description=fauxfactory.gen_alphanumeric(15, start="plan_desc_"),122 infra_map=mapping.name,123 vm_list=mapping_data_multiple_vm_obj_single_datastore.vm_list,124 target_provider=provider125 )126 assert migration_plan.wait_for_state("Started")127 request_details_list = migration_plan.get_plan_vm_list(wait_for_migration=False)128 vms = request_details_list.read()129 # testing multi-host utilization130 match = ['Converting', 'Migrating']131 def _is_migration_started(vm):132 if any(string in request_details_list.get_message_text(vm) for string in match):133 return True134 return False135 for vm in vms:136 wait_for(func=_is_migration_started, func_args=[vm],137 message="migration has not started for all VMs", delay=5, num_sec=300)138 if provider.one_of(OpenStackProvider):139 host_creds = provider.appliance.collections.openstack_nodes.all()140 else:141 host_creds = provider.hosts.all()142 hosts_dict = {key.name: [] for key in host_creds}143 for vm in vms:144 popup_text = request_details_list.read_additional_info_popup(vm)145 # open__additional_info_popup function also closes opened popup in our case146 request_details_list.open_additional_info_popup(vm)147 if popup_text['Conversion Host'] in hosts_dict:148 hosts_dict[popup_text['Conversion Host']].append(vm)149 for host in hosts_dict:150 if len(hosts_dict[host]) > 0:151 logger.info("Host: {} is migrating VMs: {}".format(host, hosts_dict[host]))152 assert migration_plan.wait_for_state("In_Progress")153 assert migration_plan.wait_for_state("Completed")154 assert migration_plan.wait_for_state("Successful")155def test_migration_special_char_name(appliance, provider, request,156 
mapping_data_vm_obj_mini):157 """Tests migration where name of migration plan is comprised of special non-alphanumeric158 characters, such as '@#$(&#@('.159 Polarion:160 assignee: sshveta161 caseimportance: medium162 caseposneg: positive163 testtype: functional164 startsin: 5.10165 casecomponent: V2V166 initialEstimate: 1h167 """168 migration_plan_collection = appliance.collections.v2v_migration_plans169 migration_plan = migration_plan_collection.create(170 name=fauxfactory.gen_alphanumeric(start="plan_"),171 description=fauxfactory.gen_alphanumeric(start="plan_desc_"),172 infra_map=mapping_data_vm_obj_mini.infra_mapping_data.get("name"),173 vm_list=mapping_data_vm_obj_mini.vm_list,174 target_provider=provider175 )176 assert migration_plan.wait_for_state("Started")177 assert migration_plan.wait_for_state("In_Progress")178 assert migration_plan.wait_for_state("Completed")179 assert migration_plan.wait_for_state("Successful")180 # validate MAC address matches between source and target VMs181 src_vm = mapping_data_vm_obj_mini.vm_list[0]182 migrated_vm = get_migrated_vm(src_vm, provider)183 @request.addfinalizer184 def _cleanup():185 cleanup_target(provider, migrated_vm)186 assert src_vm.mac_address == migrated_vm.mac_address187def test_migration_long_name(request, appliance, provider, source_provider):188 """Test to check VM name with 64 character should work189 Polarion:190 assignee: sshveta191 initialEstimate: 1/2h192 casecomponent: V2V193 """194 source_datastores_list = source_provider.data.get("datastores", [])195 source_datastore = [d.name for d in source_datastores_list if d.type == "nfs"][0]196 collection = appliance.provider_based_collection(source_provider)197 # Following code will create vm name with 64 characters198 vm_name = "{vm_name}{extra_words}".format(vm_name=random_vm_name(context="v2v"),199 extra_words=fauxfactory.gen_alpha(51))200 template = _get_template(source_provider, Templates.RHEL7_MINIMAL)201 vm_obj = collection.instantiate(202 name=vm_name,203 provider=source_provider,204 template_name=template.name,205 )206 vm_obj.create_on_provider(207 timeout=2400,208 find_in_cfme=True,209 allow_skip="default",210 datastore=source_datastore)211 request.addfinalizer(lambda: vm_obj.cleanup_on_provider())212 mapping_data = infra_mapping_default_data(source_provider, provider)213 infrastructure_mapping_collection = appliance.collections.v2v_infra_mappings214 mapping = infrastructure_mapping_collection.create(**mapping_data)215 @request.addfinalizer216 def _cleanup():217 infrastructure_mapping_collection.delete(mapping)218 migration_plan_collection = appliance.collections.v2v_migration_plans219 migration_plan = migration_plan_collection.create(220 name=fauxfactory.gen_alphanumeric(20, start="long_name_"),221 description=fauxfactory.gen_alphanumeric(25, start="desc_long_name_"),222 infra_map=mapping.name,223 vm_list=[vm_obj],224 target_provider=provider225 )226 assert migration_plan.wait_for_state("Started")227 assert migration_plan.wait_for_state("In_Progress")228 assert migration_plan.wait_for_state("Completed")229 assert migration_plan.wait_for_state("Successful")230 migrated_vm = get_migrated_vm(vm_obj, provider)231 assert vm_obj.mac_address == migrated_vm.mac_address232@pytest.mark.parametrize('source_type, dest_type, template_type',233 [['nfs', 'nfs', Templates.RHEL7_MINIMAL]])234def test_migration_with_edited_mapping(request, appliance, source_provider, provider,235 source_type, dest_type, template_type,236 mapping_data_vm_obj_single_datastore):237 """238 Test migration with 
edited infrastructure mapping.239 Polarion:240 assignee: sshveta241 caseimportance: medium242 caseposneg: positive243 testtype: functional244 startsin: 5.10245 casecomponent: V2V246 initialEstimate: 1h247 """248 infrastructure_mapping_collection = appliance.collections.v2v_infra_mappings249 mapping_data = infra_mapping_default_data(source_provider, provider)250 mapping = infrastructure_mapping_collection.create(**mapping_data)251 mapping.update(mapping_data_vm_obj_single_datastore.infra_mapping_data)252 # vm_obj is a list, with only 1 VM object, hence [0]253 src_vm_obj = mapping_data_vm_obj_single_datastore.vm_list[0]254 migration_plan_collection = appliance.collections.v2v_migration_plans255 migration_plan = migration_plan_collection.create(256 name=fauxfactory.gen_alphanumeric(start="plan_"),257 description=fauxfactory.gen_alphanumeric(15, start="plan_desc_"),258 infra_map=mapping.name,259 vm_list=mapping_data_vm_obj_single_datastore.vm_list,260 target_provider=provider)261 assert migration_plan.wait_for_state("Started")262 assert migration_plan.wait_for_state("In_Progress")263 assert migration_plan.wait_for_state("Completed")264 assert migration_plan.wait_for_state("Successful")265 migrated_vm = get_migrated_vm(src_vm_obj, provider)266 @request.addfinalizer267 def _cleanup():268 infrastructure_mapping_collection.delete(mapping)269 cleanup_target(provider, migrated_vm)270 assert src_vm_obj.mac_address == migrated_vm.mac_address271@pytest.mark.tier(3)272@pytest.mark.parametrize(273 "source_type, dest_type, template_type",274 [["nfs", "nfs", Templates.UBUNTU16_TEMPLATE]])275def test_migration_restart(request, appliance, provider,276 source_type, dest_type, template_type,277 mapping_data_vm_obj_single_datastore):278 """279 Test migration by restarting evmserverd in middle of the process280 Polarion:281 assignee: sshveta282 initialEstimate: 1h283 caseimportance: medium284 caseposneg: positive285 testtype: functional286 startsin: 5.10287 casecomponent: V2V288 """289 infrastructure_mapping_collection = appliance.collections.v2v_infra_mappings290 mapping_data = mapping_data_vm_obj_single_datastore.infra_mapping_data291 mapping = infrastructure_mapping_collection.create(**mapping_data)292 src_vm_obj = mapping_data_vm_obj_single_datastore.vm_list[0]293 migration_plan_collection = appliance.collections.v2v_migration_plans294 migration_plan = migration_plan_collection.create(295 name=fauxfactory.gen_alphanumeric(start="plan_"),296 description=fauxfactory.gen_alphanumeric(15, start="plan_desc_"),297 infra_map=mapping.name,298 target_provider=provider,299 vm_list=mapping_data_vm_obj_single_datastore.vm_list,300 )301 assert migration_plan.wait_for_state("Started")302 # reboot system when the actual disk migration elapsed a 240 second time duration303 migration_plan.in_progress(plan_elapsed_time=240)304 appliance.restart_evm_rude()305 appliance.wait_for_miq_ready()306 try:307 assert migration_plan.wait_for_state("In_Progress")308 except WebDriverException:309 pass310 assert migration_plan.wait_for_state("Completed")311 assert migration_plan.wait_for_state("Successful")312 migrated_vm = get_migrated_vm(src_vm_obj, provider)313 @request.addfinalizer314 def _cleanup():315 infrastructure_mapping_collection.delete(mapping)316 cleanup_target(provider, migrated_vm)317 assert src_vm_obj.mac_address == migrated_vm.mac_address318@pytest.mark.tier(2)319def test_if_no_password_is_exposed_in_logs_during_migration(appliance, source_provider, provider,320 request, mapping_data_vm_obj_mini):321 """322 title: OSP: Test 
if no password is exposed in logs during migration323 Polarion:324 assignee: mnadeem325 casecomponent: V2V326 initialEstimate: 1/8h327 startsin: 5.10328 subcomponent: OSP329 testSteps:330 1. Create infrastructure mapping for Vmware to OSP/RHV331 2. Create migration plan332 3. Start migration333 expectedResults:334 1. Mapping created and visible in UI335 2.336 3. logs should not show password during migration337 """338 cred = []339 ssh_key_name = source_provider.data['private-keys']['vmware-ssh-key']['credentials']340 cred.append(credentials[source_provider.data.get("credentials")]["password"])341 cred.append(credentials[ssh_key_name]["password"])342 cred.append(credentials[provider.data.get("credentials")]["password"])343 if provider.one_of(OpenStackProvider):344 osp_key_name = provider.data['private-keys']['conversion_host_ssh_key']['credentials']345 cred.append(credentials[osp_key_name]["password"])346 automation_log = LogValidator("/var/www/miq/vmdb/log/automation.log", failure_patterns=cred,347 hostname=appliance.hostname)348 evm_log = LogValidator("/var/www/miq/vmdb/log/evm.log", failure_patterns=cred,349 hostname=appliance.hostname)350 automation_log.start_monitoring()351 evm_log.start_monitoring()352 migration_plan_collection = appliance.collections.v2v_migration_plans353 migration_plan = migration_plan_collection.create(354 name=fauxfactory.gen_alphanumeric(start="plan_"),355 description=fauxfactory.gen_alphanumeric(start="plan_desc_"),356 infra_map=mapping_data_vm_obj_mini.infra_mapping_data.get("name"),357 vm_list=mapping_data_vm_obj_mini.vm_list,358 target_provider=provider359 )360 assert migration_plan.wait_for_state("Started")361 assert migration_plan.wait_for_state("In_Progress")362 assert migration_plan.wait_for_state("Completed")363 assert migration_plan.wait_for_state("Successful")364 src_vm = mapping_data_vm_obj_mini.vm_list[0]365 migrated_vm = get_migrated_vm(src_vm, provider)366 @request.addfinalizer367 def _cleanup():368 cleanup_target(provider, migrated_vm)369 migration_plan.delete_completed_plan()370 # Check log files for any exposed password371 assert automation_log.validate()...
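get_migrated_vm and cleanup_target are shared V2V helpers that are not included in this snippet. The sketch below captures what get_migrated_vm presumably does, based on how these tests use provider_based_collection, instantiate, and wait_for elsewhere in the file: look up a VM with the source VM's name on the target provider and wait for it to appear. The exists attribute and the exact collection calls are assumptions, not the suite's actual code.

from wait_for import wait_for


def get_migrated_vm_sketch(src_vm, target_provider, timeout=300):
    # Hypothetical helper: find the migrated VM on the target provider by
    # name and wait until it becomes visible there.
    collection = target_provider.appliance.provider_based_collection(target_provider)
    migrated_vm = collection.instantiate(src_vm.name, target_provider)
    wait_for(lambda: migrated_vm.exists, delay=5, num_sec=timeout,
             message="migrated VM to appear on target provider")
    return migrated_vm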