Best Python code snippet using localstack_python
test_kms.py
Source:test_kms.py
1# Copyright 2016-2017 Capital One Services, LLC2#3# Licensed under the Apache License, Version 2.0 (the "License");4# you may not use this file except in compliance with the License.5# You may obtain a copy of the License at6#7# http://www.apache.org/licenses/LICENSE-2.08#9# Unless required by applicable law or agreed to in writing, software10# distributed under the License is distributed on an "AS IS" BASIS,11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.12# See the License for the specific language governing permissions and13# limitations under the License.14from __future__ import absolute_import, division, print_function, unicode_literals15import json, time16from .common import BaseTest, functional17class KMSTest(BaseTest):18 def test_kms_grant(self):19 session_factory = self.replay_flight_data("test_kms_grants")20 p = self.load_policy(21 {22 "name": "kms-grant-count",23 "resource": "kms",24 "filters": [{"type": "grant-count"}],25 },26 session_factory=session_factory,27 )28 resources = p.run()29 self.assertEqual(len(resources), 0)30 def test_key_rotation(self):31 session_factory = self.replay_flight_data("test_key_rotation")32 p = self.load_policy(33 {34 "name": "kms-key-rotation",35 "resource": "kms-key",36 "filters": [37 {38 "type": "key-rotation-status",39 "key": "KeyRotationEnabled",40 "value": True,41 }42 ],43 },44 session_factory=session_factory,45 )46 resources = p.run()47 self.assertEqual(len(resources), 1)48 def test_set_key_rotation(self):49 session_factory = self.replay_flight_data("test_key_rotation_set")50 p = self.load_policy(51 {52 "name": "enable-key-rotation",53 "resource": "kms-key",54 "filters": [55 {"tag:Name": "CMK-Rotation-Test"},56 {57 "type": "key-rotation-status",58 "key": "KeyRotationEnabled",59 "value": False,60 },61 ],62 "actions": [{"type": "set-rotation", "state": True}],63 },64 session_factory=session_factory,65 )66 resources = p.run()67 self.assertEqual(len(resources), 1)68 client = 
session_factory(region="us-east-1").client("kms")69 key = client.get_key_rotation_status(KeyId=resources[0]["KeyId"])70 self.assertEqual(key["KeyRotationEnabled"], True)71 @functional72 def test_kms_remove_matched(self):73 session_factory = self.replay_flight_data("test_kms_remove_matched")74 sts = session_factory().client("sts")75 current_user_arn = sts.get_caller_identity()["Arn"]76 client = session_factory().client("kms")77 key_id = client.create_key()["KeyMetadata"]["KeyId"]78 self.addCleanup(79 client.schedule_key_deletion, KeyId=key_id, PendingWindowInDays=780 )81 client.put_key_policy(82 KeyId=key_id,83 PolicyName="default",84 Policy=json.dumps(85 {86 "Version": "2012-10-17",87 "Statement": [88 {89 "Sid": "DefaultRoot",90 "Effect": "Allow",91 "Principal": {"AWS": current_user_arn},92 "Action": "kms:*",93 "Resource": "*",94 },95 {96 "Sid": "SpecificAllow",97 "Effect": "Allow",98 "Principal": {"AWS": current_user_arn},99 "Action": "kms:*",100 "Resource": "*",101 },102 {103 "Sid": "Public",104 "Effect": "Allow",105 "Principal": "*",106 "Action": "kms:*",107 "Resource": "*",108 },109 ],110 }111 ),112 )113 self.assertStatementIds(114 client, key_id, "DefaultRoot", "SpecificAllow", "Public"115 )116 p = self.load_policy(117 {118 "name": "kms-rm-matched",119 "resource": "kms-key",120 "filters": [121 {"KeyId": key_id},122 {"type": "cross-account", "whitelist": [self.account_id]},123 ],124 "actions": [{"type": "remove-statements", "statement_ids": "matched"}],125 },126 session_factory=session_factory,127 )128 resources = p.run()129 self.assertEqual([r["KeyId"] for r in resources], [key_id])130 if self.recording:131 time.sleep(60) # takes time before new policy reflected132 self.assertStatementIds(client, key_id, "DefaultRoot", "SpecificAllow")133 def assertStatementIds(self, client, key_id, *expected):134 p = client.get_key_policy(KeyId=key_id, PolicyName="default")["Policy"]135 actual = [s["Sid"] for s in json.loads(p)["Statement"]]136 self.assertEqual(actual, 
list(expected))137 @functional138 def test_kms_remove_named(self):139 session_factory = self.replay_flight_data("test_kms_remove_named")140 client = session_factory().client("kms")141 key_id = client.create_key()["KeyMetadata"]["KeyId"]142 self.addCleanup(143 client.schedule_key_deletion, KeyId=key_id, PendingWindowInDays=7144 )145 client.put_key_policy(146 KeyId=key_id,147 PolicyName="default",148 Policy=json.dumps(149 {150 "Version": "2008-10-17",151 "Statement": [152 {153 "Sid": "DefaultRoot",154 "Effect": "Allow",155 "Principal": "*",156 "Action": "kms:*",157 "Resource": "*",158 },159 {160 "Sid": "RemoveMe",161 "Effect": "Allow",162 "Principal": "*",163 "Action": "kms:*",164 "Resource": "*",165 },166 ],167 }168 ),169 )170 self.assertStatementIds(client, key_id, "DefaultRoot", "RemoveMe")171 p = self.load_policy(172 {173 "name": "kms-rm-named",174 "resource": "kms-key",175 "filters": [{"KeyId": key_id}],176 "actions": [177 {"type": "remove-statements", "statement_ids": ["RemoveMe"]}178 ],179 },180 session_factory=session_factory,181 )182 resources = p.run()183 self.assertEqual(len(resources), 1)184 if self.recording:185 time.sleep(60) # takes time before new policy reflected186 self.assertStatementIds(client, key_id, "DefaultRoot")187class KMSTagging(BaseTest):188 @functional189 def test_kms_key_tag(self):190 session_factory = self.replay_flight_data("test_kms_key_tag")191 client = session_factory().client("kms")192 key_id = client.create_key()["KeyMetadata"]["KeyId"]193 self.addCleanup(194 client.schedule_key_deletion, KeyId=key_id, PendingWindowInDays=7195 )196 policy = self.load_policy(197 {198 "name": "kms-key-tag",199 "resource": "kms-key",200 "filters": [{"KeyId": key_id}],201 "actions": [202 {"type": "tag", "key": "RequisiteKey", "value": "Required"}203 ],204 },205 session_factory=session_factory,206 )207 resources = policy.run()208 self.assertEqual(len(resources), 1)209 tags = client.list_resource_tags(KeyId=key_id)["Tags"]210 
self.assertEqual(tags[0]["TagKey"], "RequisiteKey")211 @functional212 def test_kms_key_remove_tag(self):213 session_factory = self.replay_flight_data("test_kms_key_remove_tag")214 client = session_factory().client("kms")215 key_id = client.create_key(216 Tags=[{"TagKey": "ExpiredTag", "TagValue": "Invalid"}]217 )[218 "KeyMetadata"219 ][220 "KeyId"221 ]222 self.addCleanup(223 client.schedule_key_deletion, KeyId=key_id, PendingWindowInDays=7224 )225 policy = self.load_policy(226 {227 "name": "kms-key-remove-tag",228 "resource": "kms-key",229 "filters": [{"KeyState": "Enabled"}, {"tag:ExpiredTag": "Invalid"}],230 "actions": [{"type": "remove-tag", "tags": ["ExpiredTag"]}],231 },232 session_factory=session_factory,233 )234 resources = policy.run()235 self.assertTrue(len(resources), 1)236 self.assertEqual(resources[0]["KeyId"], key_id)237 tags = client.list_resource_tags(KeyId=key_id)["Tags"]238 self.assertEqual(len(tags), 0)239 def test_kms_key_related(self):240 session_factory = self.replay_flight_data("test_kms_key_related")241 p = self.load_policy(242 {243 "name": "dms-instance-kms-key-related",244 "resource": 'dms-instance',245 "filters": [246 {247 "type": "kms-key",248 "key": "c7n:AliasName",249 "value": "alias/aws/dms",250 "op": "eq"251 }252 ]253 },254 session_factory=session_factory,255 )256 resources = p.run()257 client = session_factory().client("kms")258 self.assertEqual(len(resources), 1)259 resource_kms_key = resources[0]['KmsKeyId']260 aliases = client.list_aliases(KeyId=resource_kms_key)261 target_key_arn = None262 if aliases['Aliases'][0]['AliasName'] == 'alias/aws/dms':263 target_key_id = aliases['Aliases'][0].get('TargetKeyId')264 target_key_arn = client.describe_key(265 KeyId=target_key_id).get('KeyMetadata').get('Arn')...
keys.py
Source:keys.py
# ... (earlier lines of keys.py are elided in this listing)
client = session.client('kms')


def create_key():
    """Create a KMS key with the module-level client and return its ``KeyMetadata``."""
    response = client.create_key()
    return response["KeyMetadata"]


def schedule_key_deletion(target_keyid, pending_window=7):
    """Schedule deletion of ``target_keyid`` after ``pending_window`` days."""
    deletion_request = {
        "KeyId": target_keyid,
        "PendingWindowInDays": pending_window,
    }
    client.schedule_key_deletion(**deletion_request)


def create_alias(alias_name, target_keyid):
    """Create the alias ``alias/<alias_name>`` pointing at ``target_keyid``."""
    full_alias = 'alias/' + alias_name
    client.create_alias(AliasName=full_alias, TargetKeyId=target_keyid)


def delete_alias(alias_name):
    """Delete the alias ``alias/<alias_name>``."""
    full_alias = 'alias/' + alias_name
    client.delete_alias(AliasName=full_alias)
# ... (remaining lines of keys.py are elided in this listing)
Learn to execute automation testing from scratch with LambdaTest Learning Hub. It covers everything from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!