Best Python code snippet using localstack_python
cloudformation_api.py
Source:cloudformation_api.py
...577 stack_name = req_params.get("StackName")578 resource_id = req_params.get("LogicalResourceId")579 stack = find_stack(stack_name)580 if not stack:581 return stack_not_found_error(stack_name)582 details = stack.resource_status(resource_id)583 result = {"StackResourceDetail": details}584 return result585def describe_stack_resources(req_params):586 stack_name = req_params.get("StackName")587 resource_id = req_params.get("LogicalResourceId")588 phys_resource_id = req_params.get("PhysicalResourceId")589 if phys_resource_id and stack_name:590 return error_response("Cannot specify both StackName and PhysicalResourceId", code=400)591 # TODO: filter stack by PhysicalResourceId!592 stack = find_stack(stack_name)593 if not stack:594 return stack_not_found_error(stack_name)595 statuses = [596 res_status597 for res_id, res_status in stack.resource_states.items()598 if resource_id in [res_id, None]599 ]600 return {"StackResources": statuses}601def list_stack_resources(req_params):602 result = describe_stack_resources(req_params)603 if not isinstance(result, dict):604 return result605 result = {"StackResourceSummaries": result.pop("StackResources")}606 return result607def list_stack_instances(req_params):608 state = CloudFormationRegion.get()609 set_name = req_params.get("StackSetName")610 stack_set = [sset for sset in state.stack_sets.values() if sset.stack_set_name == set_name]611 if not stack_set:612 return not_found_error('Stack set named "%s" does not exist' % set_name)613 stack_set = stack_set[0]614 result = [inst.metadata for inst in stack_set.stack_instances]615 result = {"Summaries": result}616 return result617ChangeSetTypes = Literal["CREATE", "UPDATE", "IMPORT"]618def create_change_set(req_params: Dict[str, Any]):619 change_set_type: ChangeSetTypes = req_params.get("ChangeSetType", "UPDATE")620 stack_name: Optional[str] = req_params.get("StackName")621 change_set_name: Optional[str] = req_params.get("ChangeSetName")622 template_body: Optional[str] = req_params.get("TemplateBody")623 # s3 or secretsmanager url624 template_url: Optional[str] = req_params.get("TemplateUrl") or req_params.get("TemplateURL")625 if is_none_or_empty(change_set_name):626 return error_response(627 "ChangeSetName required", 400, "ValidationError"628 ) # TODO: check proper message629 if is_none_or_empty(stack_name):630 return error_response(631 "StackName required", 400, "ValidationError"632 ) # TODO: check proper message633 stack: Optional[Stack] = find_stack(stack_name)634 # validate and resolve template635 if template_body and template_url:636 return error_response(637 "Specify exactly one of 'TemplateBody' or 'TemplateUrl'", 400, "ValidationError"638 ) # TODO: check proper message639 if not template_body and not template_url:640 return error_response(641 "Specify exactly one of 'TemplateBody' or 'TemplateUrl'", 400, "ValidationError"642 ) # TODO: check proper message643 prepare_template_body(req_params) # TODO: function has too many unclear responsibilities644 template = template_preparer.parse_template(req_params["TemplateBody"])645 del req_params["TemplateBody"] # TODO: stop mutating req_params646 template["StackName"] = stack_name647 template[648 "ChangeSetName"649 ] = change_set_name # TODO: validate with AWS what this is actually doing?650 if change_set_type == "UPDATE":651 # add changeset to existing stack652 if stack is None:653 return error_response(654 f"Stack '{stack_name}' does not exist.", 400, "ValidationError"655 ) # stack should exist already656 elif change_set_type == "CREATE":657 # create new (empty) stack658 if stack is not None:659 return error_response(660 f"Stack {stack_name} already exists", 400, "ValidationError"661 ) # stack should not exist yet (TODO: check proper message)662 state = CloudFormationRegion.get()663 empty_stack_template = dict(template)664 empty_stack_template["Resources"] = {}665 req_params_copy = clone_stack_params(req_params)666 stack = Stack(req_params_copy, empty_stack_template)667 state.stacks[stack.stack_id] = stack668 stack.set_stack_status("REVIEW_IN_PROGRESS")669 elif change_set_type == "IMPORT":670 raise NotImplementedError() # TODO: implement importing resources671 else:672 msg = f"1 validation error detected: Value '{change_set_type}' at 'changeSetType' failed to satisfy constraint: Member must satisfy enum value set: [IMPORT, UPDATE, CREATE]"673 return error_response(msg, code=400, code_string="ValidationError")674 change_set = StackChangeSet(req_params, template)675 # TODO: refactor the flow here676 deployer = template_deployer.TemplateDeployer(change_set)677 deployer.construct_changes(678 stack,679 change_set,680 change_set_id=change_set.change_set_id,681 append_to_changeset=True,682 ) # TODO: ignores return value (?)683 deployer.apply_parameter_changes(change_set, change_set) # TODO: bandaid to populate metadata684 stack.change_sets.append(change_set)685 change_set.metadata[686 "Status"687 ] = "CREATE_COMPLETE" # technically for some time this should first be CREATE_PENDING688 change_set.metadata[689 "ExecutionStatus"690 ] = "AVAILABLE" # technically for some time this should first be UNAVAILABLE691 return {"StackId": change_set.stack_id, "Id": change_set.change_set_id}692def execute_change_set(req_params):693 stack_name = req_params.get("StackName")694 cs_name = req_params.get("ChangeSetName")695 change_set = find_change_set(cs_name, stack_name=stack_name)696 if not change_set:697 return not_found_error(698 'Unable to find change set "%s" for stack "%s"' % (cs_name, stack_name)699 )700 LOG.debug(701 'Executing change set "%s" for stack "%s" with %s resources ...'702 % (cs_name, stack_name, len(change_set.template_resources))703 )704 deployer = template_deployer.TemplateDeployer(change_set.stack)705 deployer.apply_change_set(change_set)706 change_set.stack.metadata["ChangeSetId"] = change_set.change_set_id707 return {}708def list_change_sets(req_params):709 stack_name = req_params.get("StackName")710 stack = find_stack(stack_name)711 if not stack:712 return not_found_error('Unable to find stack "%s"' % stack_name)713 result = [cs.metadata for cs in stack.change_sets]714 result = {"Summaries": result}715 return result716def list_stack_sets(req_params):717 state = CloudFormationRegion.get()718 result = [sset.metadata for sset in state.stack_sets.values()]719 result = {"Summaries": result}720 return result721def describe_change_set(req_params):722 stack_name = req_params.get("StackName")723 cs_name = req_params.get("ChangeSetName")724 change_set: Optional[StackChangeSet] = find_change_set(cs_name, stack_name=stack_name)725 if not change_set:726 return not_found_error(727 'Unable to find change set "%s" for stack "%s"' % (cs_name, stack_name)728 )729 return change_set.metadata730def describe_stack_set(req_params):731 state = CloudFormationRegion.get()732 set_name = req_params.get("StackSetName")733 result = [734 sset.metadata for sset in state.stack_sets.values() if sset.stack_set_name == set_name735 ]736 if not result:737 return not_found_error('Unable to find stack set "%s"' % set_name)738 result = {"StackSet": result[0]}739 return result740def describe_stack_set_operation(req_params):741 state = CloudFormationRegion.get()742 set_name = req_params.get("StackSetName")743 stack_set = [sset for sset in state.stack_sets.values() if sset.stack_set_name == set_name]744 if not stack_set:745 return not_found_error('Unable to find stack set "%s"' % set_name)746 stack_set = stack_set[0]747 op_id = req_params.get("OperationId")748 result = stack_set.operations.get(op_id)749 if not result:750 LOG.debug(751 'Unable to find operation ID "%s" for stack set "%s" in list: %s'752 % (op_id, set_name, list(stack_set.operations.keys()))753 )754 return not_found_error(755 'Unable to find operation ID "%s" for stack set "%s"' % (op_id, set_name)756 )757 result = {"StackSetOperation": result}758 return result759def list_exports(req_params):760 state = CloudFormationRegion.get()761 result = {"Exports": state.exports}762 return result763def list_imports(req_params):764 state = CloudFormationRegion.get()765 export_name = req_params.get("ExportName")766 importing_stack_names = []767 for stack in state.stacks.values():768 if export_name in stack.imports:769 importing_stack_names.append(stack.stack_name)770 result = {"Imports": importing_stack_names}771 return result772def validate_template(req_params):773 try:774 result = template_preparer.validate_template(req_params)775 result = "<tmp>%s</tmp>" % result776 result = xmltodict.parse(result)["tmp"]777 return result778 except Exception as err:779 return error_response("Template Validation Error: %s" % err)780def describe_stack_events(req_params):781 stack_name = req_params.get("StackName")782 state = CloudFormationRegion.get()783 events = []784 for stack_id, stack in state.stacks.items():785 if stack_name in [None, stack.stack_name, stack.stack_id]:786 events.extend(stack.events)787 return {"StackEvents": events}788def delete_change_set(req_params):789 stack_name = req_params.get("StackName")790 cs_name = req_params.get("ChangeSetName")791 change_set = find_change_set(cs_name, stack_name=stack_name)792 if not change_set:793 return not_found_error(794 'Unable to find change set "%s" for stack "%s"' % (cs_name, stack_name)795 )796 change_set.stack.change_sets = [797 cs for cs in change_set.stack.change_sets if cs.change_set_name != cs_name798 ]799 return {}800def get_template(req_params):801 stack_name = req_params.get("StackName")802 cs_name = req_params.get("ChangeSetName")803 stack = find_stack(stack_name)804 if cs_name:805 stack = find_change_set(stack_name, cs_name)806 if not stack:807 return stack_not_found_error(stack_name)808 result = {"TemplateBody": json.dumps(stack._template_raw)}809 return result810def get_template_summary(req_params):811 stack_name = req_params.get("StackName")812 stack = None813 if stack_name:814 stack = find_stack(stack_name)815 if not stack:816 return stack_not_found_error(stack_name)817 else:818 template_deployer.prepare_template_body(req_params)819 template = template_preparer.parse_template(req_params["TemplateBody"])820 req_params["StackName"] = "tmp-stack"821 stack = Stack(req_params, template)822 result = stack.describe_details()823 id_summaries = {}824 for resource_id, resource in stack.template_resources.items():825 res_type = resource["Type"]826 id_summaries[res_type] = id_summaries.get(res_type) or []827 id_summaries[res_type].append(resource_id)828 result["ResourceTypes"] = list(id_summaries.keys())829 result["ResourceIdentifierSummaries"] = [830 {"ResourceType": key, "LogicalResourceIds": values} for key, values in id_summaries.items()831 ]832 return result833# -----------------834# MAIN ENTRY POINT835# -----------------836def serve(port: int, quiet: bool = True):837 from localstack.services import generic_proxy # moved here to fix circular import errors838 return generic_proxy.serve_flask_app(app=app, port=port)839@app.route("/", methods=["POST"])840def handle_request():841 data = request.get_data()842 req_params = parse_request_data(request.method, request.path, data)843 action = req_params.get("Action", "")844 func = ENDPOINTS.get(action)845 if not func:846 return "", 404847 result = func(req_params)848 result = _response(action, result)849 return result850ENDPOINTS = {851 "CreateChangeSet": create_change_set,852 "CreateStack": create_stack,853 "CreateStackInstances": create_stack_instances,854 "CreateStackSet": create_stack_set,855 "DeleteChangeSet": delete_change_set,856 "DeleteStack": delete_stack,857 "DeleteStackSet": delete_stack_set,858 "DescribeChangeSet": describe_change_set,859 "DescribeStackEvents": describe_stack_events,860 "DescribeStackResource": describe_stack_resource,861 "DescribeStackResources": describe_stack_resources,862 "DescribeStacks": describe_stacks,863 "DescribeStackSet": describe_stack_set,864 "DescribeStackSetOperation": describe_stack_set_operation,865 "ExecuteChangeSet": execute_change_set,866 "GetTemplate": get_template,867 "GetTemplateSummary": get_template_summary,868 "ListChangeSets": list_change_sets,869 "ListExports": list_exports,870 "ListImports": list_imports,871 "ListStackInstances": list_stack_instances,872 "ListStacks": list_stacks,873 "ListStackResources": list_stack_resources,874 "ListStackSets": list_stack_sets,875 "UpdateStack": update_stack,876 "UpdateStackSet": update_stack_set,877 "ValidateTemplate": validate_template,878}879# ---------------880# UTIL FUNCTIONS881# ---------------882@overload883def error_response(msg: str, code: int, code_string: str, xmlns: str = XMLNS_CF):884 ...885def error_response(*args, **kwargs):886 kwargs["xmlns"] = kwargs.get("xmlns") or XMLNS_CF887 return flask_error_response_xml(*args, **kwargs)888def not_found_error(message: str, error_type: str = None):889 error_type = error_type or "ResourceNotFoundException"890 return error_response(message, code=404, code_string=error_type)891def stack_not_found_error(stack_name: str, error_type: str = None):892 error_type = error_type or "ValidationError"893 return not_found_error("Stack with id %s does not exist" % stack_name, error_type=error_type)894def clone_stack_params(stack_params):895 try:896 return clone(stack_params)897 except Exception as e:898 LOG.info("Unable to clone stack parameters: %s" % e)899 return stack_params900def find_stack(stack_name: str) -> Optional[Stack]:901 state = CloudFormationRegion.get()902 return (903 [s for s in state.stacks.values() if stack_name in [s.stack_name, s.stack_id]] or [None]904 )[0]905def find_change_set(cs_name: str, stack_name: Optional[str] = None) -> Optional[StackChangeSet]:...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!