How to use delete_snapshot method in localstack

Best Python code snippet using localstack_python

test_bsu_backup.py

Source:test_bsu_backup.py Github

copy

Full Screen

1import unittest2import logging3from unittest.mock import patch4import boto35import botocore.session6from botocore.stub import Stubber7import json8import os9import osc_bsu_backup.bsu_backup as bsu10from osc_bsu_backup.error import InputError11import tests.unit.fixtures_bsu_backup as fixtures12from datetime import datetime, timezone13class TestBsuBackup(unittest.TestCase):14 def setUp(self):15 self.ec2 = botocore.session.get_session().create_client(16 "ec2", region_name="eu-west-2"17 )18 logging.disable(logging.CRITICAL)19 def tearDown(self):20 logging.disable(logging.NOTSET)21 def test_auth1(self):22 with patch("boto3.Session"):23 self.assertTrue(24 bsu.auth(25 region="eu-west-2",26 profile="default",27 client_cert=None,28 endpoint=None,29 )30 )31 self.assertRaises(InputError, bsu.auth, "eu-west-3", "default", None)32 def test_auth2(self):33 self.assertRaises(InputError, bsu.auth, "eu-west-3", "default", None)34 def test_find_instance_by_id1(self):35 with Stubber(self.ec2) as stubber:36 stubber.add_response(37 "describe_instances",38 fixtures.instances,39 {"InstanceIds": ["i-e6b7ab04"]},40 )41 for i in bsu.find_instance_by_id(self.ec2, "i-e6b7ab04"):42 self.assertEqual(i, "vol-a87f91c1")43 def test_find_instance_by_id2(self):44 with Stubber(self.ec2) as stubber:45 stubber.add_response(46 "describe_instances", {}, {"InstanceIds": ["i-e6b7ab05"]}47 )48 for i in bsu.find_instance_by_id(self.ec2, "i-e6b7ab05"):49 self.assertEqual(i, None)50 def test_find_instances_by_tags1(self):51 with Stubber(self.ec2) as stubber:52 stubber.add_response(53 "describe_instances",54 fixtures.instances,55 {56 "Filters": [57 {58 "Name": "instance-state-name",59 "Values": ["running", "stopped"],60 },61 {"Name": "tag:Name", "Values": ["test1"]},62 ]63 },64 )65 for i in bsu.find_instances_by_tags(self.ec2, ["Name:test1"]):66 self.assertEqual(i, "vol-a87f91c1")67 def test_find_volume_by_id1(self):68 with Stubber(self.ec2) as stubber:69 stubber.add_response(70 "describe_volumes", fixtures.volumes, {"VolumeIds": ["vol-a24fffdc"]}71 )72 for i in bsu.find_volume_by_id(self.ec2, "vol-a24fffdc"):73 self.assertEqual(i, "vol-a24fffdc")74 def test_find_volume_by_id2(self):75 with Stubber(self.ec2) as stubber:76 stubber.add_response(77 "describe_volumes", {"Volumes": []}, {"VolumeIds": ["vol-a24fffda"]}78 )79 for i in bsu.find_volume_by_id(self.ec2, "vol-a24fffda"):80 assertEqual(i, None)81 def test_find_volumes_by_tags1(self):82 with Stubber(self.ec2) as stubber:83 stubber.add_response(84 "describe_volumes",85 fixtures.volumes,86 {"Filters": [{"Name": "tag:Name", "Values": ["test1"]}]},87 )88 for i in bsu.find_volumes_by_tags(self.ec2, ["Name:test1"]):89 self.assertEqual(i, "vol-a24fffdc")90 def test_find_volumes_by_tags2(self):91 with Stubber(self.ec2) as stubber:92 stubber.add_response(93 "describe_volumes",94 {"Volumes": []},95 {"Filters": [{"Name": "tag:Name", "Values": ["abc"]}]},96 )97 for i in bsu.find_volumes_by_tags(self.ec2, ["Name:abc"]):98 self.assertEqual(i, None)99 def test_rotate_snapshots1(self):100 with Stubber(self.ec2) as stubber:101 stubber.add_response(102 "describe_snapshots",103 fixtures.snapshots1,104 {105 "Filters": [106 {"Name": "volume-id", "Values": ["vol-59b94d63"]},107 {108 "Name": "description",109 "Values": [110 "osc-bsu-backup EF50CF3A80164A5EABAF8C78B2314C65",111 "osc-bsu-backup 0.1",112 "osc-bsu-backup 0.0.2",113 "osc-bsu-backup 0.0.1",114 ],115 },116 ]117 },118 )119 stubber.add_response("delete_snapshot", {}, {"SnapshotId": "snap-d5caf847"})120 stubber.add_response("delete_snapshot", {}, {"SnapshotId": "snap-30e1c236"})121 stubber.add_response("delete_snapshot", {}, {"SnapshotId": "snap-cf4748a5"})122 stubber.add_response(123 "describe_snapshots",124 fixtures.snapshots2,125 {126 "Filters": [127 {"Name": "volume-id", "Values": ["vol-640141cf"]},128 {129 "Name": "description",130 "Values": [131 "osc-bsu-backup EF50CF3A80164A5EABAF8C78B2314C65",132 "osc-bsu-backup 0.1",133 "osc-bsu-backup 0.0.2",134 "osc-bsu-backup 0.0.1",135 ],136 },137 ]138 },139 )140 stubber.add_response("delete_snapshot", {}, {"SnapshotId": "snap-9c3c5d34"})141 stubber.add_response("delete_snapshot", {}, {"SnapshotId": "snap-fa25ee50"})142 stubber.add_response("delete_snapshot", {}, {"SnapshotId": "snap-e6996c10"})143 stubber.add_response("delete_snapshot", {}, {"SnapshotId": "snap-8f3436c0"})144 self.assertEqual(145 bsu.rotate_snapshots(146 self.ec2, ["vol-59b94d63", "vol-640141cf"], 8, True147 ),148 None,149 )150 def test_rotate_snapshots2(self):151 with Stubber(self.ec2) as stubber:152 stubber.add_response(153 "describe_snapshots",154 fixtures.snapshots1,155 {156 "Filters": [157 {"Name": "volume-id", "Values": ["vol-59b94d63"]},158 ]159 },160 )161 stubber.add_response("delete_snapshot", {}, {"SnapshotId": "snap-30e1c236"})162 stubber.add_response("delete_snapshot", {}, {"SnapshotId": "snap-cf4748a5"})163 self.assertEqual(bsu.rotate_snapshots(self.ec2, ["vol-59b94d63"], 9), None)164 def test_rotate_snapshots3(self):165 with Stubber(self.ec2) as stubber:166 stubber.add_response(167 "describe_snapshots",168 fixtures.snapshots1,169 {170 "Filters": [171 {"Name": "volume-id", "Values": ["vol-59b94d63"]},172 ]173 },174 )175 self.assertEqual(bsu.rotate_snapshots(self.ec2, ["vol-59b94d63"], 12), None)176 def test_rotate_snapshots4(self):177 with Stubber(self.ec2) as stubber:178 stubber.add_response(179 "describe_snapshots",180 fixtures.snapshots1,181 {182 "Filters": [183 {"Name": "volume-id", "Values": ["vol-59b94d63"]},184 ]185 },186 )187 self.assertEqual(188 bsu.rotate_snapshots(self.ec2, ["vol-59b94d63"], -5487812), None189 )190 def test_rotate_snapshots5(self):191 with Stubber(self.ec2) as stubber:192 stubber.add_response(193 "describe_snapshots",194 fixtures.snapshots1,195 {196 "Filters": [197 {"Name": "volume-id", "Values": ["vol-59b94d63"]},198 ]199 },200 )201 self.assertEqual(202 bsu.rotate_snapshots(self.ec2, ["vol-59b94d63"], 5487812), None203 )204 def test_rotate_snapshots6(self):205 with Stubber(self.ec2) as stubber:206 stubber.add_response(207 "describe_snapshots",208 {"Snapshots": []},209 {210 "Filters": [211 {"Name": "volume-id", "Values": ["aaaa"]},212 ]213 },214 )215 self.assertEqual(bsu.rotate_snapshots(self.ec2, ["aaaa"], 14), None)216 def test_rotate_days_snapshots1(self):217 with Stubber(self.ec2) as stubber:218 stubber.add_response(219 "describe_snapshots",220 fixtures.snapshots1,221 {222 "Filters": [223 {"Name": "volume-id", "Values": ["vol-59b94d63"]},224 ]225 },226 )227 stubber.add_response("delete_snapshot", {}, {"SnapshotId": "snap-cf4748a5"})228 stubber.add_response(229 "describe_snapshots",230 fixtures.snapshots2,231 {232 "Filters": [233 {"Name": "volume-id", "Values": ["vol-640141cf"]},234 ]235 },236 )237 with patch("osc_bsu_backup.bsu_backup.datetime") as mock_date:238 mock_date.now.return_value = datetime(239 year=2019, month=12, day=20, tzinfo=timezone.utc240 )241 self.assertEqual(242 bsu.rotate_days_snapshots(243 self.ec2, ["vol-59b94d63", "vol-640141cf"], 2, False244 ),245 None,246 )247 def test_create_snapshots1(self):248 with Stubber(self.ec2) as stubber:249 stubber.add_response(250 "create_snapshot",251 fixtures.create_snapshot1,252 {253 "Description": "osc-bsu-backup EF50CF3A80164A5EABAF8C78B2314C65",254 "VolumeId": "vol-56d30e10",255 },256 )257 stubber.add_response(258 "describe_snapshots",259 fixtures.snapshot_completed1,260 {"SnapshotIds": ["snap-91c8d227"]},261 )262 self.assertEqual(bsu.create_snapshots(self.ec2, ["vol-56d30e10"]), None)263 def test_create_snapshots2(self):264 with Stubber(self.ec2) as stubber:265 stubber.add_response(266 "create_snapshot",267 fixtures.create_snapshot1,268 {269 "Description": "osc-bsu-backup EF50CF3A80164A5EABAF8C78B2314C65",270 "VolumeId": "vol-56d30e10",271 },272 )273 stubber.add_response(274 "create_snapshot",275 fixtures.create_snapshot2,276 {277 "Description": "osc-bsu-backup EF50CF3A80164A5EABAF8C78B2314C65",278 "VolumeId": "vol-640141cf",279 },280 )281 stubber.add_response(282 "describe_snapshots",283 fixtures.snapshot_completed2,284 {"SnapshotIds": ["snap-91c8d227", "snap-f3338f81"]},285 )286 self.assertEqual(287 bsu.create_snapshots(self.ec2, ["vol-56d30e10", "vol-640141cf"]), None288 )289if __name__ == "__main__":...

Full Screen

Full Screen

test_clean.py

Source:test_clean.py Github

copy

Full Screen

1# -*- coding: utf-8 -*-2#3# Copyright 2016 Rackspace US, Inc.4#5# Licensed to the Apache Software Foundation (ASF) under one6# or more contributor license agreements. See the NOTICE file7# distributed with this work for additional information8# regarding copyright ownership. The ASF licenses this file9# to you under the Apache License, Version 2.0 (the10# "License"); you may not use this file except in compliance11# with the License. You may obtain a copy of the License at12#13# http://www.apache.org/licenses/LICENSE-2.014#15# Unless required by applicable law or agreed to in writing,16# software distributed under the License is distributed on an17# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY18# KIND, either express or implied. See the License for the19# specific language governing permissions and limitations20# under the License.21#22"""Module for testing clean module."""23import json24import datetime25import boto326from moto import mock_ec2, mock_sns, mock_dynamodb2, mock_iam, mock_sts27from ebs_snapper import clean, utils, mocks, dynamo28from ebs_snapper import AWS_MOCK_ACCOUNT29import dateutil30def setup_module(module):31 import logging32 logging.getLogger('botocore').setLevel(logging.WARNING)33 logging.getLogger('boto3').setLevel(logging.WARNING)34 logging.basicConfig(level=logging.INFO)35@mock_ec236@mock_sns37@mock_dynamodb238@mock_iam39@mock_sts40def test_perform_fanout_all_regions_clean(mocker):41 """Test for method of the same name."""42 mocks.create_sns_topic('CleanSnapshotTopic')43 mocks.create_dynamodb()44 expected_regions = utils.get_regions()45 for r in expected_regions: # must have an instance in the region to clean it46 mocks.create_instances(region=r)47 expected_sns_topic = utils.get_topic_arn('CleanSnapshotTopic', 'us-east-1')48 ctx = utils.MockContext()49 mocker.patch('ebs_snapper.clean.send_fanout_message')50 # fan out, and be sure we touched every region51 clean.perform_fanout_all_regions(ctx)52 for r in expected_regions:53 clean.send_fanout_message.assert_any_call( # pylint: disable=E110354 ctx,55 cli=False,56 region=r,57 topic_arn=expected_sns_topic)58@mock_ec259@mock_sns60@mock_iam61@mock_sts62def test_send_fanout_message_clean(mocker):63 """Test for method of the same name."""64 mocks.create_sns_topic('testing-topic')65 expected_sns_topic = utils.get_topic_arn('testing-topic', 'us-east-1')66 ctx = utils.MockContext()67 mocker.patch('ebs_snapper.utils.sns_publish')68 clean.send_fanout_message(ctx, region='us-west-2', topic_arn=expected_sns_topic)69 utils.sns_publish.assert_any_call( # pylint: disable=E110370 TopicArn=expected_sns_topic,71 Message=json.dumps({'region': 'us-west-2'}))72@mock_ec273@mock_dynamodb274@mock_iam75@mock_sts76def test_clean_tagged_snapshots(mocker):77 """Test for method of the same name."""78 # default settings79 region = 'us-east-1'80 mocks.create_dynamodb(region)81 # create an instance and record the id82 instance_id = mocks.create_instances(region, count=1)[0]83 ctx = utils.MockContext()84 # setup the min # snaps for the instance85 config_data = {86 "match": {"instance-id": instance_id},87 "snapshot": {88 "retention": "6 days", "minimum": 0, "frequency": "13 hours"89 }90 }91 # put it in the table, be sure it succeeded92 dynamo.store_configuration(region, 'foo', AWS_MOCK_ACCOUNT, config_data)93 # figure out the EBS volume that came with our instance94 volume_id = utils.get_volumes([instance_id], region)[0]['VolumeId']95 # make a snapshot that should be deleted today too96 now = datetime.datetime.now(dateutil.tz.tzutc())97 delete_on = now.strftime('%Y-%m-%d')98 utils.snapshot_and_tag(instance_id, 'ami-123abc', volume_id, delete_on, region)99 snapshot_id = utils.most_recent_snapshot(volume_id, region)['SnapshotId']100 mocker.patch('ebs_snapper.utils.delete_snapshot')101 clean.clean_snapshot(ctx, region)102 # ensure we deleted this snapshot if it was ready to die today103 utils.delete_snapshot.assert_any_call(snapshot_id, region) # pylint: disable=E1103104 # now raise the minimum, and check to be sure we didn't delete105 utils.delete_snapshot.reset_mock() # pylint: disable=E1103106 config_data['snapshot']['minimum'] = 5107 dynamo.store_configuration(region, 'foo', AWS_MOCK_ACCOUNT, config_data)108 clean.clean_snapshot(ctx, region)109 utils.delete_snapshot.assert_not_called() # pylint: disable=E1103110@mock_ec2111@mock_dynamodb2112@mock_iam113@mock_sts114def test_clean_tagged_snapshots_ignore_retention(mocker):115 """Test for method of the same name."""116 # default settings117 region = 'us-east-1'118 mocks.create_dynamodb(region)119 # create two instances and record the id of one of them120 instance_id_list = mocks.create_instances(region, count=2)121 instance_id = instance_id_list[0]122 ctx = utils.MockContext()123 # setup the min # snaps for the instance124 config_data = {125 "match": {"instance-id": instance_id},126 "snapshot": {127 "retention": "6 days", "minimum": 5, "frequency": "13 hours"128 },129 "ignore_retention": True130 }131 # put it in the table, be sure it succeeded132 dynamo.store_configuration(region, 'foo', AWS_MOCK_ACCOUNT, config_data)133 # figure out the EBS volume that came with our instance134 volume_id = utils.get_volumes([instance_id], region)[0]['VolumeId']135 # make a snapshot that should be deleted today (but 5 snap retention)136 now = datetime.datetime.now(dateutil.tz.tzutc())137 delete_on = now.strftime('%Y-%m-%d')138 utils.snapshot_and_tag(instance_id, 'ami-123abc', volume_id, delete_on, region)139 snapshot_id = utils.most_recent_snapshot(volume_id, region)['SnapshotId']140 # now delete the instance and volume, keep the snapshot141 client = boto3.client('ec2', region_name=region)142 client.terminate_instances(InstanceIds=[instance_id])143 mocker.patch('ebs_snapper.utils.delete_snapshot')144 dynamo.store_configuration(region, 'foo', AWS_MOCK_ACCOUNT, config_data)145 clean.clean_snapshot(ctx, region)146 utils.delete_snapshot.assert_any_call(snapshot_id, region) # pylint: disable=E1103147@mock_ec2148@mock_dynamodb2149@mock_iam150@mock_sts151def test_clean_snapshots_tagged_timeout(mocker):152 """Test that we _DONT_ clean anything if runtime > 4 minutes"""153 # default settings154 region = 'us-east-1'155 mocks.create_dynamodb(region)156 ctx = utils.MockContext()157 ctx.set_remaining_time_in_millis(5) # 5 millis remaining158 # create an instance and record the id159 instance_id = mocks.create_instances(region, count=1)[0]160 # setup the min # snaps for the instance161 config_data = {162 "match": {"instance-id": instance_id},163 "snapshot": {164 "retention": "6 days", "minimum": 0, "frequency": "13 hours"165 }166 }167 # put it in the table, be sure it succeeded168 dynamo.store_configuration(region, 'foo', AWS_MOCK_ACCOUNT, config_data)169 # figure out the EBS volume that came with our instance170 volume_id = utils.get_volumes([instance_id], region)[0]['VolumeId']171 # make a snapshot that should be deleted today too172 now = datetime.datetime.now(dateutil.tz.tzutc())173 delete_on = now.strftime('%Y-%m-%d')174 utils.snapshot_and_tag(instance_id, 'ami-123abc', volume_id, delete_on, region)175 mocker.patch('ebs_snapper.utils.delete_snapshot')176 clean.clean_snapshot(ctx, region)177 # ensure we DO NOT take a snapshot if our runtime was 5 minutes178 assert not utils.delete_snapshot.called179@mock_ec2180@mock_dynamodb2181@mock_iam182@mock_sts183def test_clean_tagged_snapshots_ignore_instance(mocker):184 """Test for method of the same name."""185 # default settings186 region = 'us-east-1'187 mocks.create_dynamodb(region)188 # create an instance and record the id189 instance_id = mocks.create_instances(region, count=1)[0]190 ctx = utils.MockContext()191 # setup the min # snaps for the instance192 config_data = {193 "match": {"instance-id": instance_id},194 "snapshot": {195 "retention": "6 days", "minimum": 0, "frequency": "13 hours"196 },197 "ignore": [instance_id]198 }199 # put it in the table, be sure it succeeded200 dynamo.store_configuration(region, 'foo', AWS_MOCK_ACCOUNT, config_data)201 # figure out the EBS volume that came with our instance202 volume_id = utils.get_volumes([instance_id], region)[0]['VolumeId']203 # make a snapshot that should be deleted today too204 now = datetime.datetime.now(dateutil.tz.tzutc())205 delete_on = now.strftime('%Y-%m-%d')206 utils.snapshot_and_tag(instance_id, 'ami-123abc', volume_id, delete_on, region)207 utils.most_recent_snapshot(volume_id, region)['SnapshotId']208 mocker.patch('ebs_snapper.utils.delete_snapshot')209 clean.clean_snapshot(ctx, region)210 # ensure we ignored the instance from this volume211 utils.delete_snapshot.assert_not_called() # pylint: disable=E1103212@mock_ec2213@mock_dynamodb2214@mock_iam215@mock_sts216def test_clean_tagged_snapshots_ignore_volume(mocker):217 """Test for method of the same name."""218 # default settings219 region = 'us-east-1'220 mocks.create_dynamodb(region)221 # create an instance and record the id222 instance_id = mocks.create_instances(region, count=1)[0]223 ctx = utils.MockContext()224 # setup the min # snaps for the instance225 config_data = {226 "match": {"instance-id": instance_id},227 "snapshot": {228 "retention": "6 days", "minimum": 0, "frequency": "13 hours"229 },230 "ignore": []231 }232 # put it in the table, be sure it succeeded233 dynamo.store_configuration(region, 'foo', AWS_MOCK_ACCOUNT, config_data)234 # figure out the EBS volume that came with our instance235 volume_id = utils.get_volumes([instance_id], region)[0]['VolumeId']236 config_data["ignore"].append(volume_id)237 # make a snapshot that should be deleted today too238 now = datetime.datetime.now(dateutil.tz.tzutc())239 delete_on = now.strftime('%Y-%m-%d')240 utils.snapshot_and_tag(instance_id, 'ami-123abc', volume_id, delete_on, region)241 snapshot_id = utils.most_recent_snapshot(volume_id, region)['SnapshotId']242 mocker.patch('ebs_snapper.utils.delete_snapshot')243 clean.clean_snapshot(ctx, region)244 # ensure we deleted this snapshot if it was ready to die today245 utils.delete_snapshot.assert_any_call(snapshot_id, region) # pylint: disable=E1103246 # now raise the minimum, and check to be sure we didn't delete247 utils.delete_snapshot.reset_mock() # pylint: disable=E1103248 config_data['snapshot']['minimum'] = 5249 dynamo.store_configuration(region, 'foo', AWS_MOCK_ACCOUNT, config_data)250 clean.clean_snapshot(ctx, region)...

Full Screen

Full Screen

snapshot_cleanup.py

Source:snapshot_cleanup.py Github

copy

Full Screen

...34 try:35 tags_list = snap['Tags']36 except:37 print "Snapshot has no tags, deleting %s" % snap['SnapshotId']38 delete_snapshot(snap['SnapshotId'], client)39 continue40 for tag in tags_list:41 if ((tag['Key']).lower() == 'preserve' and (tag['Value']).lower() == 'false'):42 print "I'm deleting this snapshots %s" % snap['SnapshotId']43 delete_snapshot(snap['SnapshotId'], client)44 found = True45 elif ((tag['Key']).lower() == 'preserve' and (tag['Value']).lower() == 'true'):46 print "I'm keeping this snapshot %s" % snap['SnapshotId']47 if found == False:48 print "I don't have a preserve tag %s" % snap['SnapshotId']49 delete_snapshot(snap['SnapshotId'], client)50# This function deletes the snapshots51def delete_snapshot(snap, client):52 print "Deleting snapshot %s" % snap53 try:54 client.delete_snapshot(SnapshotId=snap)55 except ClientError, e:56 print str(e)57 print "Failed deleting snapshot %s" % snap58def main():59 list_snapshots()...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful