Best Python code snippet using localstack_python
cloudformation_api.py
Source:cloudformation_api.py
...661 ) # stack should not exist yet (TODO: check proper message)662 state = CloudFormationRegion.get()663 empty_stack_template = dict(template)664 empty_stack_template["Resources"] = {}665 req_params_copy = clone_stack_params(req_params)666 stack = Stack(req_params_copy, empty_stack_template)667 state.stacks[stack.stack_id] = stack668 stack.set_stack_status("REVIEW_IN_PROGRESS")669 elif change_set_type == "IMPORT":670 raise NotImplementedError() # TODO: implement importing resources671 else:672 msg = f"1 validation error detected: Value '{change_set_type}' at 'changeSetType' failed to satisfy constraint: Member must satisfy enum value set: [IMPORT, UPDATE, CREATE]"673 return error_response(msg, code=400, code_string="ValidationError")674 change_set = StackChangeSet(req_params, template)675 # TODO: refactor the flow here676 deployer = template_deployer.TemplateDeployer(change_set)677 deployer.construct_changes(678 stack,679 change_set,680 change_set_id=change_set.change_set_id,681 append_to_changeset=True,682 ) # TODO: ignores return value (?)683 deployer.apply_parameter_changes(change_set, change_set) # TODO: bandaid to populate metadata684 stack.change_sets.append(change_set)685 change_set.metadata[686 "Status"687 ] = "CREATE_COMPLETE" # technically for some time this should first be CREATE_PENDING688 change_set.metadata[689 "ExecutionStatus"690 ] = "AVAILABLE" # technically for some time this should first be UNAVAILABLE691 return {"StackId": change_set.stack_id, "Id": change_set.change_set_id}692def execute_change_set(req_params):693 stack_name = req_params.get("StackName")694 cs_name = req_params.get("ChangeSetName")695 change_set = find_change_set(cs_name, stack_name=stack_name)696 if not change_set:697 return not_found_error(698 'Unable to find change set "%s" for stack "%s"' % (cs_name, stack_name)699 )700 LOG.debug(701 'Executing change set "%s" for stack "%s" with %s resources ...'702 % (cs_name, stack_name, len(change_set.template_resources))703 )704 
deployer = template_deployer.TemplateDeployer(change_set.stack)705 deployer.apply_change_set(change_set)706 change_set.stack.metadata["ChangeSetId"] = change_set.change_set_id707 return {}708def list_change_sets(req_params):709 stack_name = req_params.get("StackName")710 stack = find_stack(stack_name)711 if not stack:712 return not_found_error('Unable to find stack "%s"' % stack_name)713 result = [cs.metadata for cs in stack.change_sets]714 result = {"Summaries": result}715 return result716def list_stack_sets(req_params):717 state = CloudFormationRegion.get()718 result = [sset.metadata for sset in state.stack_sets.values()]719 result = {"Summaries": result}720 return result721def describe_change_set(req_params):722 stack_name = req_params.get("StackName")723 cs_name = req_params.get("ChangeSetName")724 change_set: Optional[StackChangeSet] = find_change_set(cs_name, stack_name=stack_name)725 if not change_set:726 return not_found_error(727 'Unable to find change set "%s" for stack "%s"' % (cs_name, stack_name)728 )729 return change_set.metadata730def describe_stack_set(req_params):731 state = CloudFormationRegion.get()732 set_name = req_params.get("StackSetName")733 result = [734 sset.metadata for sset in state.stack_sets.values() if sset.stack_set_name == set_name735 ]736 if not result:737 return not_found_error('Unable to find stack set "%s"' % set_name)738 result = {"StackSet": result[0]}739 return result740def describe_stack_set_operation(req_params):741 state = CloudFormationRegion.get()742 set_name = req_params.get("StackSetName")743 stack_set = [sset for sset in state.stack_sets.values() if sset.stack_set_name == set_name]744 if not stack_set:745 return not_found_error('Unable to find stack set "%s"' % set_name)746 stack_set = stack_set[0]747 op_id = req_params.get("OperationId")748 result = stack_set.operations.get(op_id)749 if not result:750 LOG.debug(751 'Unable to find operation ID "%s" for stack set "%s" in list: %s'752 % (op_id, set_name, 
list(stack_set.operations.keys()))753 )754 return not_found_error(755 'Unable to find operation ID "%s" for stack set "%s"' % (op_id, set_name)756 )757 result = {"StackSetOperation": result}758 return result759def list_exports(req_params):760 state = CloudFormationRegion.get()761 result = {"Exports": state.exports}762 return result763def list_imports(req_params):764 state = CloudFormationRegion.get()765 export_name = req_params.get("ExportName")766 importing_stack_names = []767 for stack in state.stacks.values():768 if export_name in stack.imports:769 importing_stack_names.append(stack.stack_name)770 result = {"Imports": importing_stack_names}771 return result772def validate_template(req_params):773 try:774 result = template_preparer.validate_template(req_params)775 result = "<tmp>%s</tmp>" % result776 result = xmltodict.parse(result)["tmp"]777 return result778 except Exception as err:779 return error_response("Template Validation Error: %s" % err)780def describe_stack_events(req_params):781 stack_name = req_params.get("StackName")782 state = CloudFormationRegion.get()783 events = []784 for stack_id, stack in state.stacks.items():785 if stack_name in [None, stack.stack_name, stack.stack_id]:786 events.extend(stack.events)787 return {"StackEvents": events}788def delete_change_set(req_params):789 stack_name = req_params.get("StackName")790 cs_name = req_params.get("ChangeSetName")791 change_set = find_change_set(cs_name, stack_name=stack_name)792 if not change_set:793 return not_found_error(794 'Unable to find change set "%s" for stack "%s"' % (cs_name, stack_name)795 )796 change_set.stack.change_sets = [797 cs for cs in change_set.stack.change_sets if cs.change_set_name != cs_name798 ]799 return {}800def get_template(req_params):801 stack_name = req_params.get("StackName")802 cs_name = req_params.get("ChangeSetName")803 stack = find_stack(stack_name)804 if cs_name:805 stack = find_change_set(stack_name, cs_name)806 if not stack:807 return 
stack_not_found_error(stack_name)808 result = {"TemplateBody": json.dumps(stack._template_raw)}809 return result810def get_template_summary(req_params):811 stack_name = req_params.get("StackName")812 stack = None813 if stack_name:814 stack = find_stack(stack_name)815 if not stack:816 return stack_not_found_error(stack_name)817 else:818 template_deployer.prepare_template_body(req_params)819 template = template_preparer.parse_template(req_params["TemplateBody"])820 req_params["StackName"] = "tmp-stack"821 stack = Stack(req_params, template)822 result = stack.describe_details()823 id_summaries = {}824 for resource_id, resource in stack.template_resources.items():825 res_type = resource["Type"]826 id_summaries[res_type] = id_summaries.get(res_type) or []827 id_summaries[res_type].append(resource_id)828 result["ResourceTypes"] = list(id_summaries.keys())829 result["ResourceIdentifierSummaries"] = [830 {"ResourceType": key, "LogicalResourceIds": values} for key, values in id_summaries.items()831 ]832 return result833# -----------------834# MAIN ENTRY POINT835# -----------------836def serve(port: int, quiet: bool = True):837 from localstack.services import generic_proxy # moved here to fix circular import errors838 return generic_proxy.serve_flask_app(app=app, port=port)839@app.route("/", methods=["POST"])840def handle_request():841 data = request.get_data()842 req_params = parse_request_data(request.method, request.path, data)843 action = req_params.get("Action", "")844 func = ENDPOINTS.get(action)845 if not func:846 return "", 404847 result = func(req_params)848 result = _response(action, result)849 return result850ENDPOINTS = {851 "CreateChangeSet": create_change_set,852 "CreateStack": create_stack,853 "CreateStackInstances": create_stack_instances,854 "CreateStackSet": create_stack_set,855 "DeleteChangeSet": delete_change_set,856 "DeleteStack": delete_stack,857 "DeleteStackSet": delete_stack_set,858 "DescribeChangeSet": describe_change_set,859 "DescribeStackEvents": 
describe_stack_events,860 "DescribeStackResource": describe_stack_resource,861 "DescribeStackResources": describe_stack_resources,862 "DescribeStacks": describe_stacks,863 "DescribeStackSet": describe_stack_set,864 "DescribeStackSetOperation": describe_stack_set_operation,865 "ExecuteChangeSet": execute_change_set,866 "GetTemplate": get_template,867 "GetTemplateSummary": get_template_summary,868 "ListChangeSets": list_change_sets,869 "ListExports": list_exports,870 "ListImports": list_imports,871 "ListStackInstances": list_stack_instances,872 "ListStacks": list_stacks,873 "ListStackResources": list_stack_resources,874 "ListStackSets": list_stack_sets,875 "UpdateStack": update_stack,876 "UpdateStackSet": update_stack_set,877 "ValidateTemplate": validate_template,878}879# ---------------880# UTIL FUNCTIONS881# ---------------882@overload883def error_response(msg: str, code: int, code_string: str, xmlns: str = XMLNS_CF):884 ...885def error_response(*args, **kwargs):886 kwargs["xmlns"] = kwargs.get("xmlns") or XMLNS_CF887 return flask_error_response_xml(*args, **kwargs)888def not_found_error(message: str, error_type: str = None):889 error_type = error_type or "ResourceNotFoundException"890 return error_response(message, code=404, code_string=error_type)891def stack_not_found_error(stack_name: str, error_type: str = None):892 error_type = error_type or "ValidationError"893 return not_found_error("Stack with id %s does not exist" % stack_name, error_type=error_type)894def clone_stack_params(stack_params):895 try:896 return clone(stack_params)897 except Exception as e:898 LOG.info("Unable to clone stack parameters: %s" % e)899 return stack_params900def find_stack(stack_name: str) -> Optional[Stack]:901 state = CloudFormationRegion.get()902 return (903 [s for s in state.stacks.values() if stack_name in [s.stack_name, s.stack_id]] or [None]904 )[0]905def find_change_set(cs_name: str, stack_name: Optional[str] = None) -> Optional[StackChangeSet]:906 state = 
CloudFormationRegion.get()907 stack = find_stack(stack_name)908 stacks = [stack] if stack else state.stacks.values()...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!