How to use put_key_policy method in localstack

Best Python code snippet using localstack_python

configureCMK.py

Source:configureCMK.py Github


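# ... (earlier lines of configureCMK.py are not shown; the listing opens at the
# tail of a statement that builds the key policy. The enclosing definition is
# not visible, so the indentation of this fragment is a best guess.) ...
            "Resource": "*"
        }
    )
    print(f"Updating policy for key {args.cmk_arn}")
    # PutKeyPolicy replaces the key's whole policy document; "default" is the
    # only policy name KMS accepts. Whatever `policy` omits is removed from
    # the key, so it must still contain the key-administration statement.
    kms_client.put_key_policy(
        KeyId = args.cmk_arn,
        PolicyName = "default",
        Policy = json.dumps(policy)
    )
    print(f"Policy for key {args.cmk_arn} updated.")


def _resources_of_type(page, resource_type):
    """Return the resource summaries of one page that have the given ResourceType."""
    return [
        summary
        for summary in page["StackResourceSummaries"]
        if summary["ResourceType"] == resource_type
    ]


def process_stacks(stackname):
    """Point the encryptable resources of one CloudFormation stack at the CMK.

    Walks every page of ``list_stack_resources`` for *stackname* and, per
    resource type, switches the resource to ``args.cmk_arn``: Lambda
    functions, SSM parameters (rewritten as SecureString), S3 buckets
    (default encryption plus access logging to ``args.target_s3_bucket``),
    DynamoDB tables, Kinesis Firehose delivery streams, and IAM roles that
    are handed to ``assign_role``.

    Nested stacks (``AWS::CloudFormation::Stack``) are not followed here; the
    caller is responsible for invoking this function on them.

    Args:
        stackname: name or ARN of the stack whose resources are processed.
    """
    paginator = cloudformation_client.get_paginator('list_stack_resources')
    response_iterator = paginator.paginate(
        StackName=stackname,
        PaginationConfig={
            'MaxItems': 10000
        }
    )

    for response in response_iterator:
        # --- Lambda: set the function's KMS key, then register its role ---
        for lambda_func in _resources_of_type(response, "AWS::Lambda::Function"):
            function_name = lambda_func["PhysicalResourceId"]
            lambda_client.update_function_configuration(
                FunctionName=function_name,
                KMSKeyArn=args.cmk_arn,
            )
            print(f"Updated function {function_name} in stack {stackname}")

            lambda_configuration = lambda_client.get_function_configuration(
                FunctionName=function_name
            )
            # The role ARN ends in ".../<role-name>"; assign_role takes the name.
            # NOTE(review): assign_role is defined outside this listing;
            # presumably it grants the role use of the key -- confirm, since
            # the function is switched to the CMK before that call runs.
            role_name = lambda_configuration["Role"].split("/")[-1]
            assign_role(role_name)

        # --- SSM: re-write each parameter as a SecureString under the CMK ---
        for parameter in _resources_of_type(response, "AWS::SSM::Parameter"):
            parameter_name = parameter["PhysicalResourceId"]
            stored = ssm_client.get_parameter(
                Name=parameter_name,
                WithDecryption=True
            )['Parameter']
            # The plaintext value lives only in this local; never print or log it.
            parameter_value = stored['Value']
            # Fixed: the original tested the misspelled key "Decription", so
            # the description was always replaced by an empty string.
            # NOTE(review): GetParameter may not return a Description at all;
            # if it must be preserved, read it via describe_parameters -- confirm.
            description = stored.get("Description", "")
            ssm_client.put_parameter(
                Name=parameter_name,
                Description=description,
                Value=parameter_value,
                Type='SecureString',
                KeyId=args.cmk_arn,
                Overwrite=True,
            )

        # --- S3: default SSE-KMS with the CMK, plus server access logging ---
        for bucket in _resources_of_type(response, "AWS::S3::Bucket"):
            bucket_name = bucket["PhysicalResourceId"]
            # Default encryption applies to objects written from now on;
            # objects already in the bucket keep their current encryption.
            s3_client.put_bucket_encryption(
                Bucket=bucket_name,
                ServerSideEncryptionConfiguration={
                    'Rules': [
                        {
                            'ApplyServerSideEncryptionByDefault': {
                                'SSEAlgorithm': 'aws:kms',
                                'KMSMasterKeyID': args.cmk_arn
                            }
                        },
                    ]
                }
            )
            print(f"Encryption set for {bucket_name}")
            s3_client.put_bucket_logging(
                Bucket=bucket_name,
                BucketLoggingStatus={
                    'LoggingEnabled': {
                        'TargetBucket': args.target_s3_bucket,
                        'TargetPrefix': bucket_name + '/'
                    }
                }
            )
            print(f"Access Logs set for {bucket_name}")

        # --- DynamoDB: enable KMS SSE unless the table already uses the CMK ---
        for table in _resources_of_type(response, "AWS::DynamoDB::Table"):
            table_description = ddb_client.describe_table(
                TableName=table["PhysicalResourceId"]
            )
            sse = table_description["Table"].get('SSEDescription', {})
            if sse.get('KMSMasterKeyArn') != args.cmk_arn:
                ddb_client.update_table(
                    TableName=table["PhysicalResourceId"],
                    SSESpecification={
                        'Enabled': True,
                        'SSEType': 'KMS',
                        'KMSMasterKeyId': args.cmk_arn
                    }
                )

        # --- Firehose: start encryption unless already on this exact CMK ---
        for stream in _resources_of_type(response, "AWS::KinesisFirehose::DeliveryStream"):
            stream_response = kinesis_client.describe_delivery_stream(
                DeliveryStreamName=stream["PhysicalResourceId"])
            encryption = stream_response['DeliveryStreamDescription'].get(
                'DeliveryStreamEncryptionConfiguration', {})
            # Fixed: the original joined the two tests with `and`, so a stream
            # encrypted with a *different* customer-managed key was left on
            # that key, and a stream without a KeyARN entry raised KeyError.
            already_on_cmk = (
                encryption.get('KeyType') == "CUSTOMER_MANAGED_CMK"
                and encryption.get('KeyARN') == args.cmk_arn
            )
            if not already_on_cmk:
                kinesis_client.start_delivery_stream_encryption(
                    DeliveryStreamName=stream["PhysicalResourceId"],
                    DeliveryStreamEncryptionConfigurationInput={
                        'KeyARN': args.cmk_arn,
                        'KeyType': 'CUSTOMER_MANAGED_CMK'})

        # --- IAM roles: Lambda roles and the explicitly listed logical ids ---
        role_resources = [
            summary
            for summary in response["StackResourceSummaries"]
            if 'LambdaRole' in summary["LogicalResourceId"]
            or summary["LogicalResourceId"] in cmk_roles_logical_ids
        ]
        for role_resource in role_resources:
            print(f"role_resource: {role_resource['PhysicalResourceId']}")
            cmk_roles_physical_ids.append(role_resource["PhysicalResourceId"])
            assign_role(role_resource["PhysicalResourceId"])


# Process the root stack first, then each stack nested directly under it.
# process_stacks does not recurse, so stacks nested more than one level deep
# are not visited by the code shown here.
process_stacks(args.stack_arn)
paginator = cloudformation_client.get_paginator('list_stack_resources')
response_iterator = paginator.paginate(
    StackName=args.stack_arn,
    PaginationConfig={
        'MaxItems': 10000,
    }
)
for response in response_iterator:
    stacks = _resources_of_type(response, "AWS::CloudFormation::Stack")
    for stack in stacks:
        print(f"Processing stack {stack['PhysicalResourceId']}")
        process_stacks(stack["PhysicalResourceId"])
# ... (the rest of configureCMK.py is not shown) ...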


util_kms.py

Source:util_kms.py Github


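# ... (earlier lines of util_kms.py are not shown; the listing opens at the end
# of a wrapper whose error path reports 'create_alias') ...
        )
    except botocore.exceptions.ClientError as e:
        erm = _fail(e, 'create_alias', aliasName)
        raise Exception(erm)


def put_key_policy(ctx, cmkArn, policyJson):
    """Replace the 'default' key policy of *cmkArn* with *policyJson*.

    PutKeyPolicy overwrites the whole policy document, so *policyJson* must
    carry every statement the key still needs, including the one that lets
    administrators manage the key.

    Raises:
        Exception: with the message built by ``_fail`` when KMS rejects the call.
    """
    try:
        client = ctx.client('kms')
        client.put_key_policy(
            KeyId=cmkArn,
            PolicyName='default',
            Policy=policyJson
        )
    except botocore.exceptions.ClientError as e:
        erm = _fail(e, 'put_key_policy', cmkArn)
        # Chain explicitly so the original ClientError stays attached.
        raise Exception(erm) from e


def get_key_rotation_status(ctx, cmkArn):
    """Return the ``KeyRotationEnabled`` flag KMS reports for *cmkArn*.

    Raises:
        Exception: with the message built by ``_fail`` on a ClientError.
    """
    try:
        client = ctx.client('kms')
        response = client.get_key_rotation_status(
            KeyId=cmkArn
        )
        return response['KeyRotationEnabled']
    except botocore.exceptions.ClientError as e:
        erm = _fail(e, 'get_key_rotation_status', cmkArn)
        raise Exception(erm) from e


def get_key_policy(ctx, cmkArn):
    """Return the 'default' key policy of *cmkArn* as the JSON text KMS sends.

    Raises:
        Exception: with the message built by ``_fail`` on a ClientError.
    """
    try:
        client = ctx.client('kms')
        response = client.get_key_policy(
            KeyId=cmkArn,
            PolicyName='default'
        )
        return response['Policy']
    except botocore.exceptions.ClientError as e:
        erm = _fail(e, 'get_key_policy', cmkArn)
        raise Exception(erm) from e


def update_key_description(ctx, cmkArn, description):
    """Set the description of *cmkArn* to *description*.

    Raises:
        Exception: with the message built by ``_fail`` on a ClientError.
    """
    try:
        client = ctx.client('kms')
        client.update_key_description(
            KeyId=cmkArn,
            Description=description
        )
    except botocore.exceptions.ClientError as e:
        erm = _fail(e, 'update_key_description', cmkArn)
        raise Exception(erm) from e


def enable_key_rotation(ctx, cmkArn):
    """Turn on automatic key rotation for *cmkArn*.

    Raises:
        Exception: with the message built by ``_fail`` on a ClientError.
    """
    try:
        client = ctx.client('kms')
        client.enable_key_rotation(
            KeyId=cmkArn
        )
    except botocore.exceptions.ClientError as e:
        erm = _fail(e, 'enable_key_rotation', cmkArn)
        raise Exception(erm) from e


def schedule_key_deletion(ctx, cmkArn, pendingWindowInDays):
    """Schedule *cmkArn* for deletion after *pendingWindowInDays* days.

    A key that no longer exists is treated as already gone: the function
    returns None instead of raising. Data encrypted under the key becomes
    unrecoverable once the waiting period ends.

    Raises:
        Exception: with the message built by ``_fail`` on any other ClientError.
    """
    try:
        client = ctx.client('kms')
        client.schedule_key_deletion(
            KeyId=cmkArn,
            PendingWindowInDays=pendingWindowInDays
        )
    except botocore.exceptions.ClientError as e:
        if _is_resource_not_found(e):
            return None
        erm = _fail(e, 'schedule_key_deletion', cmkArn)
        raise Exception(erm) from e


def delete_alias(ctx, canonAlias):
    """Delete the alias *canonAlias*; a missing alias is not an error.

    Raises:
        Exception: with the message built by ``_fail`` on any other ClientError.
    """
    try:
        client = ctx.client('kms')
        client.delete_alias(
            AliasName=canonAlias
        )
    except botocore.exceptions.ClientError as e:
        if _is_resource_not_found(e):
            return None
        erm = _fail(e, 'delete_alias', canonAlias)
        raise Exception(erm) from e


def getCMKMeta(ctx, keyId):
    """Return the ``KeyMetadata`` of *keyId*, or None when KMS does not know it.

    Raises:
        Exception: with the message built by ``_fail`` on any other ClientError.
    """
    try:
        client = ctx.client('kms')
        response = client.describe_key(
            KeyId=keyId
        )
        return response['KeyMetadata']
    except botocore.exceptions.ClientError as e:
        if _is_resource_not_found(e):
            return None
        erm = _fail(e, 'describe_key', keyId)
        raise Exception(erm) from e


def declareCMK(ctx, description, alias, policyStatements):
    """Make sure a CMK with the given alias, description and policy exists.

    The required policy is the statement from ``policy_statement_default(ctx)``
    followed by *policyStatements*. When no usable key sits behind the alias a
    new one is created, aliased and given rotation; otherwise the existing key
    is brought in line (policy, description, rotation) and reused.

    Returns:
        The ARN of the created or existing key.

    Raises:
        Exception: when the existing key is in a state other than 'Enabled'
            or 'PendingDeletion', or when any underlying KMS call fails.
    """
    statements = [policy_statement_default(ctx)]
    statements.extend(policyStatements)
    reqdPolicyJson = json.dumps(policy_map(statements))
    canonAlias = canon_alias(alias)
    exMeta = getCMKMeta(ctx, canonAlias)

    if exMeta:
        keyState = exMeta['KeyState']
        if keyState not in ('PendingDeletion', 'Enabled'):
            erm = 'KMS CMK {} in unexpected state {}'.format(alias, keyState)
            raise Exception(erm)
        createReqd = keyState == 'PendingDeletion'
    else:
        createReqd = True

    if createReqd:
        # NOTE(review): on the PendingDeletion path the alias still resolves
        # to the old key at this point; whether create_alias copes with an
        # existing alias is not visible in this listing -- confirm.
        newArn = create_key_arn(ctx, description, reqdPolicyJson)
        create_alias(ctx, canonAlias, newArn)
        enable_key_rotation(ctx, newArn)
        return newArn

    exArn = exMeta['Arn']
    exDescription = exMeta['Description']
    exPolicyJson = get_key_policy(ctx, exArn)
    # Compare the parsed documents rather than their serialized text: the
    # original string comparison treated a policy whose object keys merely
    # came back in a different order as changed and rewrote the key policy
    # on every run.
    if json.loads(exPolicyJson) != json.loads(reqdPolicyJson):
        put_key_policy(ctx, exArn, reqdPolicyJson)
    if exDescription != description:
        update_key_description(ctx, exArn, description)
    if not get_key_rotation_status(ctx, exArn):
        enable_key_rotation(ctx, exArn)
    return exArn


def deleteCMK(ctx, alias, pendingWindowInDays=7):
    canonAlias = canon_alias(alias)
    exMeta = getCMKMeta(ctx, canonAlias)
    if exMeta:
        exArn = exMeta['Arn']
        delete_alias(ctx, canonAlias)
        # ... (the rest of deleteCMK is not shown; the listing is truncated here) ...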


kms.py

Source:kms.py Github


# ... (earlier lines of kms.py are not shown; the listing opens inside the
# `except` handler of a preceding method whose start is not visible) ...
            message = {'FILE': __file__.split('/')[-1], 'CLASS': self.__class__.__name__,
                       'METHOD': inspect.stack()[0][3], 'EXCEPTION': str(e)}
            self.logger.exception(message)
            raise

    def put_key_policy(self, key_id, policy):
        """Attach *policy* as the 'default' key policy of *key_id*.

        Calls ``put_key_policy`` on the module-level ``kms_client`` and
        returns its response. On any exception a structured message (file,
        class, method, exception text) is logged via ``self.logger``; what
        follows the logging call is not visible in this listing.
        """
        try:
            response = kms_client.put_key_policy(
                KeyId=key_id,
                Policy=policy,
                PolicyName = 'default', # Per API docs, the only valid value is default.
                # SECURITY: True disables KMS's lockout safety check, which
                # otherwise rejects a policy that would stop the calling
                # principal from making later PutKeyPolicy requests. With the
                # check bypassed, a policy that omits the caller or the key
                # administrators is accepted and can leave the key
                # unmanageable. The API default is False; prefer it unless
                # the bypass is a deliberate, reviewed decision.
                BypassPolicyLockoutSafetyCheck=True
            )
            return response
        except Exception as e:
            # Same structured log record as the other methods in this class.
            message = {'FILE': __file__.split('/')[-1], 'CLASS': self.__class__.__name__,
                       'METHOD': inspect.stack()[0][3], 'EXCEPTION': str(e)}
            self.logger.exception(message)
            # ... (the rest of this handler is not shown; the listing is truncated here) ...
