How to use the await_stack_completion method in LocalStack

Best Python code snippet using localstack_python

test_cloudformation.py

Source: test_cloudformation.py on GitHub

copy

Full Screen

...440 template = template_deployer.template_to_json(load_file(TEST_TEMPLATE_1))441 # deploy template442 stack_name = 'stack-%s' % short_uid()443 cloudformation.create_stack(StackName=stack_name, TemplateBody=template)444 await_stack_completion(stack_name)445 # assert that resources have been created446 self.assertTrue(bucket_exists('cf-test-bucket-1'))447 queue_url = queue_exists('cf-test-queue-1')448 self.assertTrue(queue_url)449 topic_arn = topic_exists('%s-test-topic-1-1' % stack_name)450 self.assertTrue(topic_arn)451 self.assertTrue(stream_exists('cf-test-stream-1'))452 resource = describe_stack_resource(stack_name, 'SQSQueueNoNameProperty')453 self.assertTrue(queue_exists(resource['PhysicalResourceId']))454 self.assertTrue(ssm_param_exists('cf-test-param-1'))455 # assert that tags have been created456 tags = s3.get_bucket_tagging(Bucket='cf-test-bucket-1')['TagSet']457 self.assertEqual(tags, [{'Key': 'foobar', 'Value': aws_stack.get_sqs_queue_url('cf-test-queue-1')}])458 tags = sns.list_tags_for_resource(ResourceArn=topic_arn)['Tags']459 self.assertEqual(tags, [460 {'Key': 'foo', 'Value': 'cf-test-bucket-1'},461 {'Key': 'bar', 'Value': aws_stack.s3_bucket_arn('cf-test-bucket-1')}462 ])463 queue_tags = sqs.list_queue_tags(QueueUrl=queue_url)464 self.assertIn('Tags', queue_tags)465 self.assertEqual(queue_tags['Tags'], {'key1': 'value1', 'key2': 'value2'})466 # assert that bucket notifications have been created467 notifications = s3.get_bucket_notification_configuration(Bucket='cf-test-bucket-1')468 self.assertIn('QueueConfigurations', notifications)469 self.assertIn('LambdaFunctionConfigurations', notifications)470 self.assertEqual(notifications['QueueConfigurations'][0]['QueueArn'], 'aws:arn:sqs:test:testqueue')471 self.assertEqual(notifications['QueueConfigurations'][0]['Events'], ['s3:ObjectDeleted:*'])472 self.assertEqual(473 notifications['LambdaFunctionConfigurations'][0]['LambdaFunctionArn'],474 'aws:arn:lambda:test:testfunc'475 )476 
self.assertEqual(notifications['LambdaFunctionConfigurations'][0]['Events'], ['s3:ObjectCreated:*'])477 # assert that subscriptions have been created478 subs = sns.list_subscriptions()['Subscriptions']479 subs = [s for s in subs if (':%s:cf-test-queue-1' % TEST_AWS_ACCOUNT_ID) in s['Endpoint']]480 self.assertEqual(len(subs), 1)481 self.assertIn(':%s:%s-test-topic-1-1' % (TEST_AWS_ACCOUNT_ID, stack_name), subs[0]['TopicArn'])482 # assert that subscription attributes are added properly483 attrs = sns.get_subscription_attributes(SubscriptionArn=subs[0]['SubscriptionArn'])['Attributes']484 self.assertEqual(attrs, {'Endpoint': subs[0]['Endpoint'], 'Protocol': 'sqs',485 'SubscriptionArn': subs[0]['SubscriptionArn'], 'TopicArn': subs[0]['TopicArn'],486 'FilterPolicy': json.dumps({'eventType': ['created']})})487 # assert that Gateway responses have been created488 test_api_name = 'test-api'489 api = [a for a in apigateway.get_rest_apis()['items'] if a['name'] == test_api_name][0]490 responses = apigateway.get_gateway_responses(restApiId=api['id'])['items']491 self.assertEqual(len(responses), 2)492 types = [r['responseType'] for r in responses]493 self.assertEqual(set(types), set(['UNAUTHORIZED', 'DEFAULT_5XX']))494 # delete the stack495 cf_client.delete_stack(StackName=stack_name)496 # assert that resources have been deleted497 self.assertFalse(bucket_exists('cf-test-bucket-1'))498 self.assertFalse(queue_exists('cf-test-queue-1'))499 self.assertFalse(topic_exists('%s-test-topic-1-1' % stack_name))500 retry(lambda: self.assertFalse(stream_exists('cf-test-stream-1')))501 def test_list_stack_events(self):502 cloudformation = aws_stack.connect_to_service('cloudformation')503 response = cloudformation.describe_stack_events()504 self.assertEqual(response['ResponseMetadata']['HTTPStatusCode'], 200)505 def test_validate_template(self):506 cloudformation = aws_stack.connect_to_service('cloudformation')507 template = 
template_deployer.template_to_json(load_file(TEST_VALID_TEMPLATE))508 resp = cloudformation.validate_template(TemplateBody=template)509 self.assertEqual(resp['ResponseMetadata']['HTTPStatusCode'], 200)510 self.assertIn('Parameters', resp)511 self.assertEqual(len(resp['Parameters']), 1)512 self.assertEqual(resp['Parameters'][0]['ParameterKey'], 'KeyExample')513 self.assertEqual(resp['Parameters'][0]['Description'], 'The EC2 Key Pair to allow SSH access to the instance')514 def test_validate_invalid_json_template_should_fail(self):515 cloudformation = aws_stack.connect_to_service('cloudformation')516 invalid_json = '{"this is invalid JSON"="bobbins"}'517 try:518 cloudformation.validate_template(TemplateBody=invalid_json)519 self.fail('Should raise ValidationError')520 except (ClientError, ResponseParserError) as err:521 if isinstance(err, ClientError):522 self.assertEqual(err.response['ResponseMetadata']['HTTPStatusCode'], 400)523 self.assertEqual(err.response['Error']['Message'], 'Template Validation Error')524 def test_list_stack_resources_returns_queue_urls(self):525 cloudformation = aws_stack.connect_to_resource('cloudformation')526 template = template_deployer.template_to_json(load_file(TEST_TEMPLATE_2))527 stack_name = 'stack-%s' % short_uid()528 cloudformation.create_stack(StackName=stack_name, TemplateBody=template)529 details = await_stack_completion(stack_name)530 stack_summaries = list_stack_resources(stack_name)531 queue_urls = get_queue_urls()532 topic_arns = get_topic_arns()533 stack_queues = [r for r in stack_summaries if r['ResourceType'] == 'AWS::SQS::Queue']534 for resource in stack_queues:535 self.assertIn(resource['PhysicalResourceId'], queue_urls)536 stack_topics = [r for r in stack_summaries if r['ResourceType'] == 'AWS::SNS::Topic']537 for resource in stack_topics:538 self.assertIn(resource['PhysicalResourceId'], topic_arns)539 # assert that stack outputs are returned properly540 outputs = details.get('Outputs', [])541 
self.assertEqual(len(outputs), 1)542 self.assertEqual(outputs[0]['ExportName'], 'SQSQueue1-URL')543 def test_create_change_set(self):544 cloudformation = aws_stack.connect_to_service('cloudformation')545 # deploy template546 stack_name = 'stack-%s' % short_uid()547 cloudformation.create_stack(StackName=stack_name, TemplateBody=TEST_TEMPLATE_3)548 # create change set with the same template (no changes)549 response = cloudformation.create_change_set(550 StackName=stack_name,551 TemplateBody=TEST_TEMPLATE_3,552 ChangeSetName='nochanges'553 )554 self.assertIn(':%s:changeSet/nochanges/' % TEST_AWS_ACCOUNT_ID, response['Id'])555 self.assertIn(':%s:stack/' % TEST_AWS_ACCOUNT_ID, response['StackId'])556 def test_sam_template(self):557 cloudformation = aws_stack.connect_to_service('cloudformation')558 awslambda = aws_stack.connect_to_service('lambda')559 # deploy template560 stack_name = 'stack-%s' % short_uid()561 func_name = 'test-%s' % short_uid()562 template = load_file(TEST_TEMPLATE_4) % func_name563 cloudformation.create_stack(StackName=stack_name, TemplateBody=template)564 # run Lambda test invocation565 result = awslambda.invoke(FunctionName=func_name)566 result = json.loads(to_str(result['Payload'].read()))567 self.assertEqual(result, {'hello': 'world'})568 # delete lambda function569 awslambda.delete_function(FunctionName=func_name)570 def test_nested_stack(self):571 s3 = aws_stack.connect_to_service('s3')572 cloudformation = aws_stack.connect_to_service('cloudformation')573 # upload template to S3574 s3.create_bucket(Bucket=TEST_ARTIFACTS_BUCKET, ACL='public-read')575 s3.put_object(Bucket=TEST_ARTIFACTS_BUCKET, Key=TEST_ARTIFACTS_PATH, Body=load_file(TEST_TEMPLATE_5))576 # deploy template577 buckets_before = len(s3.list_buckets()['Buckets'])578 stack_name = 'stack-%s' % short_uid()579 param_value = short_uid()580 cloudformation.create_stack(581 StackName=stack_name,582 TemplateBody=load_file(TEST_TEMPLATE_6) % (TEST_ARTIFACTS_BUCKET, TEST_ARTIFACTS_PATH),583 
Parameters=[{'ParameterKey': 'GlobalParam', 'ParameterValue': param_value}]584 )585 # assert that nested resources have been created586 buckets_after = s3.list_buckets()['Buckets']587 num_buckets_after = len(buckets_after)588 self.assertEqual(num_buckets_after, buckets_before + 1)589 bucket_names = [b['Name'] for b in buckets_after]590 self.assertIn('test-%s' % param_value, bucket_names)591 # delete the stack592 cloudformation.delete_stack(StackName=stack_name)593 def test_create_cfn_lambda_without_function_name(self):594 lambda_client = aws_stack.connect_to_service('lambda')595 cloudformation = aws_stack.connect_to_service('cloudformation')596 rs = lambda_client.list_functions()597 # Number of lambdas before of stack creation598 lambdas_before = len(rs['Functions'])599 stack_name = 'stack-%s' % short_uid()600 lambda_role_name = 'lambda-role-%s' % short_uid()601 template = json.loads(load_file(TEST_TEMPLATE_7))602 template['Resources']['LambdaExecutionRole']['Properties']['RoleName'] = lambda_role_name603 rs = cloudformation.create_stack(StackName=stack_name, TemplateBody=json.dumps(template))604 self.assertEqual(rs['ResponseMetadata']['HTTPStatusCode'], 200)605 self.assertIn('StackId', rs)606 self.assertIn(stack_name, rs['StackId'])607 await_stack_completion(stack_name)608 rs = lambda_client.list_functions()609 # There is 1 new lambda function610 self.assertEqual(lambdas_before + 1, len(rs['Functions']))611 # delete the stack612 cloudformation.delete_stack(StackName=stack_name)613 rs = lambda_client.list_functions()614 # Back to what we had before615 self.assertEqual(lambdas_before, len(rs['Functions']))616 def test_deploy_stack_change_set(self):617 cloudformation = aws_stack.connect_to_service('cloudformation')618 stack_name = 'stack-%s' % short_uid()619 change_set_name = 'change-set-%s' % short_uid()620 bucket_name = 'bucket-%s' % short_uid()621 with self.assertRaises(ClientError) as ctx:622 cloudformation.describe_stacks(StackName=stack_name)623 
self.assertEqual(ctx.exception.response['Error']['Code'], 'ValidationError')624 rs = cloudformation.create_change_set(625 StackName=stack_name,626 ChangeSetName=change_set_name,627 TemplateBody=TEST_CHANGE_SET_BODY % bucket_name,628 Parameters=[{629 'ParameterKey': 'EnvironmentType',630 'ParameterValue': 'stage'631 }],632 Capabilities=['CAPABILITY_IAM'],633 )634 self.assertEqual(rs['ResponseMetadata']['HTTPStatusCode'], 200)635 change_set_id = rs['Id']636 rs = cloudformation.describe_change_set(637 StackName=stack_name,638 ChangeSetName=change_set_id639 )640 self.assertEqual(rs['ResponseMetadata']['HTTPStatusCode'], 200)641 self.assertEqual(rs['ChangeSetName'], change_set_name)642 self.assertEqual(rs['ChangeSetId'], change_set_id)643 self.assertEqual(rs['Status'], 'CREATE_COMPLETE')644 rs = cloudformation.execute_change_set(645 StackName=stack_name,646 ChangeSetName=change_set_name647 )648 self.assertEqual(rs['ResponseMetadata']['HTTPStatusCode'], 200)649 await_stack_completion(stack_name)650 rs = cloudformation.describe_stacks(StackName=stack_name)651 stack = rs['Stacks'][0]652 parameters = stack['Parameters']653 self.assertEqual(stack['StackName'], stack_name)654 self.assertEqual(parameters[0]['ParameterKey'], 'EnvironmentType')655 self.assertEqual(parameters[0]['ParameterValue'], 'stage')656 self.assertTrue(bucket_exists(bucket_name))657 # clean up658 cloudformation.delete_change_set(659 StackName=stack_name,660 ChangeSetName=change_set_name661 )662 cloudformation.delete_stack(663 StackName=stack_name664 )665 def test_deploy_stack_with_iam_role(self):666 stack_name = 'stack-%s' % short_uid()667 change_set_name = 'change-set-%s' % short_uid()668 role_name = 'role-%s' % short_uid()669 cloudformation = aws_stack.connect_to_service('cloudformation')670 iam_client = aws_stack.connect_to_service('iam')671 roles_before = iam_client.list_roles()['Roles']672 with self.assertRaises(ClientError) as ctx:673 cloudformation.describe_stacks(StackName=stack_name)674 
self.assertEqual(ctx.exception.response['Error']['Code'], 'ValidationError')675 rs = cloudformation.create_change_set(676 StackName=stack_name,677 ChangeSetName=change_set_name,678 TemplateBody=load_file(TEST_DEPLOY_BODY_1) % role_name679 )680 self.assertEqual(rs['ResponseMetadata']['HTTPStatusCode'], 200)681 change_set_id = rs['Id']682 rs = cloudformation.describe_change_set(683 StackName=stack_name,684 ChangeSetName=change_set_id685 )686 self.assertEqual(rs['ResponseMetadata']['HTTPStatusCode'], 200)687 self.assertEqual(rs['ChangeSetName'], change_set_name)688 self.assertEqual(rs['ChangeSetId'], change_set_id)689 self.assertEqual(rs['Status'], 'CREATE_COMPLETE')690 rs = cloudformation.execute_change_set(691 StackName=stack_name,692 ChangeSetName=change_set_name693 )694 self.assertEqual(rs['ResponseMetadata']['HTTPStatusCode'], 200)695 await_stack_completion(stack_name)696 rs = cloudformation.describe_stacks(StackName=stack_name)697 stack = rs['Stacks'][0]698 self.assertEqual(stack['StackName'], stack_name)699 rs = iam_client.list_roles()700 self.assertEqual(len(rs['Roles']), len(roles_before) + 1)701 self.assertEqual(rs['Roles'][-1]['RoleName'], role_name)702 rs = iam_client.list_role_policies(703 RoleName=role_name704 )705 iam_client.delete_role_policy(706 RoleName=role_name,707 PolicyName=rs['PolicyNames'][0]708 )709 # clean up710 cloudformation.delete_change_set(711 StackName=stack_name,712 ChangeSetName=change_set_name713 )714 cloudformation.delete_stack(715 StackName=stack_name716 )717 rs = iam_client.list_roles(718 PathPrefix=role_name719 )720 self.assertEqual(len(rs['Roles']), 0)721 def test_deploy_stack_with_sns_topic(self):722 stack_name = 'stack-%s' % short_uid()723 change_set_name = 'change-set-%s' % short_uid()724 cloudformation = aws_stack.connect_to_service('cloudformation')725 rs = cloudformation.create_change_set(726 StackName=stack_name,727 ChangeSetName=change_set_name,728 TemplateBody=load_file(TEST_DEPLOY_BODY_2),729 Parameters=[730 {731 
'ParameterKey': 'CompanyName',732 'ParameterValue': 'MyCompany'733 },734 {735 'ParameterKey': 'MyEmail1',736 'ParameterValue': 'my@email.com'737 }738 ]739 )740 self.assertEqual(rs['ResponseMetadata']['HTTPStatusCode'], 200)741 rs = cloudformation.execute_change_set(742 StackName=stack_name,743 ChangeSetName=change_set_name744 )745 self.assertEqual(rs['ResponseMetadata']['HTTPStatusCode'], 200)746 await_stack_completion(stack_name)747 rs = cloudformation.describe_stacks(StackName=stack_name)748 stack = rs['Stacks'][0]749 self.assertEqual(stack['StackName'], stack_name)750 outputs = stack['Outputs']751 self.assertEqual(len(outputs), 3)752 topic_arn = None753 for op in outputs:754 if op['OutputKey'] == 'MyTopic':755 topic_arn = op['OutputValue']756 sns_client = aws_stack.connect_to_service('sns')757 rs = sns_client.list_topics()758 # Topic resource created759 topics = [tp for tp in rs['Topics'] if tp['TopicArn'] == topic_arn]760 self.assertEqual(len(topics), 1)761 # clean up762 cloudformation.delete_change_set(763 StackName=stack_name,764 ChangeSetName=change_set_name765 )766 cloudformation.delete_stack(767 StackName=stack_name768 )769 # Topic resource removed770 rs = sns_client.list_topics()771 topics = [tp for tp in rs['Topics'] if tp['TopicArn'] == topic_arn]772 self.assertEqual(len(topics), 0)773 def test_deploy_stack_with_dynamodb_table(self):774 stack_name = 'stack-%s' % short_uid()775 change_set_name = 'change-set-%s' % short_uid()776 env = 'Staging'777 ddb_table_name_prefix = 'ddb-table-%s' % short_uid()778 ddb_table_name = '{}-{}'.format(ddb_table_name_prefix, env)779 cloudformation = aws_stack.connect_to_service('cloudformation')780 rs = cloudformation.create_change_set(781 StackName=stack_name,782 ChangeSetName=change_set_name,783 TemplateBody=load_file(TEST_DEPLOY_BODY_3),784 Parameters=[785 {786 'ParameterKey': 'tableName',787 'ParameterValue': ddb_table_name_prefix788 },789 {790 'ParameterKey': 'env',791 'ParameterValue': env792 }793 ]794 )795 
self.assertEqual(rs['ResponseMetadata']['HTTPStatusCode'], 200)796 change_set_id = rs['Id']797 rs = cloudformation.execute_change_set(798 StackName=stack_name,799 ChangeSetName=change_set_name800 )801 self.assertEqual(rs['ResponseMetadata']['HTTPStatusCode'], 200)802 await_stack_completion(stack_name)803 rs = cloudformation.describe_stacks(StackName=stack_name)804 stacks = [stack for stack in rs['Stacks'] if stack['StackName'] == stack_name]805 self.assertEqual(len(stacks), 1)806 self.assertEqual(stacks[0]['ChangeSetId'], change_set_id)807 outputs = {808 output['OutputKey']: output['OutputValue']809 for output in stacks[0]['Outputs']810 }811 self.assertIn('Arn', outputs)812 self.assertEqual(outputs['Arn'], 'arn:aws:dynamodb:{}:{}:table/{}'.format(813 aws_stack.get_region(), TEST_AWS_ACCOUNT_ID, ddb_table_name))814 self.assertIn('Name', outputs)815 self.assertEqual(outputs['Name'], ddb_table_name)816 ddb_client = aws_stack.connect_to_service('dynamodb')817 rs = ddb_client.list_tables()818 self.assertIn(ddb_table_name, rs['TableNames'])819 # clean up820 cloudformation.delete_change_set(821 StackName=stack_name,822 ChangeSetName=change_set_name823 )824 cloudformation.delete_stack(825 StackName=stack_name826 )827 rs = ddb_client.list_tables()828 self.assertNotIn(ddb_table_name, rs['TableNames'])829 def test_deploy_stack_with_iam_nested_policy(self):830 stack_name = 'stack-%s' % short_uid()831 change_set_name = 'change-set-%s' % short_uid()832 cloudformation = aws_stack.connect_to_service('cloudformation')833 rs = cloudformation.create_change_set(834 StackName=stack_name,835 ChangeSetName=change_set_name,836 TemplateBody=load_file(TEST_DEPLOY_BODY_4)837 )838 self.assertEqual(rs['ResponseMetadata']['HTTPStatusCode'], 200)839 change_set_id = rs['Id']840 rs = cloudformation.describe_change_set(841 StackName=stack_name,842 ChangeSetName=change_set_id843 )844 self.assertEqual(rs['ResponseMetadata']['HTTPStatusCode'], 200)845 self.assertEqual(rs['ChangeSetId'], 
change_set_id)846 self.assertEqual(rs['Status'], 'CREATE_COMPLETE')847 iam_client = aws_stack.connect_to_service('iam')848 rs = iam_client.list_roles()849 number_of_roles = len(rs['Roles'])850 rs = cloudformation.execute_change_set(851 StackName=stack_name,852 ChangeSetName=change_set_name853 )854 self.assertEqual(rs['ResponseMetadata']['HTTPStatusCode'], 200)855 await_stack_completion(stack_name)856 rs = iam_client.list_roles()857 # 1 role was created858 self.assertEqual(number_of_roles + 1, len(rs['Roles']))859 # clean up860 cloudformation.delete_change_set(861 StackName=stack_name,862 ChangeSetName=change_set_name863 )864 cloudformation.delete_stack(865 StackName=stack_name866 )867 rs = iam_client.list_roles()868 # role was removed869 self.assertEqual(number_of_roles, len(rs['Roles']))870 def test_cfn_handle_s3_bucket_resources(self):871 stack_name = 'stack-%s' % short_uid()872 bucket_name = 's3-bucket-%s' % short_uid()873 TEST_TEMPLATE_8['Resources']['S3Bucket']['Properties']['BucketName'] = bucket_name874 self.assertFalse(bucket_exists(bucket_name))875 s3 = aws_stack.connect_to_service('s3')876 cfn = aws_stack.connect_to_service('cloudformation')877 deploy_cf_stack(stack_name=stack_name, template_body=json.dumps(TEST_TEMPLATE_8))878 self.assertTrue(bucket_exists(bucket_name))879 rs = s3.get_bucket_policy(880 Bucket=bucket_name881 )882 self.assertIn('Policy', rs)883 self.assertEqual(json.loads(rs['Policy']),884 TEST_TEMPLATE_8['Resources']['S3BucketPolicy']['Properties']['PolicyDocument'])885 cfn.delete_stack(StackName=stack_name)886 self.assertFalse(bucket_exists(bucket_name))887 with self.assertRaises(ClientError) as ctx:888 s3.get_bucket_policy(Bucket=bucket_name)889 self.assertEqual(ctx.exception.response['Error']['Code'], 'NoSuchBucket')890 rs = cfn.create_stack(StackName=stack_name, TemplateBody=json.dumps(TEST_TEMPLATE_8))891 self.assertEqual(rs['ResponseMetadata']['HTTPStatusCode'], 200)892 # clean up893 cfn.delete_stack(StackName=stack_name)894 def 
test_cfn_handle_log_group_resource(self):895 stack_name = 'stack-%s' % short_uid()896 log_group_prefix = '/aws/lambda/AWS_DUB_LAM_10000000'897 cfn = aws_stack.connect_to_service('cloudformation')898 logs_client = aws_stack.connect_to_service('logs')899 deploy_cf_stack(stack_name=stack_name, template_body=TEST_TEMPLATE_9)900 rs = logs_client.describe_log_groups(901 logGroupNamePrefix=log_group_prefix902 )903 self.assertEqual(len(rs['logGroups']), 1)904 self.assertEqual(rs['logGroups'][0]['logGroupName'],905 '/aws/lambda/AWS_DUB_LAM_10000000_dev_MessageFooHandler_dev')906 cfn.delete_stack(StackName=stack_name)907 rs = logs_client.describe_log_groups(908 logGroupNamePrefix=log_group_prefix909 )910 self.assertEqual(len(rs['logGroups']), 0)911 def test_cfn_handle_elasticsearch_domain(self):912 stack_name = 'stack-%s' % short_uid()913 domain_name = 'es-%s' % short_uid()914 cloudformation = aws_stack.connect_to_service('cloudformation')915 es_client = aws_stack.connect_to_service('es')916 cloudformation.create_stack(917 StackName=stack_name,918 TemplateBody=TEST_TEMPLATE_10,919 Parameters=[{'ParameterKey': 'DomainName', 'ParameterValue': domain_name}]920 )921 details = await_stack_completion(stack_name)922 outputs = details.get('Outputs', [])923 self.assertEqual(len(outputs), 3)924 rs = es_client.describe_elasticsearch_domain(925 DomainName=domain_name926 )927 status = rs['DomainStatus']928 self.assertEqual(domain_name, status['DomainName'])929 tags = es_client.list_tags(ARN=status['ARN'])['TagList']930 self.assertEqual([{'Key': 'k1', 'Value': 'v1'}, {'Key': 'k2', 'Value': 'v2'}], tags)931 for o in outputs:932 if o['OutputKey'] in ['MyElasticsearchArn', 'MyElasticsearchDomainArn']:933 self.assertEqual(o['OutputValue'], status['ARN'])934 elif o['OutputKey'] == 'MyElasticsearchDomainEndpoint':935 self.assertEqual(o['OutputValue'], status['Endpoint'])936 else:937 self.fail('Unexpected output: %s' % o)938 cloudformation.delete_stack(StackName=stack_name)939 def 
test_cfn_handle_secretsmanager_secret(self):940 stack_name = 'stack-%s' % short_uid()941 secret_name = 'secret-%s' % short_uid()942 cloudformation = aws_stack.connect_to_service('cloudformation')943 params = [{'ParameterKey': 'SecretName', 'ParameterValue': secret_name}]944 cloudformation.create_stack(StackName=stack_name, TemplateBody=TEST_TEMPLATE_11, Parameters=params)945 await_stack_completion(stack_name)946 secretsmanager_client = aws_stack.connect_to_service('secretsmanager')947 rs = secretsmanager_client.describe_secret(948 SecretId=secret_name949 )950 self.assertEqual(secret_name, rs['Name'])951 self.assertNotIn('DeletedDate', rs)952 cloudformation.delete_stack(StackName=stack_name)953 rs = secretsmanager_client.describe_secret(954 SecretId=secret_name955 )956 self.assertIn('DeletedDate', rs)957 def test_cfn_handle_kinesis_firehose_resources(self):958 stack_name = 'stack-%s' % short_uid()959 kinesis_stream_name = 'kinesis-stream-%s' % short_uid()960 firehose_role_name = 'firehose-role-%s' % short_uid()961 firehose_stream_name = 'firehose-stream-%s' % short_uid()962 cloudformation = aws_stack.connect_to_service('cloudformation')963 cloudformation.create_stack(964 StackName=stack_name,965 TemplateBody=TEST_TEMPLATE_12 % firehose_role_name,966 Parameters=[967 {'ParameterKey': 'KinesisStreamName', 'ParameterValue': kinesis_stream_name},968 {'ParameterKey': 'DeliveryStreamName', 'ParameterValue': firehose_stream_name}969 ]970 )971 details = await_stack_completion(stack_name)972 outputs = details.get('Outputs', [])973 self.assertEqual(len(outputs), 1)974 kinesis_client = aws_stack.connect_to_service('kinesis')975 firehose_client = aws_stack.connect_to_service('firehose')976 rs = firehose_client.describe_delivery_stream(977 DeliveryStreamName=firehose_stream_name978 )979 self.assertEqual(outputs[0]['OutputValue'], rs['DeliveryStreamDescription']['DeliveryStreamARN'])980 self.assertEqual(firehose_stream_name, 
rs['DeliveryStreamDescription']['DeliveryStreamName'])981 rs = kinesis_client.describe_stream(982 StreamName=kinesis_stream_name983 )984 self.assertEqual(rs['StreamDescription']['StreamName'], kinesis_stream_name)985 cloudformation.delete_stack(StackName=stack_name)986 time.sleep(2)987 rs = kinesis_client.list_streams()988 self.assertNotIn(kinesis_stream_name, rs['StreamNames'])989 rs = firehose_client.list_delivery_streams()990 self.assertNotIn(firehose_stream_name, rs['DeliveryStreamNames'])991 def test_cfn_handle_iam_role_resource(self):992 stack_name = 'stack-%s' % short_uid()993 role_name = 'role-%s' % short_uid()994 role_path_prefix = '/role-prefix-%s/' % short_uid()995 deploy_cf_stack(stack_name=stack_name, template_body=TEST_TEMPLATE_13 % (role_name, role_path_prefix))996 cfn = aws_stack.connect_to_service('cloudformation')997 iam = aws_stack.connect_to_service('iam')998 rs = iam.list_roles(999 PathPrefix=role_path_prefix1000 )1001 self.assertEqual(len(rs['Roles']), 1)1002 role = rs['Roles'][0]1003 self.assertEqual(role['RoleName'], role_name)1004 cfn.delete_stack(StackName=stack_name)1005 rs = iam.list_roles(1006 PathPrefix=role_path_prefix1007 )1008 self.assertEqual(len(rs['Roles']), 0)1009 def test_cfn_handle_iam_role_resource_no_role_name(self):1010 cfn = aws_stack.connect_to_service('cloudformation')1011 iam = aws_stack.connect_to_service('iam')1012 stack_name = 'stack-%s' % short_uid()1013 role_path_prefix = '/role-prefix-%s/' % short_uid()1014 deploy_cf_stack(stack_name=stack_name, template_body=TEST_TEMPLATE_14 % role_path_prefix)1015 rs = iam.list_roles(PathPrefix=role_path_prefix)1016 self.assertEqual(len(rs['Roles']), 1)1017 cfn.delete_stack(StackName=stack_name)1018 rs = iam.list_roles(PathPrefix=role_path_prefix)1019 self.assertEqual(len(rs['Roles']), 0)1020 def test_cfn_conditional_deployment(self):1021 s3 = aws_stack.connect_to_service('s3')1022 bucket_id = short_uid()1023 template = TEST_TEMPLATE_19.format(id=bucket_id)1024 stack_name = 
'stack-%s' % short_uid()1025 deploy_cf_stack(stack_name=stack_name, template_body=template)1026 buckets = s3.list_buckets()['Buckets']1027 dev_bucket = 'cf-dev-%s' % bucket_id1028 prd_bucket = 'cf-prd-%s' % bucket_id1029 dev_bucket = [b for b in buckets if b['Name'] == dev_bucket]1030 prd_bucket = [b for b in buckets if b['Name'] == prd_bucket]1031 self.assertFalse(prd_bucket)1032 self.assertTrue(dev_bucket)1033 def test_cfn_handle_sqs_resource(self):1034 stack_name = 'stack-%s' % short_uid()1035 fifo_queue = 'queue-%s.fifo' % short_uid()1036 cfn = aws_stack.connect_to_service('cloudformation')1037 sqs = aws_stack.connect_to_service('sqs')1038 deploy_cf_stack(stack_name=stack_name, template_body=TEST_TEMPLATE_15 % fifo_queue)1039 rs = sqs.get_queue_url(QueueName=fifo_queue)1040 self.assertEqual(rs['ResponseMetadata']['HTTPStatusCode'], 200)1041 queue_url = rs['QueueUrl']1042 rs = sqs.get_queue_attributes(1043 QueueUrl=queue_url,1044 AttributeNames=['All']1045 )1046 attributes = rs['Attributes']1047 self.assertIn('ContentBasedDeduplication', attributes)1048 self.assertIn('FifoQueue', attributes)1049 self.assertEqual(attributes['ContentBasedDeduplication'], 'false')1050 self.assertEqual(attributes['FifoQueue'], 'true')1051 cfn.delete_stack(StackName=stack_name)1052 with self.assertRaises(ClientError) as ctx:1053 sqs.get_queue_url(QueueName=fifo_queue)1054 self.assertEqual(ctx.exception.response['Error']['Code'], 'AWS.SimpleQueueService.NonExistentQueue')1055 def test_cfn_handle_events_rule(self):1056 stack_name = 'stack-%s' % short_uid()1057 bucket_name = 'target-%s' % short_uid()1058 rule_prefix = 's3-rule-%s' % short_uid()1059 rule_name = '%s-%s' % (rule_prefix, short_uid())1060 cfn = aws_stack.connect_to_service('cloudformation')1061 events = aws_stack.connect_to_service('events')1062 deploy_cf_stack(stack_name=stack_name, template_body=TEST_TEMPLATE_16 % (bucket_name, rule_name))1063 rs = events.list_rules(1064 NamePrefix=rule_prefix1065 )1066 
self.assertIn(rule_name, [rule['Name'] for rule in rs['Rules']])1067 target_arn = aws_stack.s3_bucket_arn(bucket_name)1068 rs = events.list_targets_by_rule(1069 Rule=rule_name1070 )1071 self.assertIn(target_arn, [target['Arn'] for target in rs['Targets']])1072 cfn.delete_stack(StackName=stack_name)1073 rs = events.list_rules(1074 NamePrefix=rule_prefix1075 )1076 self.assertNotIn(rule_name, [rule['Name'] for rule in rs['Rules']])1077 def test_cfn_handle_events_rule_without_name(self):1078 events = aws_stack.connect_to_service('events')1079 rs = events.list_rules()1080 rule_names = [rule['Name'] for rule in rs['Rules']]1081 stack_name = 'stack-%s' % short_uid()1082 deploy_cf_stack(stack_name=stack_name, template_body=TEST_TEMPLATE_18 % aws_stack.role_arn('sfn_role'))1083 rs = events.list_rules()1084 new_rules = [rule for rule in rs['Rules'] if rule['Name'] not in rule_names]1085 self.assertEqual(len(new_rules), 1)1086 rule = new_rules[0]1087 self.assertEqual(rule['ScheduleExpression'], 'cron(0/1 * * * ? 
*)')1088 cfn = aws_stack.connect_to_service('cloudformation')1089 cfn.delete_stack(StackName=stack_name)1090 time.sleep(1)1091 rs = events.list_rules()1092 self.assertNotIn(rule['Name'], [r['Name'] for r in rs['Rules']])1093 def test_cfn_handle_s3_notification_configuration(self):1094 stack_name = 'stack-%s' % short_uid()1095 bucket_name = 'target-%s' % short_uid()1096 queue_name = 'queue-%s' % short_uid()1097 queue_arn = aws_stack.sqs_queue_arn(queue_name)1098 cfn = aws_stack.connect_to_service('cloudformation')1099 s3 = aws_stack.connect_to_service('s3')1100 deploy_cf_stack(1101 stack_name=stack_name, template_body=TEST_TEMPLATE_17 % (queue_name, bucket_name, queue_arn))1102 rs = s3.get_bucket_notification_configuration(1103 Bucket=bucket_name1104 )1105 self.assertIn('QueueConfigurations', rs)1106 self.assertEqual(len(rs['QueueConfigurations']), 1)1107 self.assertEqual(rs['QueueConfigurations'][0]['QueueArn'], queue_arn)1108 cfn.delete_stack(StackName=stack_name)1109 rs = s3.get_bucket_notification_configuration(1110 Bucket=bucket_name1111 )1112 self.assertNotIn('QueueConfigurations', rs)1113 def test_cfn_lambda_function_with_iam_role(self):1114 stack_name = 'stack-%s' % short_uid()1115 role_name = 'lambda-ex'1116 iam = aws_stack.connect_to_service('iam')1117 cloudformation = aws_stack.connect_to_service('cloudformation')1118 response = iam.create_role(1119 RoleName=role_name,1120 AssumeRolePolicyDocument='{"Version": "2012-10-17","Statement": [{ "Effect": "Allow", "Principal": {'1121 '"Service": "lambda.amazonaws.com"}, "Action": "sts:AssumeRole"}]} '1122 )1123 self.assertEqual(role_name, response['Role']['RoleName'])1124 response = iam.get_role(1125 RoleName=role_name1126 )1127 self.assertEqual(role_name, response['Role']['RoleName'])1128 role_arn = response['Role']['Arn']1129 response = cloudformation.create_stack(1130 StackName=stack_name,1131 TemplateBody=TEST_TEMPLATE_20 % role_arn,1132 )1133 self.assertEqual(200, 
response['ResponseMetadata']['HTTPStatusCode'])1134 # clean up1135 cloudformation.delete_stack(StackName=stack_name)1136 iam.delete_role(RoleName=role_name)1137 def test_cfn_handle_serverless_api_resource(self):1138 stack_name = 'stack-%s' % short_uid()1139 cloudformation = aws_stack.connect_to_service('cloudformation')1140 deploy_cf_stack(stack_name=stack_name, template_body=TEST_TEMPLATE_22)1141 res = cloudformation.list_stack_resources(StackName=stack_name)['StackResourceSummaries']1142 rest_api_ids = [r['PhysicalResourceId'] for r in res if r['ResourceType'] == 'AWS::ApiGateway::RestApi']1143 lambda_func_names = [r['PhysicalResourceId'] for r in res if r['ResourceType'] == 'AWS::Lambda::Function']1144 self.assertEqual(len(rest_api_ids), 1)1145 self.assertEqual(len(lambda_func_names), 1)1146 apigw_client = aws_stack.connect_to_service('apigateway')1147 rs = apigw_client.get_resources(1148 restApiId=rest_api_ids[0]1149 )1150 self.assertEqual(len(rs['items']), 1)1151 resource = rs['items'][0]1152 uri = resource['resourceMethods']['GET']['methodIntegration']['uri']1153 lambda_arn = aws_stack.lambda_function_arn(lambda_func_names[0])1154 self.assertIn(lambda_arn, uri)1155 # clean up1156 cloudformation.delete_stack(StackName=stack_name)1157 def test_delete_stack(self):1158 domain_name = 'es-%s' % short_uid()1159 cloudformation = aws_stack.connect_to_service('cloudformation')1160 cloudformation.create_stack(1161 StackName='myteststack',1162 TemplateBody=TEST_TEMPLATE_3,1163 Parameters=[{'ParameterKey': 'DomainName', 'ParameterValue': domain_name}]1164 )1165 cloudformation.create_stack(1166 StackName='myteststack2',1167 TemplateBody=TEST_TEMPLATE_3,1168 Parameters=[{'ParameterKey': 'DomainName', 'ParameterValue': domain_name}]1169 )1170 cloudformation.delete_stack(StackName='myteststack2')1171 cloudformation.delete_stack(StackName='myteststack')1172 def test_cfn_with_on_demand_dynamodb_resource(self):1173 cloudformation = 
aws_stack.connect_to_service('cloudformation')1174 response = cloudformation.create_stack(1175 StackName='myteststack',1176 TemplateBody=load_file(TEST_TEMPLATE_21))1177 self.assertIn('StackId', response)1178 self.assertEqual(200, response['ResponseMetadata']['HTTPStatusCode'])1179 cloudformation.delete_stack(StackName='myteststack')1180 def test_update_lambda_function(self):1181 bucket_name = 'bucket-{}'.format(short_uid())1182 key_name = 'lambda-package'1183 role_name = 'role-{}'.format(short_uid())1184 function_name = 'func-{}'.format(short_uid())1185 package_path = os.path.join(THIS_FOLDER, 'lambdas', 'lambda_echo.js')1186 stack_name = 'stack-{}'.format(short_uid())1187 cloudformation = aws_stack.connect_to_service('cloudformation')1188 template = json.loads(load_file(TEST_UPDATE_LAMBDA_FUNCTION_TEMPLATE))1189 template['Resources']['PullMarketsRole']['Properties']['RoleName'] = role_name1190 props = template['Resources']['SomeNameFunction']['Properties']1191 props['Code']['S3Bucket'] = bucket_name1192 props['Code']['S3Key'] = key_name1193 props['FunctionName'] = function_name1194 s3 = aws_stack.connect_to_service('s3')1195 s3.create_bucket(Bucket=bucket_name, ACL='public-read')1196 s3.put_object(Bucket=bucket_name, Key=key_name, Body=create_zip_file(package_path, True))1197 time.sleep(1)1198 rs = cloudformation.create_stack(1199 StackName=stack_name,1200 TemplateBody=json.dumps(template),1201 )1202 self.assertEqual(200, rs['ResponseMetadata']['HTTPStatusCode'])1203 props.update({1204 'Environment': {1205 'Variables': {1206 'AWS_NODEJS_CONNECTION_REUSE_ENABLED': 11207 }1208 }1209 })1210 rs = cloudformation.update_stack(1211 StackName=stack_name,1212 TemplateBody=json.dumps(template),1213 )1214 self.assertEqual(200, rs['ResponseMetadata']['HTTPStatusCode'])1215 lambda_client = aws_stack.connect_to_service('lambda')1216 rs = lambda_client.get_function(1217 FunctionName=function_name1218 )1219 self.assertEqual(rs['Configuration']['FunctionName'], function_name)1220 
self.assertIn('AWS_NODEJS_CONNECTION_REUSE_ENABLED', rs['Configuration']['Environment']['Variables'])1221 # clean up1222 cloudformation.delete_stack(StackName=stack_name)1223 def test_cfn_deploy_apigateway_integration(self):1224 stack_name = 'stack-%s' % short_uid()1225 bucket_name = 'hofund-local-deployment'1226 key_name = 'serverless/hofund/local/1599143878432/authorizer.zip'1227 package_path = os.path.join(THIS_FOLDER, 'lambdas', 'lambda_echo.js')1228 template = template_deployer.template_to_json(load_file(APIGW_INTEGRATION_TEMPLATE))1229 s3 = aws_stack.connect_to_service('s3')1230 s3.create_bucket(Bucket=bucket_name, ACL='public-read')1231 s3.put_object(Bucket=bucket_name, Key=key_name, Body=create_zip_file(package_path, True))1232 time.sleep(1)1233 cloudformation = aws_stack.connect_to_service('cloudformation')1234 apigw_client = aws_stack.connect_to_service('apigateway')1235 rs = cloudformation.create_stack(StackName=stack_name, TemplateBody=template)1236 self.assertEqual(rs['ResponseMetadata']['HTTPStatusCode'], 200)1237 stack_resources = cloudformation.list_stack_resources(StackName=stack_name)['StackResourceSummaries']1238 rest_apis = [res for res in stack_resources if res['ResourceType'] == 'AWS::ApiGateway::RestApi']1239 rs = apigw_client.get_rest_api(restApiId=rest_apis[0]['PhysicalResourceId'])1240 self.assertEqual(rs['name'], 'ApiGatewayRestApi')1241 # clean up1242 cloudformation.delete_stack(StackName=stack_name)1243 def test_globalindex_read_write_provisioned_throughput_dynamodb_table(self):1244 cf_client = aws_stack.connect_to_service('cloudformation')1245 ddb_client = aws_stack.connect_to_service('dynamodb')1246 stack_name = 'test_dynamodb'1247 response = cf_client.create_stack(1248 StackName=stack_name,1249 TemplateBody=load_file(TEST_DEPLOY_BODY_3),1250 Parameters=[1251 {1252 'ParameterKey': 'tableName',1253 'ParameterValue': 'dynamodb'1254 },1255 {1256 'ParameterKey': 'env',1257 'ParameterValue': 'test'1258 }1259 ]1260 )1261 
self.assertEqual(response['ResponseMetadata']['HTTPStatusCode'], 200)1262 response = ddb_client.describe_table(1263 TableName='dynamodb-test'1264 )1265 if response['Table']['ProvisionedThroughput']:1266 throughput = response['Table']['ProvisionedThroughput']1267 self.assertTrue(isinstance(throughput['ReadCapacityUnits'], int))1268 self.assertTrue(isinstance(throughput['WriteCapacityUnits'], int))1269 for global_index in response['Table']['GlobalSecondaryIndexes']:1270 index_provisioned = global_index['ProvisionedThroughput']1271 test_read_capacity = index_provisioned['ReadCapacityUnits']1272 test_write_capacity = index_provisioned['WriteCapacityUnits']1273 self.assertTrue(isinstance(test_read_capacity, int))1274 self.assertTrue(isinstance(test_write_capacity, int))1275 cf_client.delete_stack(StackName=stack_name)1276 def test_delete_stack_across_regions(self):1277 domain_name = 'es-%s' % short_uid()1278 stack_name = 'stack-%s' % short_uid()1279 cloudformation = aws_stack.connect_to_service('cloudformation', region_name='eu-central-1')1280 cloudformation.create_stack(1281 StackName=stack_name,1282 TemplateBody=TEST_TEMPLATE_3,1283 Parameters=[{'ParameterKey': 'DomainName', 'ParameterValue': domain_name}]1284 )1285 resp = cloudformation.delete_stack(StackName=stack_name)1286 self.assertEqual(resp['ResponseMetadata']['HTTPStatusCode'], 200)1287 def test_update_stack_with_same_template(self):1288 stack_name = 'stack-%s' % short_uid()1289 template_data = load_file(SQS_TEMPLATE)1290 cloudformation = aws_stack.connect_to_service('cloudformation')1291 params = {1292 'StackName': stack_name,1293 'TemplateBody': template_data1294 }1295 cloudformation.create_stack(**params)1296 with self.assertRaises(Exception) as ctx:1297 cloudformation.update_stack(**params)1298 waiter = cloudformation.get_waiter('stack_update_complete')1299 waiter.wait(StackName=stack_name)1300 error_message = str(ctx.exception)1301 self.assertIn('UpdateStack', error_message)1302 self.assertIn('No updates 
are to be performed.', error_message)1303 # clean up1304 cloudformation.delete_stack(StackName=stack_name)1305 def test_cdk_template(self):1306 stack_name = 'stack-%s' % short_uid()1307 bucket = 'bucket-%s' % short_uid()1308 key = 'key-%s' % short_uid()1309 path = os.path.join(THIS_FOLDER, 'templates', 'asset')1310 s3_client = aws_stack.connect_to_service('s3')1311 s3_client.create_bucket(Bucket=bucket)1312 s3_client.put_object(Bucket=bucket, Key=key, Body=create_zip_file(path, True))1313 template = load_file(os.path.join(THIS_FOLDER, 'templates', 'cdktemplate.json'))1314 cloudformation = aws_stack.connect_to_service('cloudformation')1315 cloudformation.create_stack(1316 StackName=stack_name,1317 TemplateBody=template,1318 Parameters=[1319 {1320 'ParameterKey': 'AssetParameters1S3BucketEE4ED9A8',1321 'ParameterValue': bucket1322 },1323 {1324 'ParameterKey': 'AssetParameters1S3VersionKeyE160C88A',1325 'ParameterValue': key1326 }1327 ]1328 )1329 time.sleep(3)1330 lambda_client = aws_stack.connect_to_service('lambda')1331 resp = lambda_client.list_functions()1332 functions = [func for func in resp['Functions'] if stack_name in func['FunctionName']]1333 self.assertEqual(len(functions), 2)1334 self.assertEqual(len([func for func in functions if func['Handler'] == 'index.createUserHandler']), 1)1335 self.assertEqual(len([func for func in functions if func['Handler'] == 'index.authenticateUserHandler']), 1)1336 # clean up1337 cloudformation.delete_stack(StackName=stack_name)1338 def test_cfn_template_with_short_form_fn_sub(self):1339 stack_name = 'stack-%s' % short_uid()1340 environment = 'env-%s' % short_uid()1341 cloudformation = aws_stack.connect_to_service('cloudformation')1342 cloudformation.create_stack(1343 StackName=stack_name,1344 TemplateBody=load_file(TEST_TEMPLATE_23),1345 Parameters=[1346 {1347 'ParameterKey': 'Environment',1348 'ParameterValue': environment1349 },1350 {1351 'ParameterKey': 'ApiKey',1352 'ParameterValue': '12345'1353 }1354 ]1355 )1356 
iam_client = aws_stack.connect_to_service('iam')1357 rs = iam_client.list_roles()1358 # 2 roles created successfully1359 roles = [role for role in rs['Roles']1360 if role['RoleName'] in ['cf-{}-Role'.format(stack_name),1361 'cf-{}-StateMachineExecutionRole'.format(stack_name)]]1362 self.assertEqual(len(roles), 2)1363 sfn_client = aws_stack.connect_to_service('stepfunctions')1364 state_machines_after = sfn_client.list_state_machines()['stateMachines']1365 state_machines = [sm for sm in state_machines_after if '{}-StateMachine-'.format(stack_name) in sm['name']]1366 self.assertEqual(len(state_machines), 1)1367 rs = sfn_client.describe_state_machine(stateMachineArn=state_machines[0]['stateMachineArn'])1368 definition = json.loads(rs['definition'].replace('\n', ''))1369 payload = definition['States']['time-series-update']['Parameters']['Payload']1370 self.assertEqual(payload, {'key': '12345'})1371 # clean up1372 cloudformation.delete_stack(StackName=stack_name)1373 def test_sub_in_lambda_function_name(self):1374 stack_name = 'stack-%s' % short_uid()1375 environment = 'env-%s' % short_uid()1376 bucket = 'bucket-%s' % short_uid()1377 key = 'key-%s' % short_uid()1378 package_path = os.path.join(THIS_FOLDER, 'lambdas', 'lambda_echo.js')1379 s3 = aws_stack.connect_to_service('s3')1380 s3.create_bucket(Bucket=bucket, ACL='public-read')1381 s3.put_object(Bucket=bucket, Key=key, Body=create_zip_file(package_path, True))1382 time.sleep(1)1383 template = load_file(TEST_TEMPLATE_24) % (bucket, key, bucket, key)1384 cloudformation = aws_stack.connect_to_service('cloudformation')1385 cloudformation.create_stack(1386 StackName=stack_name,1387 TemplateBody=template,1388 Parameters=[{1389 'ParameterKey': 'Environment',1390 'ParameterValue': environment1391 }]1392 )1393 await_stack_completion(stack_name)1394 lambda_client = aws_stack.connect_to_service('lambda')1395 functions = lambda_client.list_functions()['Functions']1396 # assert Lambda functions created with expected name and 
ARN1397 func_prefix = 'test-{}-connectionHandler'.format(environment)1398 functions = [func for func in functions if func['FunctionName'].startswith(func_prefix)]1399 self.assertEqual(len(functions), 2)1400 func1 = [f for f in functions if f['FunctionName'].endswith('connectionHandler1')][0]1401 func2 = [f for f in functions if f['FunctionName'].endswith('connectionHandler2')][0]1402 self.assertTrue(func1['FunctionArn'].endswith(func1['FunctionName']))1403 self.assertTrue(func2['FunctionArn'].endswith(func2['FunctionName']))1404 # assert buckets which reference Lambda names have been created1405 s3_client = aws_stack.connect_to_service('s3')1406 buckets = s3_client.list_buckets()['Buckets']1407 buckets = [b for b in buckets if b['Name'].startswith(func_prefix.lower())]1408 # assert buckets are created correctly1409 self.assertEqual(len(functions), 2)1410 tags1 = s3_client.get_bucket_tagging(Bucket=buckets[0]['Name'])1411 tags2 = s3_client.get_bucket_tagging(Bucket=buckets[1]['Name'])1412 # assert correct tags - they reference the function names and should equal the bucket names (lower case)1413 self.assertEqual(tags1['TagSet'][0]['Value'].lower(), buckets[0]['Name'])1414 self.assertEqual(tags2['TagSet'][0]['Value'].lower(), buckets[1]['Name'])1415 # clean up1416 cloudformation.delete_stack(StackName=stack_name)1417 def test_lambda_dependency(self):1418 cloudformation = aws_stack.connect_to_service('cloudformation')1419 lambda_client = aws_stack.connect_to_service('lambda')1420 stack_name = 'stack-%s' % short_uid()1421 template = load_file(TEST_TEMPLATE_25)1422 details = deploy_cf_stack(stack_name, template_body=template)1423 # assert Lambda function created properly1424 resp = lambda_client.list_functions()1425 func_name = 'test-forward-sns'1426 functions = [func for func in resp['Functions'] if func['FunctionName'] == func_name]1427 self.assertEqual(len(functions), 1)1428 # assert that stack outputs are returned properly1429 outputs = details.get('Outputs', 
[])1430 self.assertEqual(len(outputs), 1)1431 self.assertEqual(outputs[0]['ExportName'], 'FuncArnExportName123')1432 # clean up1433 cloudformation.delete_stack(StackName=stack_name)1434 def test_functions_in_output_export_name(self):1435 stack_name = 'stack-%s' % short_uid()1436 environment = 'env-%s' % short_uid()1437 template = load_file(os.path.join(THIS_FOLDER, 'templates', 'template26.yaml'))1438 cfn = aws_stack.connect_to_service('cloudformation')1439 cfn.create_stack(1440 StackName=stack_name,1441 TemplateBody=template,1442 Parameters=[1443 {1444 'ParameterKey': 'Environment',1445 'ParameterValue': environment1446 }1447 ]1448 )1449 await_stack_completion(stack_name)1450 resp = cfn.describe_stacks(StackName=stack_name)1451 stack_outputs = [stack['Outputs'] for stack in resp['Stacks'] if stack['StackName'] == stack_name]1452 self.assertEqual(len(stack_outputs), 1)1453 outputs = {o['OutputKey']: {'value': o['OutputValue'], 'export': o['ExportName']} for o in stack_outputs[0]}1454 self.assertIn('VpcId', outputs)1455 self.assertEqual(outputs['VpcId'].get('export'), '{}-vpc-id'.format(environment))1456 topic_arn = aws_stack.sns_topic_arn('{}-slack-sns-topic'.format(environment))1457 self.assertIn('TopicArn', outputs)1458 self.assertEqual(outputs['TopicArn'].get('export'), topic_arn)1459 # clean up...

Full Screen

Full Screen

test_enroll_club_to_tournament.py

Source:test_enroll_club_to_tournament.py Github

copy

Full Screen

...20 StackName=stack_name,21 TemplateBody=template22 )23 assert response['ResponseMetadata']['HTTPStatusCode'] == 20024 await_stack_completion(stack_name)25def await_until_get_coupon_by_club_id(club_id: str, retries: int = 3, sleep: int = 1) -> NoReturn:26 dynamodb_client = aws_stack.connect_to_service('dynamodb')27 for i in range(0, retries + 1):28 coupon = dynamodb_client.query(29 TableName='Coupons',30 Select='ALL_ATTRIBUTES',31 KeyConditionExpression='ClubId = :clubId',32 ExpressionAttributeValues={33 ':clubId': {'S': club_id}34 }35 )36 if coupon['Items'] and coupon['Items'][0]['ClubId']['S'] == club_id:37 return coupon38 time.sleep(sleep)...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful