How to use delete_volume method in localstack

Best Python code snippet using localstack_python

test_main.py

Source:test_main.py Github

copy

Full Screen

...226 leftover_volumes.append(filename)227 while volume_cleanup_list:228 v = volume_cleanup_list.pop()229 try:230 delete_volume(v)231 except:232 print "Failed to delete volume ", v233 if len(leftover_volumes) != 0:234 print leftover_volumes235 assert False236def create_volume(size = "", name = "", backup = "", endpoint = "", driver = "",237 volume_id = "", volume_type = "", iops = "", forvm = False):238 name = v.create_volume(size, name, backup, endpoint, driver,239 volume_id, volume_type, iops, forvm)240 if driver == "" or driver == DM:241 dm_cleanup_list.append(name)242 volume_cleanup_list.append(name)243 return name244def delete_volume(name, ref_only = False):245 v.delete_volume(name, ref_only)246 try:247 dm_cleanup_list.remove(name)248 except ValueError:249 pass250 volume_cleanup_list.remove(name)251def mount_volume_with_path(name):252 mount_dir = v.mount_volume_with_path(name)253 mount_cleanup_list.append(mount_dir)254 return mount_dir255def mount_volume(name):256 mount_dir = v.mount_volume(name)257 mount_cleanup_list.append(mount_dir)258 return mount_dir259def umount_volume(name, mount_dir):260 v.umount_volume(name)261 mount_cleanup_list.remove(mount_dir)262def test_volume_crud():263 volume_crud_test(DM, vmTest = False)264 volume_crud_test(VFS, sizeTest = False)265def volume_crud_test(drv, sizeTest = True, vmTest = True):266 name1 = create_volume(driver=drv)267 name2 = create_volume(driver=drv)268 if sizeTest:269 name3 = create_volume(VOLUME_SIZE_BIG, driver = drv)270 name4 = create_volume(VOLUME_SIZE_SMALL, driver = drv)271 delete_volume(name4)272 delete_volume(name3)273 if vmTest:274 name3 = create_volume(driver = drv, forvm = True)275 name4 = create_volume(driver = drv, forvm = True)276 delete_volume(name4)277 delete_volume(name3)278 delete_volume(name2)279 delete_volume(name1)280@pytest.mark.skipif(not pytest.config.getoption("ebs"),281 reason="--ebs was not specified")282def test_ebs_volume_crud():283 # need to test volume type and create volume from existed EBS volume feature284 name1 = create_volume(driver=EBS)285 name2 = create_volume(size=VOLUME_SIZE_SMALL, driver=EBS, volume_type="gp2")286 name3 = create_volume(size=VOLUME_SIZE_IOPS, driver=EBS, volume_type="io1",287 iops="100")288 volume3 = v.inspect_volume(name3)289 ebs_volume_id3 = volume3["DriverInfo"]["EBSVolumeID"]290 delete_volume(name3, ref_only = True)291 name3 = create_volume(driver=EBS, volume_id=ebs_volume_id3)292 delete_volume(name3)293 delete_volume(name2)294 delete_volume(name1)295def test_vfs_delete_volume_ref_only():296 name = create_volume(driver=VFS)297 insp = v.inspect_volume(name)298 path = insp["DriverInfo"]["Path"]299 assert os.path.exists(path)300 filename = "testfile"301 test_file = os.path.join(path,filename)302 with open(test_file, "w") as f:303 subprocess.check_call(["echo", "This is volume test file"], stdout=f)304 assert os.path.exists(test_file)305 delete_volume(name, ref_only = True)306 assert os.path.exists(test_file)307 os.remove(test_file)308def test_volume_name():309 volume_name_test(DM)310 volume_name_test(VFS)311def volume_name_test(drv):312 vol_name1 = "vol.1_1-0"313 vol_name2 = "vol.2_2-0"314 vol = create_volume(name=vol_name1, driver=drv)315 vols = v.list_volumes()316 assert vols[vol]["Name"] == vol_name1317 assert vols[vol]["Driver"] == drv318 assert vols[vol]["CreatedTime"] != ""319 with pytest.raises(subprocess.CalledProcessError):320 new_name = create_volume(name=vol_name1, driver=drv)321 with pytest.raises(subprocess.CalledProcessError):322 new_name = create_volume(driver="randomdriver")323 delete_volume(vol_name1)324 vols = v.list_volumes()325 assert vol not in vols326 vol1 = create_volume(name=vol_name1, driver=drv)327 vol2 = create_volume(name=vol_name2, driver=drv)328 vols = v.list_volumes()329 assert vols[vol1]["Name"] == vol_name1330 assert vols[vol2]["Name"] == vol_name2331 assert vols[vol1]["CreatedTime"] != ""332 assert vols[vol2]["CreatedTime"] != ""333 delete_volume(vol1)334 delete_volume(vol_name2)335def mount_volume_and_create_file(name, filename):336 # with format337 volume_mount_dir = mount_volume(name)338 test_file = os.path.join(volume_mount_dir,filename)339 with open(test_file, "w") as f:340 subprocess.check_call(["echo", "This is volume test file"], stdout=f)341 assert os.path.exists(test_file)342 umount_volume(name, volume_mount_dir)343 # Doesn't work with current VFS implmentation, since it won't really mount344 #assert not os.path.exists(test_file)345def test_volume_mount():346 volume_mount_test(DM)347 if test_ebs:348 volume_mount_test(EBS)349 # skip the vfs mount test because we only pass the original volume path as350 # mount path, not really done any mount work now351def volume_mount_test(drv):352 vol = create_volume(driver=drv)353 # with format354 filename = "test"355 mount_volume_and_create_file(vol, filename)356 # without format357 volume_mount_dir = mount_volume_with_path(vol)358 test_file = os.path.join(volume_mount_dir, filename)359 assert os.path.exists(test_file)360 umount_volume(vol, volume_mount_dir)361 assert not os.path.exists(test_file)362 # auto mount363 volume_mount_dir = mount_volume(vol)364 test_file = os.path.join(volume_mount_dir, filename)365 assert os.path.exists(test_file)366 umount_volume(vol, volume_mount_dir)367 assert not os.path.exists(test_file)368 delete_volume(vol)369def test_volume_vm_mount():370 volume_vm_test(VFS)371def volume_vm_test(drv):372 vol = create_volume(driver = drv, size = VOLUME_SIZE_SMALL, forvm = True)373 mount_dir = mount_volume(vol)374 image_filepath = os.path.join(mount_dir, VM_IMAGE_FILE)375 assert os.path.exists(image_filepath)376 size = os.stat(image_filepath).st_size377 assert str(size) == VOLUME_SIZE_SMALL378 umount_volume(vol, mount_dir)379 delete_volume(vol)380def test_volume_list():381 volume_list_driver_test(DM)382 volume_list_driver_test(VFS, False)383 if test_ebs:384 volume_list_driver_test(EBS)385def volume_list_driver_test(drv, check_size = True):386 volumes = v.list_volumes()387 assert len(volumes) == 0388 vol1 = create_volume(driver=drv)389 vol2 = create_volume(driver=drv)390 if check_size:391 vol3 = create_volume(VOLUME_SIZE_BIG, driver=drv)392 vol4 = create_volume(VOLUME_SIZE_SMALL, driver=drv)393 volume = v.inspect_volume(vol1)394 assert volume["Name"] == vol1395 if check_size:396 assert volume["DriverInfo"]["Size"] == DEFAULT_VOLUME_SIZE397 volume = v.inspect_volume(vol2)398 assert volume["Name"] == vol2399 if check_size:400 assert volume["DriverInfo"]["Size"] == DEFAULT_VOLUME_SIZE401 if check_size:402 volumes = v.list_volumes()403 assert volumes[vol1]["DriverInfo"]["Size"] == DEFAULT_VOLUME_SIZE404 assert volumes[vol2]["DriverInfo"]["Size"] == DEFAULT_VOLUME_SIZE405 assert volumes[vol3]["DriverInfo"]["Size"] == VOLUME_SIZE_BIG_Bytes406 assert volumes[vol4]["DriverInfo"]["Size"] == VOLUME_SIZE_SMALL407 delete_volume(vol4)408 delete_volume(vol3)409 delete_volume(vol2)410 delete_volume(vol1)411def test_snapshot_crud():412 snapshot_crud_test(DM)413 snapshot_crud_test(VFS)414def snapshot_crud_test(driver):415 volume_name = create_volume(VOLUME_SIZE_SMALL, name="vol1", driver=driver)416 snapshot = v.create_snapshot(volume_name)417 v.delete_snapshot(snapshot)418 delete_volume(volume_name)419 # delete snapshot automatically with volume420 volume_name = create_volume(VOLUME_SIZE_SMALL, name="vol1", driver=driver)421 snap1 = v.create_snapshot(volume_name)422 snap2 = v.create_snapshot(volume_name)423 snap3 = v.create_snapshot(volume_name)424 v.delete_snapshot(snap1)425 v.delete_snapshot(snap2)426 delete_volume(volume_name)427def test_snapshot_name():428 snapshot_name_test(DM)429 snapshot_name_test(VFS)430def snapshot_name_test(driver):431 volume_name = create_volume(VOLUME_SIZE_SMALL, driver=driver)432 snap1_name = "snap1"433 snap1 = v.create_snapshot(volume_name, name=snap1_name)434 assert snap1_name == snap1435 vols = v.list_volumes()436 s = vols[volume_name]["Snapshots"][snap1]437 assert s["Name"] == snap1_name438 assert s["DriverInfo"]["Driver"] == driver439 assert s["CreatedTime"] != ""440 with pytest.raises(subprocess.CalledProcessError):441 new_name = v.create_snapshot(volume_name, name=snap1_name)442 v.delete_snapshot(snap1)443 delete_volume(volume_name)444def test_snapshot_list():445 snapshot_list_test(DM)446 snapshot_list_test(VFS, False)447def snapshot_list_test(driver, check_size = True):448 volume1_name = create_volume(VOLUME_SIZE_SMALL, name = "volume1", driver=driver)449 volume2_name = create_volume(VOLUME_SIZE_BIG, driver=driver)450 with pytest.raises(subprocess.CalledProcessError):451 snapshot = v.inspect_snapshot(str(uuid.uuid1()))452 with pytest.raises(subprocess.CalledProcessError):453 volume = v.inspect_snapshot(str(uuid.uuid1()))454 snap0_vol1 = v.create_snapshot(volume1_name, "snap0_vol1")455 assert snap0_vol1 == "snap0_vol1"456 snapshot = v.inspect_snapshot("snap0_vol1")457 assert snapshot["VolumeName"] == volume1_name458 if check_size:459 assert str(snapshot["DriverInfo"]["Size"]) == VOLUME_SIZE_SMALL460 assert snapshot["Name"] == "snap0_vol1"461 snap1_vol1 = v.create_snapshot(volume1_name)462 snap2_vol1 = v.create_snapshot(volume1_name)463 snap1_vol2 = v.create_snapshot(volume2_name, "snap1_vol2")464 assert snap1_vol2 == "snap1_vol2"465 snap2_vol2 = v.create_snapshot(volume2_name, "snap2_vol2")466 assert snap2_vol2 == "snap2_vol2"467 snap3_vol2 = v.create_snapshot(volume2_name, "snap3_vol2")468 assert snap3_vol2 == "snap3_vol2"469 volume = v.inspect_volume(volume2_name)470 assert snap1_vol2 in volume["Snapshots"]471 assert volume["Snapshots"][snap1_vol2]["Name"] == "snap1_vol2"472 assert volume["Snapshots"][snap1_vol2]["CreatedTime"] != ""473 assert snap2_vol2 in volume["Snapshots"]474 assert volume["Snapshots"][snap2_vol2]["Name"] == "snap2_vol2"475 assert volume["Snapshots"][snap2_vol2]["CreatedTime"] != ""476 assert snap3_vol2 in volume["Snapshots"]477 assert volume["Snapshots"][snap3_vol2]["Name"] == "snap3_vol2"478 assert volume["Snapshots"][snap3_vol2]["CreatedTime"] != ""479 volumes = v.list_volumes()480 assert snap0_vol1 in volumes[volume1_name]["Snapshots"]481 assert snap1_vol1 in volumes[volume1_name]["Snapshots"]482 assert snap2_vol1 in volumes[volume1_name]["Snapshots"]483 assert snap1_vol2 in volumes[volume2_name]["Snapshots"]484 assert snap2_vol2 in volumes[volume2_name]["Snapshots"]485 assert snap3_vol2 in volumes[volume2_name]["Snapshots"]486 v.delete_snapshot(snap0_vol1)487 with pytest.raises(subprocess.CalledProcessError):488 snapshot = v.inspect_snapshot(snap0_vol1)489 v.delete_snapshot(snap1_vol1)490 v.delete_snapshot(snap2_vol1)491 v.delete_snapshot(snap1_vol2)492 v.delete_snapshot(snap2_vol2)493 v.delete_snapshot(snap3_vol2)494 delete_volume(volume2_name)495 delete_volume(volume1_name)496@pytest.mark.skipif(not pytest.config.getoption("ebs"),497 reason="--ebs was not specified")498def test_ebs_snapshot_backup():499 volume_name = create_volume(size = VOLUME_SIZE_SMALL, name = "ebs_volume", driver=EBS)500 assert volume_name == "ebs_volume"501 mount_volume_and_create_file(volume_name, "test-vol1-v1")502 snap1_name = v.create_snapshot("ebs_volume", "snap1")503 assert snap1_name == "snap1"504 volume = v.inspect_volume("ebs_volume")505 snap1 = v.inspect_snapshot("snap1")506 assert snap1["VolumeName"] == volume_name507 assert snap1["Name"] == "snap1"508 assert str(snap1["DriverInfo"]["Size"]) == VOLUME_SIZE_SMALL509 assert snap1["DriverInfo"]["EBSVolumeID"] == volume["DriverInfo"]["EBSVolumeID"]510 assert snap1["DriverInfo"]["Size"] == volume["DriverInfo"]["Size"]511 global S3_ENDPOINT512 endpoint = S3_ENDPOINT or ""513 backup_url = v.create_backup(snap1_name, endpoint = endpoint)514 backup = v.inspect_backup(backup_url, endpoint = endpoint)515 assert backup["EBSVolumeID"] == volume["DriverInfo"]["EBSVolumeID"]516 assert backup["EBSSnapshotID"] == snap1["DriverInfo"]["EBSSnapshotID"]517 assert backup["Size"] == snap1["DriverInfo"]["Size"]518 v.delete_backup(backup_url, endpoint = endpoint)519 v.delete_snapshot("snap1")520 delete_volume(volume_name)521def create_delete_volume():522 vol = v.create_volume(size = VOLUME_SIZE_6M)523 snap = v.create_snapshot(vol)524 v.delete_snapshot(snap)525 v.delete_volume(vol)526# uses default driver which is device mapper527def test_create_volume_in_parallel():528 threads = []529 for i in range(TEST_THREAD_COUNT):530 threads.append(threading.Thread(target = create_delete_volume))531 threads[i].start()532 for i in range(TEST_THREAD_COUNT):533 threads[i].join()534def test_create_volume_in_sequence():535 for i in range(TEST_LOOP_COUNT):536 create_delete_volume()537def compress_volume(volume_name):538 mountpoint = mount_volume(volume_name)539 zipfile = os.path.join(TEST_ROOT, volume_name)540 shutil.make_archive(zipfile, "zip", mountpoint)541 umount_volume(volume_name, mountpoint)542 return zipfile + ".zip"543def get_volume_checksum(volume_name, driver):544 f = ""545 if driver == VFS:546 f = compress_volume(volume_name)547 else: # DM/EBS548 f = v.inspect_volume(volume_name)["DriverInfo"]["Device"]549 output = subprocess.check_output(["sha512sum", f]).decode()550 if driver == "VFS" and f != "":551 os.remove(f)552 return output.split(" ")[0]553def check_restore(origin_vol, restored_vol, driver):554 volume_checksum = get_volume_checksum(origin_vol, driver)555 restore_checksum = get_volume_checksum(restored_vol, driver)556 assert volume_checksum == restore_checksum557def test_backup_create_restore_only():558 process_restore_with_original_removed(VFS, VFS_DEST)559 process_restore_with_original_removed(DM, VFS_DEST)560 if test_ebs:561 process_restore_with_original_removed(EBS)562def process_restore_with_original_removed(driver, dest = ""):563 volume1_name = create_volume(size = VOLUME_SIZE_BIG, driver = driver)564 mount_volume_and_create_file(volume1_name, "test-vol1-v1")565 snap1_vol1_name = v.create_snapshot(volume1_name)566 bak = v.create_backup(snap1_vol1_name, dest)567 volume1_checksum = get_volume_checksum(volume1_name, driver)568 delete_volume(volume1_name)569 if driver == DM:570 #cannot specify different size with backup571 with pytest.raises(subprocess.CalledProcessError):572 res_volume1_name = create_volume(VOLUME_SIZE_SMALL, "res-vol1", bak,573 driver = driver)574 res_volume1_name = create_volume(name = "res-vol1", backup = bak, driver =575 driver)576 res_volume1_checksum = get_volume_checksum(res_volume1_name, driver)577 assert res_volume1_checksum == volume1_checksum578 delete_volume(res_volume1_name)579 v.delete_backup(bak)580def test_duplicate_backup():581 process_duplicate_backup_test(VFS_DEST, VFS)582 process_duplicate_backup_test(VFS_DEST, DM)583def process_duplicate_backup_test(dest, driver):584 volume_name = create_volume(size = VOLUME_SIZE_BIG, driver = driver)585 mount_volume_and_create_file(volume_name, "volume_snap_test")586 snap_name = v.create_snapshot(volume_name)587 volume_checksum = get_volume_checksum(volume_name, driver)588 bak1 = v.create_backup(snap_name, dest)589 bak2 = v.create_backup(snap_name, dest)590 res2 = create_volume(backup = bak2, driver = driver)591 res2_checksum = get_volume_checksum(res2, driver = driver)592 assert res2_checksum == volume_checksum593 v.delete_backup(bak2)594 res1 = create_volume(backup = bak1, driver = driver)595 res1_checksum = get_volume_checksum(res1, driver = driver)596 assert res1_checksum == volume_checksum597 v.delete_backup(bak1)598 delete_volume(res2)599 delete_volume(res1)600 delete_volume(volume_name)601def test_vfs_objectstore():602 vfs_objectstore_test(VFS)603 vfs_objectstore_test(DM)604def vfs_objectstore_test(driver):605 process_objectstore_test(VFS_DEST, "", driver)606def test_s3_objectstore():607 global S3_ENDPOINT608 endpoint = S3_ENDPOINT or ""609 s3_objectstore_test(endpoint, VFS)610 s3_objectstore_test(endpoint, DM)611def s3_objectstore_test(endpoint, driver):612 process_objectstore_test(get_s3_dest(), endpoint, driver)613 process_objectstore_test(get_s3_dest(S3_PATH), endpoint, driver)614def get_s3_dest(path = ""):615 global S3_REGION, S3_BUCKET616 return "s3://" + S3_BUCKET + "@" + S3_REGION + "/" + path617def unescape_url(url):618 return url.replace("\\u0026", "&").replace("u0026","&")619def process_objectstore_test(dest, endpoint, driver):620 #make sure objectstore is empty621 backups = v.list_backup(dest, endpoint = endpoint)622 assert len(backups) == 0623 #add volume to objectstore624 name1 = "volume1_" + str(uuid.uuid4())[:8]625 name2 = str(uuid.uuid4())[:2]626 volume1_name = create_volume(VOLUME_SIZE_BIG, name1, driver=driver)627 volume1 = v.inspect_volume(name1)628 volume2_name = create_volume(VOLUME_SIZE_SMALL, name2, driver=driver)629 with pytest.raises(subprocess.CalledProcessError):630 backups = v.list_backup(dest, endpoint = endpoint, volume_name = volume1_name)631 #first snapshots632 snap1_vol1_name = v.create_snapshot(volume1_name, "snap1_vol1")633 snap1_vol1 = v.inspect_snapshot("snap1_vol1")634 snap1_vol1_bak = v.create_backup("snap1_vol1", dest, endpoint = endpoint)635 backups = v.list_backup(dest, endpoint = endpoint, volume_name = volume1_name)636 assert len(backups) == 1637 backup = backups[unescape_url(snap1_vol1_bak)]638 assert backup["DriverName"] == driver639 assert backup["VolumeName"] == volume1["Name"]640 if "Size" in volume1["DriverInfo"]:641 assert backup["VolumeSize"] == volume1["DriverInfo"]["Size"]642 assert backup["VolumeCreatedAt"] == volume1["CreatedTime"]643 assert backup["SnapshotName"] == snap1_vol1["Name"]644 assert backup["SnapshotCreatedAt"] == snap1_vol1["CreatedTime"]645 assert backup["CreatedTime"] != ""646 backup = v.inspect_backup(snap1_vol1_bak, endpoint = endpoint)647 assert backup["DriverName"] == driver648 assert backup["VolumeName"] == volume1["Name"]649 if "Size" in volume1["DriverInfo"]:650 assert backup["VolumeSize"] == volume1["DriverInfo"]["Size"]651 assert backup["VolumeCreatedAt"] == volume1["CreatedTime"]652 assert backup["SnapshotName"] == snap1_vol1["Name"]653 assert backup["SnapshotCreatedAt"] == snap1_vol1["CreatedTime"]654 assert backup["CreatedTime"] != ""655 snap1_vol2_name = v.create_snapshot(volume2_name, "snap1_vol2")656 snap1_vol2_bak = v.create_backup("snap1_vol2", dest, endpoint = endpoint)657 #list snapshots658 backups = v.list_backup(dest, endpoint = endpoint, volume_name = volume2_name)659 assert len(backups) == 1660 backup = v.inspect_backup(snap1_vol2_bak, endpoint = endpoint)661 assert backup["VolumeName"] == volume2_name662 assert backup["SnapshotName"] == snap1_vol2_name663 #second snapshots664 mount_volume_and_create_file(volume1_name, "test-vol1-v1")665 snap2_vol1_name = v.create_snapshot(volume1_name)666 snap2_vol1_bak = v.create_backup(snap2_vol1_name, dest, endpoint = endpoint)667 mount_volume_and_create_file(volume2_name, "test-vol2-v2")668 snap2_vol2_name = v.create_snapshot(volume2_name)669 snap2_vol2_bak = v.create_backup(snap2_vol2_name, dest, endpoint = endpoint)670 #list snapshots again671 backups = v.list_backup(dest, endpoint = endpoint)672 assert len(backups) == 4673 assert backups[unescape_url(snap1_vol1_bak)]["DriverName"] == driver674 assert backups[unescape_url(snap1_vol1_bak)]["VolumeName"] == volume1_name675 assert backups[unescape_url(snap1_vol1_bak)]["SnapshotName"] == snap1_vol1_name676 assert backups[unescape_url(snap2_vol1_bak)]["DriverName"] == driver677 assert backups[unescape_url(snap2_vol1_bak)]["VolumeName"] == volume1_name678 assert backups[unescape_url(snap2_vol1_bak)]["SnapshotName"] == snap2_vol1_name679 assert backups[unescape_url(snap1_vol2_bak)]["DriverName"] == driver680 assert backups[unescape_url(snap1_vol2_bak)]["VolumeName"] == volume2_name681 assert backups[unescape_url(snap1_vol2_bak)]["SnapshotName"] == snap1_vol2_name682 assert backups[unescape_url(snap2_vol2_bak)]["DriverName"] == driver683 assert backups[unescape_url(snap2_vol2_bak)]["VolumeName"] == volume2_name684 assert backups[unescape_url(snap2_vol2_bak)]["SnapshotName"] == snap2_vol2_name685 backups = v.list_backup(dest, endpoint = endpoint, volume_name = volume1_name)686 assert len(backups) == 2687 assert backups[unescape_url(snap1_vol1_bak)]["VolumeName"] == volume1_name688 assert backups[unescape_url(snap1_vol1_bak)]["SnapshotName"] == snap1_vol1_name689 assert backups[unescape_url(snap2_vol1_bak)]["VolumeName"] == volume1_name690 assert backups[unescape_url(snap2_vol1_bak)]["SnapshotName"] == snap2_vol1_name691 backups = v.list_backup(dest, endpoint = endpoint, volume_name = volume2_name)692 assert len(backups) == 2693 assert backups[unescape_url(snap1_vol2_bak)]["VolumeName"] == volume2_name694 assert backups[unescape_url(snap1_vol2_bak)]["SnapshotName"] == snap1_vol2_name695 assert backups[unescape_url(snap2_vol2_bak)]["VolumeName"] == volume2_name696 assert backups[unescape_url(snap2_vol2_bak)]["SnapshotName"] == snap2_vol2_name697 #restore snapshot698 res_volume1_name = create_volume(name = "res-vol1", backup = snap2_vol1_bak,699 endpoint = endpoint, driver = driver)700 check_restore(volume1_name, res_volume1_name, driver)701 res_volume2_name = create_volume(backup = snap2_vol2_bak, endpoint=endpoint, driver=driver)702 check_restore(volume2_name, res_volume2_name, driver)703 #remove snapshots from objectstore704 v.delete_backup(snap2_vol1_bak, endpoint = endpoint)705 v.delete_backup(snap2_vol2_bak, endpoint = endpoint)706 #list snapshots again707 backups = v.list_backup(dest, endpoint = endpoint)708 assert len(backups) == 2709 assert backups[unescape_url(snap1_vol1_bak)]["DriverName"] == driver710 assert backups[unescape_url(snap1_vol1_bak)]["VolumeName"] == volume1_name711 assert backups[unescape_url(snap1_vol1_bak)]["SnapshotName"] == snap1_vol1_name712 assert backups[unescape_url(snap1_vol2_bak)]["DriverName"] == driver713 assert backups[unescape_url(snap1_vol2_bak)]["VolumeName"] == volume2_name714 assert backups[unescape_url(snap1_vol2_bak)]["SnapshotName"] == snap1_vol2_name715 backups = v.list_backup(dest, endpoint = endpoint, volume_name = volume1_name)716 assert len(backups) == 1717 backup = backups[unescape_url(snap1_vol1_bak)]718 assert backup["DriverName"] == driver719 assert backup["VolumeName"] == volume1_name720 assert backup["SnapshotName"] == snap1_vol1_name721 backup = v.inspect_backup(snap1_vol1_bak, endpoint = endpoint)722 assert backup["DriverName"] == driver723 assert backup["VolumeName"] == volume1_name724 assert backup["SnapshotName"] == snap1_vol1_name725 backups = v.list_backup(dest, endpoint = endpoint, volume_name = volume2_name)726 assert len(backups) == 1727 backup = backups[unescape_url(snap1_vol2_bak)]728 assert backup["DriverName"] == driver729 assert backup["VolumeName"] == volume2_name730 assert backup["SnapshotName"] == snap1_vol2_name731 backup = v.inspect_backup(snap1_vol2_bak, endpoint = endpoint)732 assert backup["DriverName"] == driver733 assert backup["VolumeName"] == volume2_name734 assert backup["SnapshotName"] == snap1_vol2_name735 #remove snapshots from objectstore736 v.delete_backup(snap1_vol2_bak, endpoint = endpoint)737 v.delete_backup(snap1_vol1_bak, endpoint = endpoint)738 v.delete_snapshot(snap1_vol1_name)739 v.delete_snapshot(snap2_vol1_name)740 v.delete_snapshot(snap1_vol2_name)741 v.delete_snapshot(snap2_vol2_name)742 delete_volume(volume1_name)743 delete_volume(volume2_name)744 delete_volume(res_volume1_name)745 delete_volume(res_volume2_name)746def test_cross_restore_error_checking():747 vfs_vol_name = create_volume(driver=VFS)748 vfs_snap_name = v.create_snapshot(vfs_vol_name)749 vfs_backup = v.create_backup(vfs_snap_name, VFS_DEST)750 dm_vol_name = create_volume(size = VOLUME_SIZE_SMALL, driver=DM)751 dm_snap_name = v.create_snapshot(dm_vol_name)752 dm_backup = v.create_backup(dm_snap_name, VFS_DEST)753 with pytest.raises(subprocess.CalledProcessError):754 create_volume(driver=VFS, backup=dm_backup)755 with pytest.raises(subprocess.CalledProcessError):756 create_volume(driver=DM, backup=vfs_backup)757 vfs_res = create_volume(driver=VFS, backup=vfs_backup)758 dm_res = create_volume(driver=DM, backup=dm_backup)759 delete_volume(vfs_vol_name)760 delete_volume(vfs_res)761 delete_volume(dm_vol_name)...

Full Screen

Full Screen

test_volumes.py

Source:test_volumes.py Github

copy

Full Screen

...18from ec2api.tests.functional import config19CONF = config.CONF20class VolumeTest(base.EC2TestCase):21 @decorators.idempotent_id('51fd4d06-7b00-427a-9d69-6ecd076c219a')22 def test_create_delete_volume(self):23 kwargs = {24 'Size': 1,25 'AvailabilityZone': CONF.aws.aws_zone26 }27 data = self.client.create_volume(*[], **kwargs)28 volume_id = data['VolumeId']29 res_clean = self.addResourceCleanUp(self.client.delete_volume,30 VolumeId=volume_id)31 self.get_volume_waiter().wait_available(volume_id)32 if CONF.aws.run_incompatible_tests:33 self.assertEqual('standard', data['VolumeType'])34 self.assertEqual(1, data['Size'])35 if 'Encrypted' in data:36 self.assertFalse(data['Encrypted'])37 self.assertIsNotNone(data['CreateTime'])38 self.assertEqual(CONF.aws.aws_zone, data['AvailabilityZone'])39 self.client.delete_volume(VolumeId=volume_id)40 self.cancelResourceCleanUp(res_clean)41 self.get_volume_waiter().wait_delete(volume_id)42 self.assertRaises('InvalidVolume.NotFound',43 self.client.describe_volumes,44 VolumeIds=[volume_id])45 self.assertRaises('InvalidVolume.NotFound',46 self.client.delete_volume,47 VolumeId=volume_id)48 @decorators.idempotent_id('a7f1c4f8-2153-4d09-b5d5-bf087ea2f6ed')49 @testtools.skipUnless(CONF.aws.run_incompatible_tests,50 "Encryption is not implemented")51 def test_create_encrypted_volume(self):52 kwargs = {53 'Size': 1,54 'AvailabilityZone': CONF.aws.aws_zone,55 'Encrypted': True,56 }57 data = self.client.create_volume(*[], **kwargs)58 volume_id = data['VolumeId']59 res_clean = self.addResourceCleanUp(self.client.delete_volume,60 VolumeId=volume_id)61 self.get_volume_waiter().wait_available(volume_id)62 self.assertTrue(data['Encrypted'])63 self.client.delete_volume(VolumeId=volume_id)64 self.cancelResourceCleanUp(res_clean)65 self.get_volume_waiter().wait_delete(volume_id)66 @decorators.idempotent_id('16c97f73-c4f2-4e91-9506-4f6da4a33f8a')67 def test_describe_volumes(self):68 kwargs = {69 'Size': 1,70 'AvailabilityZone': CONF.aws.aws_zone71 }72 data = self.client.create_volume(*[], **kwargs)73 volume_id = data['VolumeId']74 res_clean = self.addResourceCleanUp(self.client.delete_volume,75 VolumeId=volume_id)76 self.get_volume_waiter().wait_available(volume_id)77 data = self.client.create_volume(*[], **kwargs)78 volume_id_ext = data['VolumeId']79 res_clean_ext = self.addResourceCleanUp(self.client.delete_volume,80 VolumeId=volume_id_ext)81 self.get_volume_waiter().wait_available(volume_id_ext)82 data = self.client.describe_volumes(VolumeIds=[volume_id])83 self.assertEqual(1, len(data['Volumes']))84 volume = data['Volumes'][0]85 self.assertEqual(volume_id, volume['VolumeId'])86 if CONF.aws.run_incompatible_tests:87 self.assertEqual('standard', volume['VolumeType'])88 self.assertEqual(1, volume['Size'])89 if 'Encrypted' in volume:90 self.assertFalse(volume['Encrypted'])91 self.client.delete_volume(VolumeId=volume_id_ext)92 self.cancelResourceCleanUp(res_clean_ext)93 self.get_volume_waiter().wait_delete(volume_id_ext)94 self.client.delete_volume(VolumeId=volume_id)95 self.cancelResourceCleanUp(res_clean)96 self.get_volume_waiter().wait_delete(volume_id)97 @decorators.idempotent_id('30697dd5-12e7-4dd3-8cf8-5bdb296f26d8')98 @testtools.skipUnless(CONF.aws.run_incompatible_tests,99 "Volume statuses are not implemented")100 def test_describe_volume_status(self):101 kwargs = {102 'Size': 1,103 'AvailabilityZone': CONF.aws.aws_zone104 }105 data = self.client.create_volume(*[], **kwargs)106 volume_id = data['VolumeId']107 res_clean = self.addResourceCleanUp(self.client.delete_volume,108 VolumeId=volume_id)109 self.get_volume_waiter().wait_available(volume_id)110 data = self.client.describe_volume_status(VolumeIds=[volume_id])111 self.assertEqual(1, len(data['VolumeStatuses']))112 volume_status = data['VolumeStatuses'][0]113 self.assertIn('Actions', volume_status)114 self.assertIn('Events', volume_status)115 self.assertIn('VolumeStatus', volume_status)116 self.client.delete_volume(VolumeId=volume_id)117 self.cancelResourceCleanUp(res_clean)118 self.get_volume_waiter().wait_delete(volume_id)119 @decorators.idempotent_id('b0116edc-250c-4083-b1ad-66c0eb984415')120 @testtools.skipUnless(CONF.aws.ebs_image_id, "ebs image id is not defined")121 def test_attach_detach_volume(self):122 clean_dict = {}123 instance_id = self.run_instance(ImageId=CONF.aws.ebs_image_id,124 clean_dict=clean_dict)125 clean_i = clean_dict['instance']126 kwargs = {127 'Size': 1,128 'AvailabilityZone': CONF.aws.aws_zone129 }130 data = self.client.create_volume(*[], **kwargs)131 volume_id = data['VolumeId']132 clean_v = self.addResourceCleanUp(self.client.delete_volume,133 VolumeId=volume_id)134 self.get_volume_waiter().wait_available(volume_id)135 kwargs = {136 'Device': '/dev/sdh',137 'InstanceId': instance_id,138 'VolumeId': volume_id,139 }140 self.client.attach_volume(*[], **kwargs)141 clean_vi = self.addResourceCleanUp(self.client.detach_volume,142 VolumeId=volume_id)143 self.get_volume_attachment_waiter().wait_available(144 volume_id, final_set=('attached'))145 # reorder cleanups to avoid error on volume delete146 self.cancelResourceCleanUp(clean_i)147 clean_i = self.addResourceCleanUp(self.client.terminate_instances,148 InstanceIds=[instance_id])149 data = self.client.describe_volumes(VolumeIds=[volume_id])150 self.assertEqual(1, len(data['Volumes']))151 volume = data['Volumes'][0]152 self.assertEqual('in-use', volume['State'])153 self.assertEqual(1, len(volume['Attachments']))154 attachment = volume['Attachments'][0]155 self.assertFalse(attachment['DeleteOnTermination'])156 self.assertIsNotNone(attachment['Device'])157 self.assertEqual(instance_id, attachment['InstanceId'])158 self.assertEqual(volume_id, attachment['VolumeId'])159 data = self.client.describe_instances(InstanceIds=[instance_id])160 self.assertEqual(1, len(data.get('Reservations', [])))161 self.assertEqual(1, len(data['Reservations'][0].get('Instances', [])))162 bdms = data['Reservations'][0]['Instances'][0]['BlockDeviceMappings']163 self.assertNotEmpty(bdms)164 self.assertIn('DeviceName', bdms[0])165 self.assertIn('Ebs', bdms[0])166 # stop instance to prevent 'busy' state of detached volume167 data = self.client.stop_instances(InstanceIds=[instance_id])168 self.get_instance_waiter().wait_available(instance_id,169 final_set=('stopped'))170 self.client.detach_volume(VolumeId=volume_id)171 self.get_volume_attachment_waiter().wait_delete(volume_id)172 self.cancelResourceCleanUp(clean_vi)173 data = self.client.describe_volumes(VolumeIds=[volume_id])174 self.assertEqual(1, len(data['Volumes']))175 volume = data['Volumes'][0]176 self.assertEqual('available', volume['State'])177 self.assertEqual(0, len(volume['Attachments']))178 self.client.delete_volume(VolumeId=volume_id)179 self.cancelResourceCleanUp(clean_v)180 self.get_volume_waiter().wait_delete(volume_id)181 self.client.terminate_instances(InstanceIds=[instance_id])182 self.cancelResourceCleanUp(clean_i)183 self.get_instance_waiter().wait_delete(instance_id)184 @decorators.idempotent_id('c4b470b7-0825-418f-bc76-533f84247878')185 @testtools.skipUnless(CONF.aws.ebs_image_id, "EBS image id is not defined")186 def test_attaching_stage(self):187 clean_dict = {}188 instance_id = self.run_instance(ImageId=CONF.aws.ebs_image_id,189 clean_dict=clean_dict)190 clean_i = clean_dict['instance']191 data = self.client.create_volume(192 AvailabilityZone=CONF.aws.aws_zone, Size=1)193 volume_id = data['VolumeId']194 clean_v = self.addResourceCleanUp(self.client.delete_volume,195 VolumeId=volume_id)196 self.get_volume_waiter().wait_available(volume_id)197 device_name = '/dev/xvdh'198 kwargs = {199 'Device': device_name,200 'InstanceId': instance_id,201 'VolumeId': volume_id,202 }203 data = self.client.attach_volume(*[], **kwargs)204 clean_vi = self.addResourceCleanUp(self.client.detach_volume,205 VolumeId=volume_id)206 self.assertEqual('attaching', data['State'])207 if CONF.aws.run_incompatible_tests:208 bdt = self.get_instance_bdm(instance_id, device_name)209 self.assertIsNotNone(bdt)210 self.assertEqual('attaching', bdt['Ebs']['Status'])211 self.get_volume_attachment_waiter().wait_available(212 volume_id, final_set=('attached'))213 # reorder cleanups to avoid error on volume delete214 self.cancelResourceCleanUp(clean_i)215 clean_i = self.addResourceCleanUp(self.client.terminate_instances,216 InstanceIds=[instance_id])217 # stop instance to prevent 'busy' state of detached volume218 data = self.client.stop_instances(InstanceIds=[instance_id])219 self.get_instance_waiter().wait_available(instance_id,220 final_set=('stopped'))221 self.client.detach_volume(VolumeId=volume_id)222 self.get_volume_attachment_waiter().wait_delete(volume_id)223 self.cancelResourceCleanUp(clean_vi)224 self.client.delete_volume(VolumeId=volume_id)225 self.cancelResourceCleanUp(clean_v)226 self.get_volume_waiter().wait_delete(volume_id)227 self.client.terminate_instances(InstanceIds=[instance_id])228 self.cancelResourceCleanUp(clean_i)229 self.get_instance_waiter().wait_delete(instance_id)230 @decorators.idempotent_id('949ced35-fb66-4e87-afd8-f64de3dd20e9')231 @testtools.skipUnless(CONF.aws.run_incompatible_tests,232 "Volume statuses are not implemented")233 @testtools.skipUnless(CONF.aws.ebs_image_id, "EBS image id is not defined")234 def test_delete_detach_attached_volume(self):235 instance_id = self.run_instance(ImageId=CONF.aws.ebs_image_id)236 kwargs = {237 'Size': 1,238 'AvailabilityZone': CONF.aws.aws_zone239 }240 data = self.client.create_volume(*[], **kwargs)241 volume_id = data['VolumeId']242 clean_v = self.addResourceCleanUp(self.client.delete_volume,243 VolumeId=volume_id)244 self.get_volume_waiter().wait_available(volume_id)245 kwargs = {246 'Device': '/dev/sdh',247 'InstanceId': instance_id,248 'VolumeId': volume_id,249 }250 self.client.attach_volume(*[], **kwargs)251 clean_vi = self.addResourceCleanUp(self.client.detach_volume,252 VolumeId=volume_id)253 self.get_volume_attachment_waiter().wait_available(254 volume_id, final_set=('attached'))255 self.assertRaises('VolumeInUse',256 self.client.attach_volume,257 **kwargs)258 kwargs['Device'] = '/dev/sdi'259 self.assertRaises('VolumeInUse',260 self.client.attach_volume,261 **kwargs)262 self.assertRaises('VolumeInUse',263 self.client.delete_volume,264 VolumeId=volume_id)265 # stop instance to prevent 'busy' state of detached volume266 data = self.client.stop_instances(InstanceIds=[instance_id])267 self.get_instance_waiter().wait_available(instance_id,268 final_set=('stopped'))269 self.client.detach_volume(VolumeId=volume_id)270 self.cancelResourceCleanUp(clean_vi)271 self.get_volume_attachment_waiter().wait_delete(volume_id)272 self.assertRaises('IncorrectState',273 self.client.detach_volume,274 VolumeId=volume_id)275 self.client.delete_volume(VolumeId=volume_id)276 self.cancelResourceCleanUp(clean_v)277 self.get_volume_waiter().wait_delete(volume_id)278 self.assertRaises('InvalidVolume.NotFound',279 self.client.detach_volume,280 VolumeId=volume_id)281 self.client.terminate_instances(InstanceIds=[instance_id])282 self.get_instance_waiter().wait_delete(instance_id)283 @decorators.idempotent_id('c37b01f7-5b27-4773-9278-9e0b8eaccb5f')284 @testtools.skipUnless(CONF.aws.image_id, "image id is not defined")285 def test_volume_auto_termination_swithed_off(self):286 instance_id = self.run_instance()287 kwargs = {288 'Size': 1,289 'AvailabilityZone': CONF.aws.aws_zone290 }291 data = self.client.create_volume(*[], **kwargs)292 volume_id = data['VolumeId']293 clean_v = self.addResourceCleanUp(self.client.delete_volume,294 VolumeId=volume_id)295 self.get_volume_waiter().wait_available(volume_id)296 kwargs = {297 'Device': '/dev/sdh',298 'InstanceId': instance_id,299 'VolumeId': volume_id,300 }301 self.client.attach_volume(*[], **kwargs)302 self.addResourceCleanUp(self.client.detach_volume, VolumeId=volume_id)303 self.get_volume_attachment_waiter().wait_available(304 volume_id, final_set=('attached'))305 self.client.terminate_instances(InstanceIds=[instance_id])306 self.get_instance_waiter().wait_delete(instance_id)307 data = self.client.describe_volumes(VolumeIds=[volume_id])308 self.assertEqual(1, len(data['Volumes']))309 volume = data['Volumes'][0]310 self.assertEqual('available', volume['State'])311 if 'Attachments' in volume:312 self.assertEqual(0, len(volume['Attachments']))313 self.client.delete_volume(VolumeId=volume_id)314 self.cancelResourceCleanUp(clean_v)315 self.get_volume_waiter().wait_delete(volume_id)316 @decorators.idempotent_id('c8649cab-e1f4-42f7-9578-8e72d06534ba')317 @testtools.skipUnless(CONF.aws.run_incompatible_tests,318 "modify_instance_attribute is not implemented")319 @testtools.skipUnless(CONF.aws.image_id, "image id is not defined")320 def test_volume_auto_termination_swithed_on(self):321 instance_id = self.run_instance()322 kwargs = {323 'Size': 1,324 'AvailabilityZone': CONF.aws.aws_zone325 }326 data = self.client.create_volume(*[], **kwargs)327 volume_id = data['VolumeId']...

Full Screen

Full Screen

test_volume.py

Source:test_volume.py Github

copy

Full Screen

...18"""19from basetest import BaseTest20import siolib21class Test_Volume(BaseTest):22 def test_create_and_delete_volume(self):23 volume_name = self._random_name()24 self.scaleio.create_volume(volume_name,25 self.domain,26 self.pool,27 provisioning_type='thin')28 volume_id = self.scaleio.get_volumeid(volume_name)29 self.assertEqual(volume_name, self.scaleio.get_volumename(volume_id))30 self.scaleio.delete_volume(volume_name, True)31 self.assertRaises(siolib.VolumeNotFound,32 self.scaleio.get_volumename, volume_name)33 def test_resize_volume(self):34 volume_name = self._random_name()35 # create a volume, and check the size36 self.scaleio.create_volume(volume_name,37 self.domain,38 self.pool,39 provisioning_type='thin',40 volume_size_gb=self.initial_vol_size)41 initial_size = self.scaleio.get_volumesize(volume_name)42 self.assertEqual(initial_size, self.initial_vol_size*1024*1024)43 # resize it44 self.scaleio.extend_volume(volume_name, self.extend_vol_size)45 new_size = self.scaleio.get_volumesize(volume_name)46 self.assertEqual(new_size, self.extend_vol_size*1024*1024)47 # delete it48 self.scaleio.delete_volume(volume_name, True)49 def test_rename_volume(self):50 volume_name_1 = self._random_name()51 volume_name_2 = self._random_name()52 # create a volume53 self.scaleio.create_volume(volume_name_1,54 self.domain,55 self.pool,56 provisioning_type='thin',57 volume_size_gb=self.initial_vol_size)58 # get the volume id59 volume_id = self.scaleio.get_volumeid(volume_name_1)60 # rename it61 self.scaleio.rename_volume(volume_name_1, volume_name_2)62 # make sure it got renamed63 self.assertEqual(volume_name_2, self.scaleio.get_volumename(volume_id))64 # make sure old volume name is no longer valid65 self.assertRaises(siolib.VolumeNotFound,66 self.scaleio.get_volumename,67 volume_name_1)68 # delete the volume69 self.scaleio.delete_volume(volume_id, True)70 def test_snapshot(self):71 volume_name = self._random_name()72 snapshot_name = volume_name + "snap"73 # create a volume74 self.scaleio.create_volume(volume_name,75 self.domain,76 self.pool,77 provisioning_type='thin')78 # snap it79 self.scaleio.snapshot_volume(volume_name, snapshot_name)80 snapshot_id = self.scaleio.get_volumeid(snapshot_name)81 self.assertEqual(snapshot_name, self.scaleio.get_volumename(snapshot_id))82 # delete the volume83 self.scaleio.delete_volume(volume_name)84 self.assertRaises(siolib.VolumeNotFound,85 self.scaleio.get_volumename,86 volume_name)87 # delete the snapshot88 self.scaleio.delete_volume(snapshot_name)89 self.assertRaises(siolib.VolumeNotFound,90 self.scaleio.get_volumename,91 snapshot_name)92 def test_snapshot_defs(self):93 volume = []94 snapshot = []95 num_volumes = 596 # create the volumes97 for i in range(0, num_volumes):98 vol_name = self._random_name()99 # create the volumes100 vol_id, name = self.scaleio.create_volume(vol_name,101 self.domain,102 self.pool,103 provisioning_type='thin')104 self.assertEqual(name, vol_name)105 volume.append(vol_id)106 snapshot.append(vol_name + "-snap")107 get_scaleio_snapshot_params = lambda src_volume, trg_volume: {108 'volumeId': src_volume,109 'snapshotName': trg_volume}110 # build the snapshotDefs111 snapshot_defs = list(map(get_scaleio_snapshot_params,112 volume,113 snapshot))114 # snap them115 response = self.scaleio.snapshot_volume_from_defs(snapshot_defs)116 self.assertEqual(num_volumes, len(response['volumeIdList']))117 # delete the volumes and snapshots118 for i in range(0, num_volumes):119 self.scaleio.delete_volume(volume[i])120 self.scaleio.delete_volume(snapshot[i])121 def test_delete_multiple_modes(self):122 volume_name = self._random_name()123 snapshot_name = self._random_name()124 # create the volume125 self.scaleio.create_volume(volume_name,126 self.domain,127 self.pool,128 provisioning_type='thin',129 volume_size_gb=self.initial_vol_size)130 # snap the volume131 self.scaleio.snapshot_volume(volume_name, snapshot_name)132 # try deleting with two remove modes, invalid request133 self.assertRaises(ValueError,134 self.scaleio.delete_volume,135 volume_name,136 include_descendents=True,137 only_descendents=True)138 # delete the whole vtree139 self.scaleio.delete_volume(volume_name, vtree=True)140 # make sure no volumes or snaps exist141 self.assertRaises(siolib.VolumeNotFound,142 self.scaleio.get_volumename,143 volume_name)144 self.assertRaises(siolib.VolumeNotFound,145 self.scaleio.get_volumename,146 snapshot_name)147 def test_delete_only_descendants(self):148 volume_name = self._random_name()149 snapshot_name = self._random_name()150 # create the volume151 self.scaleio.create_volume(volume_name,152 self.domain,153 self.pool,154 provisioning_type='thin',155 volume_size_gb=self.initial_vol_size)156 # snap the volume157 self.scaleio.snapshot_volume(volume_name, snapshot_name)158 # delete just the descendants159 self.scaleio.delete_volume(volume_name,160 only_descendents=True)161 # the snapshot should be gone162 self.assertRaises(siolib.VolumeNotFound,163 self.scaleio.get_volumename,164 snapshot_name)165 # the volume should still exist166 self.scaleio.get_volumeid(volume_name)167 # delete the volume168 self.scaleio.delete_volume(volume_name, vtree=True)169 # make sure no volumes exist170 self.assertRaises(siolib.VolumeNotFound,171 self.scaleio.get_volumename,172 volume_name)173 def test_delete_include_descendants(self):174 volume_name = self._random_name()175 snapshot_name = self._random_name()176 # create the volume177 self.scaleio.create_volume(volume_name,178 self.domain,179 self.pool,180 provisioning_type='thin',181 volume_size_gb=self.initial_vol_size)182 # snap the volume183 self.scaleio.snapshot_volume(volume_name, snapshot_name)184 # delete just the descendants185 self.scaleio.delete_volume(volume_name,186 include_descendents=True)187 # make sure no volumes or snaps exist188 self.assertRaises(siolib.VolumeNotFound,189 self.scaleio.get_volumename,190 volume_name)191 self.assertRaises(siolib.VolumeNotFound,192 self.scaleio.get_volumename,193 snapshot_name)194 def test_delete_vtree(self):195 volume_name = self._random_name()196 snapshot_name = self._random_name()197 # create the volume198 self.scaleio.create_volume(volume_name,199 self.domain,200 self.pool,201 provisioning_type='thin',202 volume_size_gb=self.initial_vol_size)203 # snap the volume204 self.scaleio.snapshot_volume(volume_name, snapshot_name)205 # delete the snapshot but specify the whole vtree (will also delete parents)206 self.scaleio.delete_volume(snapshot_name,207 vtree=True)208 # make sure no volumes or snaps exist209 self.assertRaises(siolib.VolumeNotFound,210 self.scaleio.get_volumename,211 volume_name)212 self.assertRaises(siolib.VolumeNotFound,213 self.scaleio.get_volumename,214 snapshot_name)215 def test_delete_only_me(self):216 volume_name = self._random_name()217 snapshot_name = self._random_name()218 # create the volume219 self.scaleio.create_volume(volume_name,220 self.domain,221 self.pool,222 provisioning_type='thin',223 volume_size_gb=self.initial_vol_size)224 # snap the volume225 self.scaleio.snapshot_volume(volume_name, snapshot_name)226 # delete just the volume227 self.scaleio.delete_volume(volume_name)228 # make sure the volumes does not exist229 self.assertRaises(siolib.VolumeNotFound,230 self.scaleio.get_volumename,231 volume_name)232 # but the snapshot will233 self.scaleio.get_volumeid(snapshot_name)234 self.scaleio.delete_volume(snapshot_name)235 self.assertRaises(siolib.VolumeNotFound,236 self.scaleio.get_volumename,237 snapshot_name)238 def test_volume_properties(self):239 volume_name = self._random_name()240 # create the volume241 vol_id, name = self.scaleio.create_volume(volume_name,242 self.domain,243 self.pool,244 provisioning_type='thin',245 volume_size_gb=self.initial_vol_size)246 props = self.scaleio.get_volume_properties(volume_name)247 self.assertEqual(props['id'], vol_id)248 # delete the volume249 self.scaleio.delete_volume(volume_name)250 # make sure the volumes does not exist251 self.assertRaises(siolib.VolumeNotFound,252 self.scaleio.get_volumename,...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful