Best Python code snippet using localstack_python
cloudformation_starter.py
Source:cloudformation_starter.py
...685 return S3_Bucket_get_cfn_attribute_orig(self, attribute_name)686 S3_Bucket_get_cfn_attribute_orig = s3_models.FakeBucket.get_cfn_attribute687 s3_models.FakeBucket.get_cfn_attribute = S3_Bucket_get_cfn_attribute688 @classmethod689 def Bucket_update_from_cloudformation_json(cls,690 original_resource, new_resource_name, cloudformation_json, region_name):691 if cloudformation_json.get('BucketName') in [None, original_resource.name]:692 # TODO: apply other resource updates693 cloudformation_json['BucketName'] = original_resource.name694 return original_resource695 return Bucket_update_from_cf_json_orig(original_resource, new_resource_name, cloudformation_json, region_name)696 Bucket_update_from_cf_json_orig = s3_models.FakeBucket.update_from_cloudformation_json697 s3_models.FakeBucket.update_from_cloudformation_json = Bucket_update_from_cloudformation_json698 # Patch SQS physical_resource_id(..) method in moto699 @property700 def SQS_Queue_physical_resource_id(self):701 result = SQS_Queue_physical_resource_id_orig.fget(self)702 if '://' not in result:703 # convert ID to queue URL704 self._physical_resource_id = (getattr(self, '_physical_resource_id', None) or705 aws_stack.get_sqs_queue_url(result))706 return self._physical_resource_id707 return result708 SQS_Queue_physical_resource_id_orig = sqs_models.Queue.physical_resource_id709 sqs_models.Queue.physical_resource_id = SQS_Queue_physical_resource_id710 # Patch LogGroup get_cfn_attribute(..) method in moto711 def LogGroup_get_cfn_attribute(self, attribute_name):712 try:713 return LogGroup_get_cfn_attribute_orig(self, attribute_name)714 except Exception:715 if attribute_name == 'Arn':716 return aws_stack.log_group_arn(self.name)717 raise718 LogGroup_get_cfn_attribute_orig = getattr(cw_models.LogGroup, 'get_cfn_attribute', None)719 cw_models.LogGroup.get_cfn_attribute = LogGroup_get_cfn_attribute720 # Patch Lambda get_cfn_attribute(..) method in moto721 def Lambda_Function_get_cfn_attribute(self, attribute_name):722 try:723 if attribute_name == 'Arn':724 return self.function_arn725 return Lambda_Function_get_cfn_attribute_orig(self, attribute_name)726 except Exception:727 if attribute_name in ('Name', 'FunctionName'):728 return self.function_name729 raise730 Lambda_Function_get_cfn_attribute_orig = lambda_models.LambdaFunction.get_cfn_attribute731 lambda_models.LambdaFunction.get_cfn_attribute = Lambda_Function_get_cfn_attribute732 # Patch DynamoDB get_cfn_attribute(..) method in moto733 def DynamoDB_Table_get_cfn_attribute(self, attribute_name):734 try:735 if attribute_name == 'StreamArn':736 streams = aws_stack.connect_to_service('dynamodbstreams').list_streams(TableName=self.name)['Streams']737 return streams[0]['StreamArn'] if streams else None738 return DynamoDB_Table_get_cfn_attribute_orig(self, attribute_name)739 except Exception as e:740 LOG.warning('Unable to get attribute "%s" from resource %s: %s' % (attribute_name, type(self), e))741 raise742 DynamoDB_Table_get_cfn_attribute_orig = dynamodb_models.Table.get_cfn_attribute743 dynamodb_models.Table.get_cfn_attribute = DynamoDB_Table_get_cfn_attribute744 # Patch IAM get_cfn_attribute(..) method in moto745 def IAM_Role_get_cfn_attribute(self, attribute_name):746 try:747 return IAM_Role_get_cfn_attribute_orig(self, attribute_name)748 except Exception:749 if attribute_name == 'Arn':750 return aws_stack.role_arn(self.name)751 raise752 IAM_Role_get_cfn_attribute_orig = iam_models.Role.get_cfn_attribute753 iam_models.Role.get_cfn_attribute = IAM_Role_get_cfn_attribute754 # Patch IAM Role model755 # https://github.com/localstack/localstack/issues/925756 @property757 def IAM_Role_physical_resource_id(self):758 return self.name759 iam_models.Role.physical_resource_id = IAM_Role_physical_resource_id760 # Patch SNS Topic get_cfn_attribute(..) method in moto761 def SNS_Topic_get_cfn_attribute(self, attribute_name):762 result = SNS_Topic_get_cfn_attribute_orig(self, attribute_name)763 if attribute_name.lower() in ['arn', 'topicarn']:764 result = aws_stack.fix_account_id_in_arns(result)765 return result766 SNS_Topic_get_cfn_attribute_orig = sns_models.Topic.get_cfn_attribute767 sns_models.Topic.get_cfn_attribute = SNS_Topic_get_cfn_attribute768 # Patch create_from_cloudformation_json(..)769 # #2568 Cloudformation create-stack for SNS with yaml causes TypeError770 SNS_Topic_create_from_cloudformation_json_orig = sns_models.Topic.create_from_cloudformation_json771 def SNS_Topic_create_from_cloudformation_json(resource_name, cloudformation_json, region_name):772 properties = cloudformation_json['Properties']773 if properties.get('Subscription'):774 properties['Subscription'] = [subscription for subscription in properties['Subscription'] if subscription]775 result = SNS_Topic_create_from_cloudformation_json_orig(resource_name, cloudformation_json, region_name)776 # remove topic from backend, as it will be created by our mechanism with additional attributes (like tags)777 sns_models.sns_backends[region_name].topics.pop(result.arn)778 return result779 sns_models.Topic.create_from_cloudformation_json = SNS_Topic_create_from_cloudformation_json780 # Patch ES get_cfn_attribute(..) method781 def ES_get_cfn_attribute(self, attribute_name):782 if attribute_name in ['Arn', 'DomainArn']:783 return aws_stack.es_domain_arn(self.params.get('DomainName'))784 if attribute_name == 'DomainEndpoint':785 if not hasattr(self, '_domain_endpoint'):786 es_details = aws_stack.connect_to_service('es').describe_elasticsearch_domain(787 DomainName=self.params.get('DomainName'))788 self._domain_endpoint = es_details['DomainStatus']['Endpoint']789 return self._domain_endpoint790 raise UnformattedGetAttTemplateException()791 service_models.ElasticsearchDomain.get_cfn_attribute = ES_get_cfn_attribute792 # Patch Firehose get_cfn_attribute(..) method793 def Firehose_get_cfn_attribute(self, attribute_name):794 if attribute_name == 'Arn':795 return aws_stack.firehose_stream_arn(self.params.get('DeliveryStreamName'))796 raise UnformattedGetAttTemplateException()797 service_models.FirehoseDeliveryStream.get_cfn_attribute = Firehose_get_cfn_attribute798 # Patch LambdaFunction create_from_cloudformation_json(..) method in moto799 @classmethod800 def Lambda_create_from_cloudformation_json(cls, resource_name, cloudformation_json, region_name):801 resource_name = cloudformation_json.get('Properties', {}).get('FunctionName') or resource_name802 def _from_env(*args, **kwargs):803 result = Mock()804 result.api = Mock()805 result.api.get_adapter = (lambda *args, **kwargs: None)806 return result807 # Temporarily set a mock client, to prevent moto from talking to the Docker daemon808 import docker809 _from_env_orig = docker.from_env810 docker.from_env = _from_env811 try:812 result = Lambda_create_from_cloudformation_json_orig(resource_name, cloudformation_json, region_name)813 finally:814 docker.from_env = _from_env_orig815 return result816 Lambda_create_from_cloudformation_json_orig = lambda_models.LambdaFunction.create_from_cloudformation_json817 lambda_models.LambdaFunction.create_from_cloudformation_json = Lambda_create_from_cloudformation_json818 # Patch EventSourceMapping create_from_cloudformation_json(..) method in moto819 @classmethod820 def Mapping_create_from_cloudformation_json(cls, resource_name, cloudformation_json, region_name):821 props = cloudformation_json.get('Properties', {})822 func_name = props.get('FunctionName') or ''823 if ':lambda:' in func_name:824 props['FunctionName'] = aws_stack.lambda_function_name(func_name)825 try:826 return Mapping_create_from_cloudformation_json_orig(resource_name, cloudformation_json, region_name)827 except Exception:828 LOG.info('Unable to add Lambda event mapping for source ARN "%s" in moto backend (ignoring)' %829 props.get('EventSourceArn'))830 # return an empty dummy instance, to avoid downstream None value issues831 return service_models.BaseModel()832 Mapping_create_from_cloudformation_json_orig = lambda_models.EventSourceMapping.create_from_cloudformation_json833 lambda_models.EventSourceMapping.create_from_cloudformation_json = Mapping_create_from_cloudformation_json834 # Patch LambdaFunction update_from_cloudformation_json(..) method in moto835 @classmethod836 def Lambda_update_from_cloudformation_json(cls,837 original_resource, new_resource_name, cloudformation_json, region_name):838 resource_name = cloudformation_json.get('Properties', {}).get('FunctionName') or new_resource_name839 return Lambda_create_from_cloudformation_json_orig(resource_name, cloudformation_json, region_name)840 if not hasattr(lambda_models.LambdaFunction, 'update_from_cloudformation_json'):841 lambda_models.LambdaFunction.update_from_cloudformation_json = Lambda_update_from_cloudformation_json842 # Patch Role update_from_cloudformation_json(..) method843 @classmethod844 def Role_update_from_cloudformation_json(cls,845 original_resource, new_resource_name, cloudformation_json, region_name):846 props = cloudformation_json.get('Properties', {})847 original_resource.name = props.get('RoleName') or original_resource.name848 original_resource.assume_role_policy_document = props.get('AssumeRolePolicyDocument')849 return original_resource850 if not hasattr(iam_models.Role, 'update_from_cloudformation_json'):851 iam_models.Role.update_from_cloudformation_json = Role_update_from_cloudformation_json852 # patch ApiGateway Deployment deletion853 @staticmethod854 def depl_delete_from_cloudformation_json(resource_name, resource_json, region_name):855 properties = resource_json['Properties']856 LOG.info('TODO: apigateway.Deployment.delete_from_cloudformation_json %s' % properties)857 if not hasattr(apigw_models.Deployment, 'delete_from_cloudformation_json'):858 apigw_models.Deployment.delete_from_cloudformation_json = depl_delete_from_cloudformation_json...
service_models.py
Source:service_models.py
...129 def resource_id(self):130 """Return the logical resource ID of this resource (i.e., the ref. name within the stack's resources)."""131 return self.resource_json["LogicalResourceId"]132 @classmethod133 def update_from_cloudformation_json(134 cls, original_resource, new_resource_name, cloudformation_json, region_name135 ):136 props = cloudformation_json.get("Properties", {})137 for key, val in props.items():138 snake_key = camel_to_snake_case(key)139 lower_key = key.lower()140 for candidate in [key, lower_key, snake_key]:141 if hasattr(original_resource, candidate) or candidate == snake_key:142 setattr(original_resource, candidate, val)143 break144 return original_resource145 @classmethod146 def create_from_cloudformation_json(cls, resource_name, resource_json, region_name):147 return cls(...
models.py
Source:models.py
...23 state_machine = StateMachine(arn, name, definition=definition, role_arn=role_arn)24 stepfunction_backends[region_name].state_machines.append(state_machine)25 return state_machine26 @classmethod27 def update_from_cloudformation_json(cls, original_resource, new_resource_name, cloudformation_json, region_name):28 props = cloudformation_json.get('Properties', {})29 definition = props.get('DefinitionString')30 role_arn = props.get('RoleArn')31 state_machine = stepfunction_backends[region_name].update_state_machine(32 original_resource.arn, definition=definition, role_arn=role_arn,33 )34 return state_machine35 def update(self, **kwargs):36 for key, value in kwargs.items():37 if value is not None:...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!