Best Python code snippet using Airtest
testpath_volume_recurring_snap.py
Source:testpath_volume_recurring_snap.py
...130 except Exception as e:131 raise Exception("Warning: Exception during cleanup : %s" % e)132 return133 @attr(tags=["advanced", "basic"],required_hardware="true")134 def test_01_volume_snapshot(self):135 """ Test Volume (root) Snapshot136 # 1. Create Hourly, Daily,Weekly recurring snapshot policy for ROOT disk and 137 Verify the presence of the corresponding snapshots on the Secondary Storage138 # 2. Delete the snapshot policy and verify the entry as Destroyed in snapshot_schedule139 # 3. Verify that maxsnaps should not consider manual snapshots for deletion140 # 4. Snapshot policy should reflect the correct timezone141 # 5. Verify that listSnapshotPolicies() should return all snapshot policies142 that belong to the account (both manual and recurring snapshots)143 # 6. Verify that listSnapshotPolicies() should not return snapshot 144 policies that have been deleted145 # 7. Verify that snapshot should not be created for VM in Destroyed state146 # 8. Verify that snapshot should get created after resuming the VM147 # 9. 
Verify that All the recurring policies associated with the VM should be 148 deleted after VM get destroyed.149 """150 # Step 1151 self.testdata["recurring_snapshot"]["intervaltype"] = 'HOURLY'152 recurring_snapshot = SnapshotPolicy.create(153 self.apiclient,154 self.volume[0].id,155 self.testdata["recurring_snapshot"]156 )157 # ListSnapshotPolicy should return newly created policy158 list_snapshots_policy = list_snapshot_policy(159 self.apiclient,160 id=recurring_snapshot.id,161 volumeid=self.volume[0].id162 )163 list_validation = validateList(list_snapshots_policy)164 self.assertEqual(165 list_validation[0],166 PASS,167 "snapshot list validation failed due to %s" %168 list_validation[2])169 timeout = self.testdata["timeout"]170 while True:171 snapshots = list_snapshots(172 self.apiclient,173 volumeid=self.volume[0].id,174 intervaltype=self.testdata[175 "recurring_snapshot"]["intervaltype"],176 snapshottype='RECURRING',177 listall=True178 )179 if isinstance(snapshots, list):180 break181 elif timeout == 0:182 raise Exception("List snapshots API call failed.")183 for snapshot in snapshots:184 self.assertEqual(185 self.dbclient.execute(186 "select type_description from snapshots where name='%s'" %187 snapshot.name)[0][0],188 "HOURLY"189 )190 time.sleep(180)191 for snapshot in snapshots:192 self.assertTrue(193 is_snapshot_on_nfs(194 self.apiclient,195 self.dbclient,196 self.config,197 self.zone.id,198 snapshot.id))199 recurring_snapshot.delete(self.apiclient)200 self.assertEqual(201 self.dbclient.execute(202 "select * from snapshot_policy where uuid='%s'" %203 recurring_snapshot.id),204 []205 )206 self.testdata["recurring_snapshot"]["intervaltype"] = 'DAILY'207 self.testdata["recurring_snapshot"]["schedule"] = '00:00'208 recurring_snapshot_daily = SnapshotPolicy.create(209 self.apiclient,210 self.volume[0].id,211 self.testdata["recurring_snapshot"]212 )213 list_snapshots_policy_daily = list_snapshot_policy(214 self.apiclient,215 id=recurring_snapshot_daily.id,216 
volumeid=self.volume[0].id217 )218 list_validation = validateList(list_snapshots_policy_daily)219 self.assertEqual(220 list_validation[0],221 PASS,222 "snapshot list validation failed due to %s" %223 list_validation[2])224 snap_db_daily = self.dbclient.execute(225 "select * from snapshot_policy where uuid='%s'" %226 recurring_snapshot_daily.id)227 validation_result_1 = validateList(snap_db_daily)228 self.assertEqual(229 validation_result_1[0],230 PASS,231 "snapshot_policy list validation failed due to %s" %232 validation_result_1[2])233 self.assertNotEqual(234 len(snap_db_daily),235 0,236 "Check DB Query result set"237 )238 recurring_snapshot_daily.delete(self.apiclient)239 self.assertEqual(240 self.dbclient.execute(241 "select * from snapshot_policy where uuid='%s'" %242 recurring_snapshot_daily.id),243 []244 )245 246 self.testdata["recurring_snapshot"]["intervaltype"] = 'WEEKLY'247 self.testdata["recurring_snapshot"]["schedule"] = '00:00:1'248 recurring_snapshot_weekly = SnapshotPolicy.create(249 self.apiclient,250 self.volume[0].id,251 self.testdata["recurring_snapshot"]252 )253 list_snapshots_policy_weekly = list_snapshot_policy(254 self.apiclient,255 id=recurring_snapshot_weekly.id,256 volumeid=self.volume[0].id257 )258 list_validation = validateList(list_snapshots_policy_weekly)259 self.assertEqual(260 list_validation[0],261 PASS,262 "snapshot list validation failed due to %s" %263 list_validation[2])264 snap_sch_2 = self.dbclient.execute(265 "select * from snapshot_policy where uuid='%s'" %266 recurring_snapshot_weekly.id)267 validation_result_2 = validateList(snap_sch_2)268 self.assertEqual(269 validation_result_2[0],270 PASS,271 "snapshot_policy list validation failed due to %s" %272 validation_result_2[2])273 self.assertNotEqual(274 len(snap_sch_2),275 0,276 "Check DB Query result set"277 )278 recurring_snapshot_weekly.delete(self.apiclient)279 self.assertEqual(280 self.dbclient.execute(281 "select * from snapshot_policy where uuid='%s'" %282 
recurring_snapshot_weekly.id),283 []284 )285 self.testdata["recurring_snapshot"]["intervaltype"] = 'MONTHLY'286 self.testdata["recurring_snapshot"]["schedule"] = '00:00:1'287 recurring_snapshot_monthly = SnapshotPolicy.create(288 self.apiclient,289 self.volume[0].id,290 self.testdata["recurring_snapshot"]291 )292 list_snapshots_policy_monthly = list_snapshot_policy(293 self.apiclient,294 id=recurring_snapshot_monthly.id,295 volumeid=self.volume[0].id296 )297 list_validation = validateList(list_snapshots_policy_monthly)298 self.assertEqual(299 list_validation[0],300 PASS,301 "snapshot list validation failed due to %s" %302 list_validation[2])303 snap_sch_3 = self.dbclient.execute(304 "select * from snapshot_policy where uuid='%s'" %305 recurring_snapshot_monthly.id)306 validation_result = validateList(snap_sch_3)307 self.assertEqual(308 validation_result[0],309 PASS,310 "snapshot_policy list validation failed due to %s" %311 validation_result[2])312 self.assertNotEqual(313 len(snap_sch_3),314 0,315 "Check DB Query result set"316 )317 recurring_snapshot_monthly.delete(self.apiclient)318 self.assertEqual(319 self.dbclient.execute(320 "select * from snapshot_policy where uuid='%s'" %321 recurring_snapshot_weekly.id),322 []323 )324 snapshots = list_snapshots(325 self.apiclient,326 volumeid=self.volume[0].id,327 intervaltype=self.testdata["recurring_snapshot"]["intervaltype"],328 snapshottype='RECURRING',329 listall=True330 )331 # Step 3332 self.testdata["recurring_snapshot"]["intervaltype"] = 'HOURLY'333 self.testdata["recurring_snapshot"]["schedule"] = 1334 recurring_snapshot_1 = SnapshotPolicy.create(335 self.apiclient,336 self.volume[0].id,337 self.testdata["recurring_snapshot"]338 )339 # ListSnapshotPolicy should return newly created policy340 list_snapshots_policy = list_snapshot_policy(341 self.apiclient,342 id=recurring_snapshot_1.id,343 volumeid=self.volume[0].id344 )345 list_validation = validateList(list_snapshots_policy)346 self.assertEqual(347 
list_validation[0],348 PASS,349 "snapshot list validation failed due to %s" %350 list_validation[2])351 timeout = self.testdata["timeout"]352 while True:353 snapshots = list_snapshots(354 self.apiclient,355 volumeid=self.volume[0].id,356 intervaltype=self.testdata[357 "recurring_snapshot"]["intervaltype"],358 snapshottype='RECURRING',359 listall=True360 )361 if isinstance(snapshots, list):362 break363 elif timeout == 0:364 raise Exception("List snapshots API call failed.")365 snap_to_delete = snapshots[0]366 time.sleep(367 (self.testdata["recurring_snapshot"]["maxsnaps"]) * 3600368 )369 snapshots_1 = list_snapshots(370 self.apiclient,371 volumeid=self.volume[0].id,372 intervaltype=self.testdata["recurring_snapshot"]["intervaltype"],373 snapshottype='RECURRING',374 listall=True375 )376 self.assertTrue(snap_to_delete not in snapshots_1)377 time.sleep(360)378 self.assertEqual(379 self.dbclient.execute(380 "select status from snapshots where uuid='%s'" %381 snap_to_delete.id)[0][0],382 "Destroyed"383 )384 self.assertFalse(385 is_snapshot_on_nfs(386 self.apiclient,387 self.dbclient,388 self.config,389 self.zone.id,390 snap_to_delete.id))391 # Step 4392 recurring_snapshot = SnapshotPolicy.create(393 self.apiclient,394 self.volume[0].id,395 self.testdata["recurring_snapshot"]396 )397 # ListSnapshotPolicy should return newly created policy398 list_snapshots_policy = list_snapshot_policy(399 self.apiclient,400 id=recurring_snapshot.id,401 volumeid=self.volume[0].id402 )403 list_validation = validateList(list_snapshots_policy)404 self.assertEqual(405 list_validation[0],406 PASS,407 "snapshot list validation failed due to %s" %408 list_validation[2])409 time.sleep(180)410 snap_time_hourly = self.dbclient.execute(411 "select scheduled_timestamp from \412 snapshot_schedule where uuid='%s'" %413 recurring_snapshot.id)414 self.debug("Timestamp for hourly snapshot %s" % snap_time_hourly)415 recurring_snapshot.delete(self.apiclient)416 
self.testdata["recurring_snapshot"]["intervaltype"] = 'DAILY'417 self.testdata["recurring_snapshot"]["schedule"] = '00:00'418 recurring_snapshot_daily = SnapshotPolicy.create(419 self.apiclient,420 self.volume[0].id,421 self.testdata["recurring_snapshot"]422 )423 list_snapshots_policy_daily = list_snapshot_policy(424 self.apiclient,425 id=recurring_snapshot_daily.id,426 volumeid=self.volume[0].id427 )428 list_validation = validateList(list_snapshots_policy_daily)429 self.assertEqual(430 list_validation[0],431 PASS,432 "snapshot list validation failed due to %s" %433 list_validation[2])434 time.sleep(180)435 snap_time_daily = self.dbclient.execute(436 "select scheduled_timestamp from \437 snapshot_schedule where uuid='%s'" %438 recurring_snapshot_daily.id)439 self.debug("Timestamp for daily snapshot %s" % snap_time_daily)440 recurring_snapshot_daily.delete(self.apiclient)441 self.testdata["recurring_snapshot"]["intervaltype"] = 'WEEKLY'442 self.testdata["recurring_snapshot"]["schedule"] = '00:00:1'443 recurring_snapshot_weekly = SnapshotPolicy.create(444 self.apiclient,445 self.volume[0].id,446 self.testdata["recurring_snapshot"]447 )448 list_snapshots_policy_weekly = list_snapshot_policy(449 self.apiclient,450 id=recurring_snapshot_weekly.id,451 volumeid=self.volume[0].id452 )453 list_validation = validateList(list_snapshots_policy_weekly)454 self.assertEqual(455 list_validation[0],456 PASS,457 "snapshot list validation failed due to %s" %458 list_validation[2])459 time.sleep(180)460 snap_time_weekly = self.dbclient.execute(461 "select scheduled_timestamp from \462 snapshot_schedule where uuid='%s'" %463 recurring_snapshot_weekly.id)464 self.debug("Timestamp for monthly snapshot %s" % snap_time_weekly)465 recurring_snapshot_weekly.delete(self.apiclient)466 self.testdata["recurring_snapshot"]["intervaltype"] = 'MONTHLY'467 self.testdata["recurring_snapshot"]["schedule"] = '00:00:1'468 recurring_snapshot_monthly = SnapshotPolicy.create(469 self.apiclient,470 
self.volume[0].id,471 self.testdata["recurring_snapshot"]472 )473 list_snapshots_policy_monthly = list_snapshot_policy(474 self.apiclient,475 id=recurring_snapshot_monthly.id,476 volumeid=self.volume[0].id477 )478 list_validation = validateList(list_snapshots_policy_monthly)479 self.assertEqual(480 list_validation[0],481 PASS,482 "snapshot list validation failed due to %s" %483 list_validation[2])484 time.sleep(180)485 snap_time_monthly = self.dbclient.execute(486 "select scheduled_timestamp from \487 snapshot_schedule where uuid='%s'" %488 recurring_snapshot_monthly.id)489 self.debug("Timestamp for monthly snapshot %s" % snap_time_monthly)490 recurring_snapshot_monthly.delete(self.apiclient)491 # Step 5492 self.testdata["recurring_snapshot"]["intervaltype"] = 'HOURLY'493 self.testdata["recurring_snapshot"]["schedule"] = 1494 recurring_snapshot_hourly = SnapshotPolicy.create(495 self.apiclient,496 self.volume[0].id,497 self.testdata["recurring_snapshot"]498 )499 self.testdata["recurring_snapshot"]["intervaltype"] = 'MONTHLY'500 self.testdata["recurring_snapshot"]["schedule"] = '00:00:1'501 recurring_snapshot_monthly = SnapshotPolicy.create(502 self.apiclient,503 self.volume[0].id,504 self.testdata["recurring_snapshot"]505 )506 list_snapshots_policy = list_snapshot_policy(507 self.apiclient,508 volumeid=self.volume[0].id509 )510 list_validation = validateList(list_snapshots_policy)511 self.assertEqual(512 list_validation[0],513 PASS,514 "snapshot list validation failed due to %s" %515 list_validation[2])516 for rec in [recurring_snapshot_hourly, recurring_snapshot_monthly]:517 self.assertTrue(518 rec.id in any(519 policy['id']) for policy in list_snapshots_policy)520 recurring_snapshot_hourly.delete(self.apiclient)521 recurring_snapshot_monthly.delete(self.apiclient)522 # Step 6523 self.testdata["recurring_snapshot"]["intervaltype"] = 'HOURLY'524 self.testdata["recurring_snapshot"]["schedule"] = 1525 recurring_snapshot_hourly = SnapshotPolicy.create(526 
self.apiclient,527 self.volume[0].id,528 self.testdata["recurring_snapshot"]529 )530 self.testdata["recurring_snapshot"]["intervaltype"] = 'MONTHLY'531 self.testdata["recurring_snapshot"]["schedule"] = '00:00:1'532 recurring_snapshot_monthly = SnapshotPolicy.create(533 self.apiclient,534 self.volume[0].id,535 self.testdata["recurring_snapshot"]536 )537 recurring_snapshot_monthly.delete(self.apiclient)538 list_snapshots_policy = list_snapshot_policy(539 self.apiclient,540 volumeid=self.volume[0].id541 )542 list_validation = validateList(list_snapshots_policy)543 self.assertEqual(544 list_validation[0],545 PASS,546 "snapshot list validation failed due to %s" %547 list_validation[2])548 self.assertTrue(549 recurring_snapshot_hourly.id in any(550 policy['id']) for policy in list_snapshots_policy)551 self.assertTrue(552 recurring_snapshot_monthly.id not in any(553 policy['id']) for policy in list_snapshots_policy)554 # Step 7555 self.testdata["recurring_snapshot"]["intervaltype"] = 'HOURLY'556 self.testdata["recurring_snapshot"]["schedule"] = 1557 recurring_snapshot = SnapshotPolicy.create(558 self.apiclient,559 self.volume[0].id,560 self.testdata["recurring_snapshot"]561 )562 # ListSnapshotPolicy should return newly created policy563 list_snapshots_policy = list_snapshot_policy(564 self.apiclient,565 id=recurring_snapshot.id,566 volumeid=self.volume[0].id567 )568 list_validation = validateList(list_snapshots_policy)569 self.assertEqual(570 list_validation[0],571 PASS,572 "snapshot list validation failed due to %s" %573 list_validation[2])574 timeout = self.testdata["timeout"]575 while True:576 snapshots = list_snapshots(577 self.apiclient,578 volumeid=self.volume[0].id,579 intervaltype=self.testdata[580 "recurring_snapshot"]["intervaltype"],581 snapshottype='RECURRING',582 listall=True583 )584 if isinstance(snapshots, list):585 break586 elif timeout == 0:587 raise Exception("List snapshots API call failed.")588 self.vm_1.delete(self.apiclient, expunge=False)589 
time.sleep(3600)590 snapshot_list = Snapshot.list(591 self.apiclient,592 volumeid=self.volume[0].id593 )594 list_validation = validateList(snapshot_list)595 self.assertEqual(596 list_validation[0],597 PASS,598 "snapshot list validation failed due to %s" %599 list_validation[2])600 self.assertEqual(len(snapshot_list),601 1,602 "Verify that snapsot is not created after VM deletion"603 )604 # Step 8605 self.vm_1.recover(self.apiclient)606 time.sleep(3600)607 snapshot_list = Snapshot.list(608 self.apiclient,609 volumeid=self.volume[0].id610 )611 self.assertEqual(len(snapshot_list),612 2,613 "Verify that snapsot is not created after VM deletion"614 )615 # Step 9616 self.vm_1.delete(self.apiclient)617 time.sleep(180)618 with self.assertRaises(Exception):619 list_snapshots_policy = list_snapshot_policy(620 self.apiclient,621 volumeid=self.volume[0].id622 )623 @attr(tags=["advanced", "basic"], required_hardware="true")624 def test_02_volume_max_snapshot(self):625 """ Test Volume Snapshot626 # 1. 
Create Hourly reccuring snapshot policy with maxsnaps=2 627 verify that when 3rd snapshot is taken first snapshot gets deleted628 """629 if self.hypervisor.lower() not in ["kvm", "vmware"]:630 self.skipTest("Skip test for hypervisor other than KVM and VMWare")631 # Step 1632 self.testdata["recurring_snapshot"]["intervaltype"] = 'HOURLY'633 self.testdata["recurring_snapshot"]["schedule"] = 1634 recurring_snapshot_1 = SnapshotPolicy.create(635 self.apiclient,636 self.volume[0].id,637 self.testdata["recurring_snapshot"]638 )639 # ListSnapshotPolicy should return newly created policy640 list_snapshots_policy = list_snapshot_policy(641 self.apiclient,642 id=recurring_snapshot_1.id,643 volumeid=self.volume[0].id644 )645 list_validation = validateList(list_snapshots_policy)646 self.assertEqual(647 list_validation[0],648 PASS,649 "snapshot list validation failed due to %s" %650 list_validation[2])651 timeout = self.testdata["timeout"]652 while True:653 snapshots = list_snapshots(654 self.apiclient,655 volumeid=self.volume[0].id,656 intervaltype=self.testdata[657 "recurring_snapshot"]["intervaltype"],658 snapshottype='RECURRING',659 listall=True660 )661 if isinstance(snapshots, list):662 break663 elif timeout == 0:664 raise Exception("List snapshots API call failed.")665 snap_to_delete = snapshots[0]666 time.sleep(667 (self.testdata["recurring_snapshot"]["maxsnaps"]) * 3600668 )669 670 snapshots_1 = list_snapshots(671 self.apiclient,672 volumeid=self.volume[0].id,673 intervaltype=self.testdata["recurring_snapshot"]["intervaltype"],674 snapshottype='RECURRING',675 listall=True676 )677 self.assertTrue(snap_to_delete not in snapshots_1)678 time.sleep(360)679 self.assertEqual(680 self.dbclient.execute(681 "select status from snapshots where uuid='%s'" %682 snap_to_delete.id)[0][0],683 "Destroyed"684 )685 self.assertFalse(686 is_snapshot_on_nfs(687 self.apiclient,688 self.dbclient,689 self.config,690 self.zone.id,691 snap_to_delete.id))692 # DATA DISK693 recurring_snapshot_data 
= SnapshotPolicy.create(694 self.apiclient,695 self.data_volume[0].id,696 self.testdata["recurring_snapshot"]697 )698 # ListSnapshotPolicy should return newly created policy699 list_snapshots_policy = list_snapshot_policy(700 self.apiclient,701 id=recurring_snapshot_data.id,702 volumeid=self.data_volume[0].id703 )704 list_validation = validateList(list_snapshots_policy)705 self.assertEqual(706 list_validation[0],707 PASS,708 "snapshot list validation failed due to %s" %709 list_validation[2])710 timeout = self.testdata["timeout"]711 while True:712 snapshots = list_snapshots(713 self.apiclient,714 volumeid=self.volume[0].id,715 intervaltype=self.testdata[716 "recurring_snapshot"]["intervaltype"],717 snapshottype='RECURRING',718 listall=True719 )720 if isinstance(snapshots, list):721 break722 elif timeout == 0:723 raise Exception("List snapshots API call failed.")724 data_snap_to_delete = snapshots[0]725 time.sleep(726 (self.testdata["recurring_snapshot"]["maxsnaps"]) * 3600727 )728 data_snapshots_1 = list_snapshots(729 self.apiclient,730 volumeid=self.volume[0].id,731 intervaltype=self.testdata["recurring_snapshot"]["intervaltype"],732 snapshottype='RECURRING',733 listall=True734 )735 self.assertTrue(data_snap_to_delete not in data_snapshots_1)736 time.sleep(360)737 self.assertEqual(738 self.dbclient.execute(739 "select status from snapshots where uuid='%s'" %740 snap_to_delete.id)[0][0],741 "Destroyed"742 )743 self.assertFalse(744 is_snapshot_on_nfs(745 self.apiclient,746 self.dbclient,747 self.config,748 self.zone.id,749 data_snap_to_delete.id))750 @attr(tags=["advanced", "basic"],required_hardware="true")751 def test_03_volume_rec_snapshot(self):752 """ Test Volume (root) Snapshot753 # 1. 
For snapshot.delta.max > maxsnaps verify that when number of snapshot exceeds 754 maxsnaps value previous snapshot should get deleted from database but remain 755 on secondary storage and when the value exceeds snapshot.delta.max the 756 snapshot should get deleted from secondary storage757 """758 if self.hypervisor.lower() != "xenserver":759 self.skipTest("Skip test for hypervisor other than Xenserver")760 # Step 1761 self.testdata["recurring_snapshot"]["intervaltype"] = 'HOURLY'762 self.testdata["recurring_snapshot"]["schedule"] = 1763 recurring_snapshot_root = SnapshotPolicy.create(764 self.apiclient,765 self.volume[0].id,...
test_snapshots.py
Source:test_snapshots.py
...191 self.assertEqual(len(resp_snapshots), 1)192 resp_snapshot = resp_snapshots.pop()193 self.assertEqual(resp_snapshot['id'], 123)194class SnapshotSerializerTest(test.TestCase):195 def _verify_snapshot(self, snap, tree):196 self.assertEqual(tree.tag, 'snapshot')197 for attr in ('id', 'status', 'size', 'createdAt',198 'displayName', 'displayDescription', 'volumeId'):199 self.assertEqual(str(snap[attr]), tree.get(attr))200 def test_snapshot_show_create_serializer(self):201 serializer = volumes.SnapshotTemplate()202 raw_snapshot = dict(203 id='snap_id',204 status='snap_status',205 size=1024,206 createdAt=datetime.datetime.now(),207 displayName='snap_name',208 displayDescription='snap_desc',209 volumeId='vol_id',210 )211 text = serializer.serialize(dict(snapshot=raw_snapshot))212 print text213 tree = etree.fromstring(text)214 self._verify_snapshot(raw_snapshot, tree)215 def test_snapshot_index_detail_serializer(self):216 serializer = volumes.SnapshotsTemplate()217 raw_snapshots = [dict(218 id='snap1_id',219 status='snap1_status',220 size=1024,221 createdAt=datetime.datetime.now(),222 displayName='snap1_name',223 displayDescription='snap1_desc',224 volumeId='vol1_id',225 ),226 dict(227 id='snap2_id',228 status='snap2_status',229 size=1024,230 createdAt=datetime.datetime.now(),231 displayName='snap2_name',232 displayDescription='snap2_desc',233 volumeId='vol2_id',234 )]235 text = serializer.serialize(dict(snapshots=raw_snapshots))236 print text237 tree = etree.fromstring(text)238 self.assertEqual('snapshots', tree.tag)239 self.assertEqual(len(raw_snapshots), len(tree))240 for idx, child in enumerate(tree):...
ec2_snapshot.py
Source:ec2_snapshot.py
# ... (excerpt begins mid-file; the lines before this point are not shown)
# NOTE(review): presumably this assignment is the fallback branch of the
# boto import guard -- the excerpt lost its indentation, so confirm against
# the full file.
HAS_BOTO = False

# Layout of the ``start_time`` strings parsed by the helpers below.
_SNAPSHOT_TIME_FORMAT = '%Y-%m-%dT%H:%M:%S.000Z'


# Find the most recent snapshot
def _get_snapshot_starttime(snap):
    """Return ``snap.start_time`` parsed into a naive ``datetime``."""
    return datetime.datetime.strptime(snap.start_time, _SNAPSHOT_TIME_FORMAT)


def _get_most_recent_snapshot(snapshots, max_snapshot_age_secs=None, now=None):
    """
    Gets the most recently created snapshot and optionally filters the result
    if the snapshot is too old

    :param snapshots: list of snapshots to search
    :param max_snapshot_age_secs: filter the result if its older than this
    :param now: simulate time -- used for unit testing
    :return: the snapshot with the latest ``start_time``, or ``None`` when
        ``snapshots`` is empty or that snapshot is older than
        ``max_snapshot_age_secs``
    """
    if not snapshots:
        return None

    if not now:
        now = datetime.datetime.utcnow()

    youngest_snapshot = max(snapshots, key=_get_snapshot_starttime)

    # See if the snapshot is younger than the given max age
    if max_snapshot_age_secs is not None:
        snapshot_age = now - _get_snapshot_starttime(youngest_snapshot)
        if snapshot_age.total_seconds() > max_snapshot_age_secs:
            return None

    return youngest_snapshot


def _create_with_wait(snapshot, wait_timeout_secs, sleep_func=time.sleep):
    """
    Wait for the snapshot to be created

    :param snapshot: object exposing ``update()`` and ``status``
    :param wait_timeout_secs: fail this step after this many seconds; a
        falsy value (``0`` / ``None``) waits without a limit
    :param sleep_func: callable used to pause between polls
    :return: ``True`` once ``snapshot.status`` is ``'completed'``, ``False``
        if the timeout elapsed first
    """
    time_waited = 0
    snapshot.update()
    while snapshot.status != 'completed':
        sleep_func(3)
        snapshot.update()
        time_waited += 3
        if wait_timeout_secs and time_waited > wait_timeout_secs:
            return False
    return True


def create_snapshot(module, ec2, state=None, description=None, wait=None,
                    wait_timeout=None, volume_id=None, instance_id=None,
                    snapshot_id=None, device_name=None, snapshot_tags=None,
                    last_snapshot_min_age=None):
    """
    Create, reuse or delete a snapshot and report through ``module``.

    Exactly one of ``volume_id``, ``snapshot_id`` and ``instance_id`` must be
    given; ``instance_id`` and ``device_name`` must be given together.  The
    outcome is reported via ``module.exit_json`` and problems via
    ``module.fail_json``.
    NOTE(review): both calls presumably end the process (AnsibleModule) --
    the control flow below relies on that; confirm.

    :param module: object exposing ``fail_json(**kw)`` / ``exit_json(**kw)``
    :param ec2: connection exposing ``get_all_volumes``, ``delete_snapshot``,
        ``get_all_snapshots`` and ``create_snapshot``
    :param state: ``'absent'`` deletes ``snapshot_id``; anything else creates
    :param description: description passed to ``ec2.create_snapshot``
    :param wait: when truthy, poll until the snapshot is completed
    :param wait_timeout: polling limit in seconds (falsy means no limit)
    :param volume_id: volume to snapshot
    :param instance_id: instance whose attached ``device_name`` is snapshotted
    :param snapshot_id: snapshot to delete when ``state == 'absent'``
    :param device_name: device on ``instance_id`` identifying the volume
    :param snapshot_tags: dict of tags added to the snapshot
    :param last_snapshot_min_age: when positive, reuse the newest existing
        snapshot of the volume if it is at most this many minutes old
    """
    snapshot = None
    changed = False

    required = [volume_id, snapshot_id, instance_id]
    if required.count(None) != len(required) - 1:  # only 1 must be set
        module.fail_json(msg='One and only one of volume_id or instance_id or snapshot_id must be specified')
    if instance_id and not device_name or device_name and not instance_id:
        module.fail_json(msg='Instance ID and device name must both be specified')

    if instance_id:
        try:
            volumes = ec2.get_all_volumes(filters={'attachment.instance-id': instance_id, 'attachment.device': device_name})
        except boto.exception.BotoServerError as e:
            module.fail_json(msg="%s: %s" % (e.error_code, e.error_message))

        if not volumes:
            module.fail_json(msg="Could not find volume with name %s attached to instance %s" % (device_name, instance_id))

        volume_id = volumes[0].id

    if state == 'absent':
        if not snapshot_id:
            module.fail_json(msg='snapshot_id must be set when state is absent')
        try:
            ec2.delete_snapshot(snapshot_id)
        except boto.exception.BotoServerError as e:
            # exception is raised if snapshot does not exist
            if e.error_code == 'InvalidSnapshot.NotFound':
                module.exit_json(changed=False)
            else:
                module.fail_json(msg="%s: %s" % (e.error_code, e.error_message))

        # successful delete
        module.exit_json(changed=True)

    # The declared default is None; comparing None with 0 raises TypeError on
    # Python 3, so test for a usable value before comparing.
    if last_snapshot_min_age and last_snapshot_min_age > 0:
        try:
            current_snapshots = ec2.get_all_snapshots(filters={'volume_id': volume_id})
        except boto.exception.BotoServerError as e:
            module.fail_json(msg="%s: %s" % (e.error_code, e.error_message))

        last_snapshot_min_age = last_snapshot_min_age * 60  # Convert to seconds
        snapshot = _get_most_recent_snapshot(current_snapshots,
                                             max_snapshot_age_secs=last_snapshot_min_age)
    try:
        # Create a new snapshot if we didn't find an existing one to use
        if snapshot is None:
            snapshot = ec2.create_snapshot(volume_id, description=description)
            changed = True
        if wait:
            if not _create_with_wait(snapshot, wait_timeout):
                module.fail_json(msg='Timed out while creating snapshot.')
        if snapshot_tags:
            for k, v in snapshot_tags.items():
                snapshot.add_tag(k, v)
    except boto.exception.BotoServerError as e:
        module.fail_json(msg="%s: %s" % (e.error_code, e.error_message))

    module.exit_json(changed=changed,
                     snapshot_id=snapshot.id,
                     volume_id=snapshot.volume_id,
                     volume_size=snapshot.volume_size,
                     tags=snapshot.tags.copy())


def create_snapshot_ansible_module():
    """Build the ``AnsibleModule`` with this module's argument spec."""
    argument_spec = ec2_argument_spec()
    argument_spec.update(
        dict(
            volume_id=dict(),
            description=dict(),
            instance_id=dict(),
            snapshot_id=dict(),
            device_name=dict(),
            wait=dict(type='bool', default=True),
            wait_timeout=dict(type='int', default=0),
            last_snapshot_min_age=dict(type='int', default=0),
            snapshot_tags=dict(type='dict', default=dict()),
            state=dict(choices=['absent', 'present'], default='present'),
        )
    )
    module = AnsibleModule(argument_spec=argument_spec)
    return module


def main():
    """Read the module parameters and hand them to ``create_snapshot``."""
    module = create_snapshot_ansible_module()

    if not HAS_BOTO:
        module.fail_json(msg='boto required for this module')

    volume_id = module.params.get('volume_id')
    snapshot_id = module.params.get('snapshot_id')
    description = module.params.get('description')
    instance_id = module.params.get('instance_id')
    device_name = module.params.get('device_name')
    wait = module.params.get('wait')
    wait_timeout = module.params.get('wait_timeout')
    last_snapshot_min_age = module.params.get('last_snapshot_min_age')
    snapshot_tags = module.params.get('snapshot_tags')
    state = module.params.get('state')

    ec2 = ec2_connect(module)

    create_snapshot(
        module=module,
        state=state,
        description=description,
        wait=wait,
        wait_timeout=wait_timeout,
        ec2=ec2,
        volume_id=volume_id,
        instance_id=instance_id,
        snapshot_id=snapshot_id,
        device_name=device_name,
        snapshot_tags=snapshot_tags,
        last_snapshot_min_age=last_snapshot_min_age
    )

# import module snippets
# ... (excerpt ends here; the rest of the file is not shown)
test_snapshot.py
Source:test_snapshot.py
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#

import copy

from openstackclient.tests import fakes
from openstackclient.tests.volume.v2 import fakes as volume_fakes
from openstackclient.volume.v2 import snapshot


class TestSnapshot(volume_fakes.TestVolume):
    """Base class: exposes freshly reset snapshot and volume manager mocks."""

    def setUp(self):
        super(TestSnapshot, self).setUp()

        self.snapshots_mock = self.app.client_manager.volume.volume_snapshots
        self.snapshots_mock.reset_mock()
        self.volumes_mock = self.app.client_manager.volume.volumes
        self.volumes_mock.reset_mock()


class TestSnapshotCreate(TestSnapshot):
    """Tests for ``snapshot.CreateSnapshot``."""

    def setUp(self):
        super(TestSnapshotCreate, self).setUp()

        self.volumes_mock.get.return_value = fakes.FakeResource(
            None,
            copy.deepcopy(volume_fakes.VOLUME),
            loaded=True
        )

        self.snapshots_mock.create.return_value = fakes.FakeResource(
            None,
            copy.deepcopy(volume_fakes.SNAPSHOT),
            loaded=True
        )
        # Get the command object to test
        self.cmd = snapshot.CreateSnapshot(self.app, None)

    def test_snapshot_create(self):
        """Parsed options are forwarded to ``create`` and the result shown."""
        arglist = [
            volume_fakes.volume_id,
            "--name", volume_fakes.snapshot_name,
            "--description", volume_fakes.snapshot_description,
            "--force"
        ]
        verifylist = [
            ("volume", volume_fakes.volume_id),
            ("name", volume_fakes.snapshot_name),
            ("description", volume_fakes.snapshot_description),
            ("force", True)
        ]
        parsed_args = self.check_parser(self.cmd, arglist, verifylist)

        columns, data = self.cmd.take_action(parsed_args)

        self.snapshots_mock.create.assert_called_with(
            volume_fakes.volume_id,
            force=True,
            name=volume_fakes.snapshot_name,
            description=volume_fakes.snapshot_description
        )
        self.assertEqual(columns, volume_fakes.SNAPSHOT_columns)
        self.assertEqual(data, volume_fakes.SNAPSHOT_data)


class TestSnapshotDelete(TestSnapshot):
    """Tests for ``snapshot.DeleteSnapshot``."""

    def setUp(self):
        super(TestSnapshotDelete, self).setUp()

        self.snapshots_mock.get.return_value = fakes.FakeResource(
            None,
            copy.deepcopy(volume_fakes.SNAPSHOT),
            loaded=True)
        self.snapshots_mock.delete.return_value = None

        # Get the command object to mock
        self.cmd = snapshot.DeleteSnapshot(self.app, None)

    def test_snapshot_delete(self):
        """The given snapshot id is passed to ``delete``; nothing returned."""
        arglist = [
            volume_fakes.snapshot_id
        ]
        verifylist = [
            ("snapshots", [volume_fakes.snapshot_id])
        ]
        parsed_args = self.check_parser(self.cmd, arglist, verifylist)

        result = self.cmd.take_action(parsed_args)

        self.snapshots_mock.delete.assert_called_with(volume_fakes.snapshot_id)
        self.assertIsNone(result)


class TestSnapshotList(TestSnapshot):
    """Tests for ``snapshot.ListSnapshot``."""

    # Column headers expected from the default (short) listing.
    columns = [
        "ID",
        "Name",
        "Description",
        "Status",
        "Size"
    ]

    def setUp(self):
        super(TestSnapshotList, self).setUp()

        self.volumes_mock.list.return_value = [
            fakes.FakeResource(
                None,
                copy.deepcopy(volume_fakes.VOLUME),
                loaded=True
            )
        ]
        self.snapshots_mock.list.return_value = [
            fakes.FakeResource(
                None,
                copy.deepcopy(volume_fakes.SNAPSHOT),
                loaded=True
            )
        ]
        # Get the command to test
        self.cmd = snapshot.ListSnapshot(self.app, None)

    def test_snapshot_list_without_options(self):
        """Default listing yields the short columns and one data row."""
        arglist = []
        verifylist = [
            ('all_projects', False),
            ("long", False)
        ]
        parsed_args = self.check_parser(self.cmd, arglist, verifylist)

        columns, data = self.cmd.take_action(parsed_args)

        self.assertEqual(self.columns, columns)
        datalist = ((
            volume_fakes.snapshot_id,
            volume_fakes.snapshot_name,
            volume_fakes.snapshot_description,
            "available",
            volume_fakes.snapshot_size
        ),)
        self.assertEqual(datalist, tuple(data))

    def test_snapshot_list_with_options(self):
        """``--long`` appends the creation time, volume and properties."""
        arglist = ["--long"]
        verifylist = [("long", True), ('all_projects', False)]
        parsed_args = self.check_parser(self.cmd, arglist, verifylist)

        columns, data = self.cmd.take_action(parsed_args)

        # Keep the expectation in its own name: rebinding ``columns`` here
        # would turn the assertion below into a self-comparison that can
        # never fail.
        expected_columns = self.columns + [
            "Created At",
            "Volume",
            "Properties"
        ]
        self.assertEqual(expected_columns, columns)

        datalist = ((
            volume_fakes.snapshot_id,
            volume_fakes.snapshot_name,
            volume_fakes.snapshot_description,
            "available",
            volume_fakes.snapshot_size,
            "2015-06-03T18:49:19.000000",
            volume_fakes.volume_name,
            volume_fakes.EXPECTED_SNAPSHOT.get("properties")
        ),)
        self.assertEqual(datalist, tuple(data))

    def test_snapshot_list_all_projects(self):
        """``--all-projects`` still yields the short columns and data row."""
        arglist = [
            '--all-projects',
        ]
        verifylist = [
            ('long', False),
            ('all_projects', True)
        ]
        parsed_args = self.check_parser(self.cmd, arglist, verifylist)

        columns, data = self.cmd.take_action(parsed_args)

        self.assertEqual(self.columns, columns)

        datalist = ((
            volume_fakes.snapshot_id,
            volume_fakes.snapshot_name,
            volume_fakes.snapshot_description,
            "available",
            volume_fakes.snapshot_size
        ), )
        self.assertEqual(datalist, tuple(data))


class TestSnapshotSet(TestSnapshot):
    """Tests for ``snapshot.SetSnapshot``."""

    def setUp(self):
        super(TestSnapshotSet, self).setUp()

        self.snapshots_mock.get.return_value = fakes.FakeResource(
            None,
            copy.deepcopy(volume_fakes.SNAPSHOT),
            loaded=True
        )
        self.snapshots_mock.set_metadata.return_value = None
        self.snapshots_mock.update.return_value = None
        # Get the command object to mock
        self.cmd = snapshot.SetSnapshot(self.app, None)

    def test_snapshot_set(self):
        """Name goes to ``update`` and properties to ``set_metadata``."""
        arglist = [
            volume_fakes.snapshot_id,
            "--name", "new_snapshot",
            "--property", "x=y",
            "--property", "foo=foo"
        ]
        new_property = {"x": "y", "foo": "foo"}
        verifylist = [
            ("snapshot", volume_fakes.snapshot_id),
            ("name", "new_snapshot"),
            ("property", new_property)
        ]
        parsed_args = self.check_parser(self.cmd, arglist, verifylist)

        result = self.cmd.take_action(parsed_args)

        kwargs = {
            "name": "new_snapshot",
        }
        self.snapshots_mock.update.assert_called_with(
            volume_fakes.snapshot_id, **kwargs)
        self.snapshots_mock.set_metadata.assert_called_with(
            volume_fakes.snapshot_id, new_property
        )
        self.assertIsNone(result)


class TestSnapshotShow(TestSnapshot):
    """Tests for ``snapshot.ShowSnapshot``."""

    def setUp(self):
        super(TestSnapshotShow, self).setUp()

        self.snapshots_mock.get.return_value = fakes.FakeResource(
            None,
            copy.deepcopy(volume_fakes.SNAPSHOT),
            loaded=True)
        # Get the command object to test
        self.cmd = snapshot.ShowSnapshot(self.app, None)

    def test_snapshot_show(self):
        """The snapshot is fetched by id and its columns/data returned."""
        arglist = [
            volume_fakes.snapshot_id
        ]
        verifylist = [
            ("snapshot", volume_fakes.snapshot_id)
        ]
        parsed_args = self.check_parser(self.cmd, arglist, verifylist)

        columns, data = self.cmd.take_action(parsed_args)

        self.snapshots_mock.get.assert_called_with(volume_fakes.snapshot_id)
        self.assertEqual(volume_fakes.SNAPSHOT_columns, columns)
        self.assertEqual(volume_fakes.SNAPSHOT_data, data)


class TestSnapshotUnset(TestSnapshot):
    """Tests for ``snapshot.UnsetSnapshot``."""

    def setUp(self):
        super(TestSnapshotUnset, self).setUp()

        self.snapshots_mock.get.return_value = fakes.FakeResource(
            None,
            copy.deepcopy(volume_fakes.SNAPSHOT),
            loaded=True
        )
        self.snapshots_mock.delete_metadata.return_value = None
        # Get the command object to mock
        self.cmd = snapshot.UnsetSnapshot(self.app, None)

    def test_snapshot_unset(self):
        """The property keys are passed to ``delete_metadata``."""
        arglist = [
            volume_fakes.snapshot_id,
            "--property", "foo"
        ]
        verifylist = [
            ("snapshot", volume_fakes.snapshot_id),
            ("property", ["foo"])
        ]
        parsed_args = self.check_parser(self.cmd, arglist, verifylist)

        result = self.cmd.take_action(parsed_args)

        self.snapshots_mock.delete_metadata.assert_called_with(
            volume_fakes.snapshot_id, ["foo"]
        )
        # ... (excerpt ends here; the rest of the file is not shown)
Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, such as Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!