Best Python code snippet using localstack_python
template_deployer.py
Source:template_deployer.py
...827 "Error calling %s with params: %s for resource: %s" % (function, params, resource)828 )829 raise e830 return result831def get_action_name_for_resource_change(res_change):832 return {"Add": "CREATE", "Remove": "DELETE", "Modify": "UPDATE"}.get(res_change)833# TODO: this shouldn't be called for stack parameters834def determine_resource_physical_id(835 resource_id, resources=None, stack=None, attribute=None, stack_name=None836):837 resources = resources or stack.resources838 stack_name = stack_name or stack.stack_name839 resource = resources.get(resource_id, {})840 if not resource:841 return842 resource_type = get_resource_type(resource)843 resource_type = re.sub("^AWS::", "", resource_type)844 resource_props = resource.get("Properties", {})845 # determine result from resource class846 canonical_type = canonical_resource_type(resource_type)847 resource_class = RESOURCE_MODELS.get(canonical_type)848 if resource_class:849 resource_inst = resource_class(resource)850 resource_inst.fetch_state_if_missing(stack_name=stack_name, resources=resources)851 result = resource_inst.get_physical_resource_id(attribute=attribute)852 if result:853 return result854 # TODO: put logic into resource-specific model classes!855 if resource_type == "ApiGateway::RestApi":856 result = resource_props.get("id")857 if result:858 return result859 elif resource_type == "ApiGateway::Stage":860 return resource_props.get("StageName")861 elif resource_type == "AppSync::DataSource":862 return resource_props.get("DataSourceArn")863 elif resource_type == "KinesisFirehose::DeliveryStream":864 return aws_stack.firehose_stream_arn(resource_props.get("DeliveryStreamName"))865 elif resource_type == "StepFunctions::StateMachine":866 return aws_stack.state_machine_arn(867 resource_props.get("StateMachineName")868 ) # returns ARN in AWS869 elif resource_type == "S3::Bucket":870 if attribute == "Arn":871 return aws_stack.s3_bucket_arn(resource_props.get("BucketName"))872 return resource_props.get("BucketName") # Note: "Ref" returns bucket name in AWS873 elif resource_type == "IAM::Role":874 if attribute == "Arn":875 return aws_stack.role_arn(resource_props.get("RoleName"))876 return resource_props.get("RoleName")877 elif resource_type == "SecretsManager::Secret":878 arn = get_secret_arn(resource_props.get("Name")) or ""879 if attribute == "Arn":880 return arn881 return arn.split(":")[-1]882 elif resource_type == "IAM::Policy":883 if attribute == "Arn":884 return aws_stack.policy_arn(resource_props.get("PolicyName"))885 return resource_props.get("PolicyName")886 elif resource_type == "DynamoDB::Table":887 table_name = resource_props.get("TableName")888 if table_name:889 if attribute == "Ref":890 return table_name # Note: "Ref" returns table name in AWS891 return table_name892 elif resource_type == "Logs::LogGroup":893 return resource_props.get("LogGroupName")894 res_id = resource.get("PhysicalResourceId")895 if res_id and attribute in [None, "Ref", "PhysicalResourceId"]:896 return res_id897 result = extract_resource_attribute(898 resource_type,899 {},900 attribute or "PhysicalResourceId",901 stack_name=stack_name,902 resource_id=resource_id,903 resource=resource,904 resources=resources,905 )906 if result is not None:907 # note that value could be an empty string here (in case of Parameter values)908 return result909 LOG.info(910 'Unable to determine PhysicalResourceId for "%s" resource, ID "%s"'911 % (resource_type, resource_id)912 )913def update_resource_details(stack, resource_id, details, action=None):914 resource = stack.resources.get(resource_id, {})915 if not resource or not details:916 return917 # TODO: we need to rethink this method - this should be encapsulated in the resource model classes.918 # Also, instead of actively updating the PhysicalResourceId attributes below, they should be919 # determined and returned by the resource model classes upon request.920 resource_type = resource.get("Type") or ""921 resource_type = re.sub("^AWS::", "", resource_type)922 resource_props = resource.get("Properties", {})923 if resource_type == "ApiGateway::RestApi":924 resource_props["id"] = details["id"]925 if resource_type == "KMS::Key":926 resource["PhysicalResourceId"] = details["KeyMetadata"]["KeyId"]927 if resource_type == "EC2::Instance":928 if details and isinstance(details, list) and hasattr(details[0], "id"):929 resource["PhysicalResourceId"] = details[0].id930 if isinstance(details, dict) and details.get("InstanceId"):931 resource["PhysicalResourceId"] = details["InstanceId"]932 if resource_type == "EC2::SecurityGroup":933 resource["PhysicalResourceId"] = details["GroupId"]934 if resource_type == "IAM::InstanceProfile":935 resource["PhysicalResourceId"] = details["InstanceProfile"]["InstanceProfileName"]936 if resource_type == "StepFunctions::Activity":937 resource["PhysicalResourceId"] = details["activityArn"]938 if resource_type == "ApiGateway::Model":939 resource["PhysicalResourceId"] = details["id"]940 if resource_type == "EC2::VPC":941 resource["PhysicalResourceId"] = details["Vpc"]["VpcId"]942 if resource_type == "EC2::Subnet":943 resource["PhysicalResourceId"] = details["Subnet"]["SubnetId"]944 if resource_type == "EC2::RouteTable":945 resource["PhysicalResourceId"] = details["RouteTable"]["RouteTableId"]946 if resource_type == "EC2::Route":947 resource["PhysicalResourceId"] = generate_route_id(948 resource_props["RouteTableId"],949 resource_props.get("DestinationCidrBlock", ""),950 resource_props.get("DestinationIpv6CidrBlock"),951 )952 # TODO remove!953 if isinstance(details, MotoCloudFormationModel):954 # fallback: keep track of moto resource status955 stack.moto_resource_statuses[resource_id] = details956def add_default_resource_props(957 resource,958 stack_name,959 resource_name=None,960 resource_id=None,961 update=False,962 existing_resources=None,963):964 """Apply some fixes to resource props which otherwise cause deployments to fail"""965 res_type = resource["Type"]966 canonical_type = canonical_resource_type(res_type)967 resource_class = RESOURCE_MODELS.get(canonical_type)968 if resource_class is not None:969 resource_class.add_defaults(resource, stack_name)970# -----------------------971# MAIN TEMPLATE DEPLOYER972# -----------------------973class TemplateDeployer(object):974 def __init__(self, stack):975 self.stack = stack976 @property977 def resources(self):978 return self.stack.resources979 @property980 def stack_name(self):981 return self.stack.stack_name982 # ------------------983 # MAIN ENTRY POINTS984 # ------------------985 def deploy_stack(self):986 self.stack.set_stack_status("CREATE_IN_PROGRESS")987 try:988 self.apply_changes(989 self.stack,990 self.stack,991 stack_name=self.stack.stack_name,992 initialize=True,993 action="CREATE",994 )995 except Exception as e:996 LOG.info("Unable to create stack %s: %s" % (self.stack.stack_name, e))997 self.stack.set_stack_status("CREATE_FAILED")998 raise999 def apply_change_set(self, change_set):1000 action = "CREATE"1001 change_set.stack.set_stack_status("%s_IN_PROGRESS" % action)1002 try:1003 self.apply_changes(1004 change_set.stack,1005 change_set,1006 stack_name=change_set.stack_name,1007 action=action,1008 )1009 except Exception as e:1010 LOG.info(1011 "Unable to apply change set %s: %s" % (change_set.metadata.get("ChangeSetName"), e)1012 )1013 change_set.metadata["Status"] = "%s_FAILED" % action1014 self.stack.set_stack_status("%s_FAILED" % action)1015 raise1016 def update_stack(self, new_stack):1017 self.stack.set_stack_status("UPDATE_IN_PROGRESS")1018 # apply changes1019 self.apply_changes(self.stack, new_stack, stack_name=self.stack.stack_name, action="UPDATE")1020 def delete_stack(self):1021 if not self.stack:1022 return1023 self.stack.set_stack_status("DELETE_IN_PROGRESS")1024 stack_resources = list(self.stack.resources.values())1025 stack_name = self.stack.stack_name1026 resources = dict([(r["LogicalResourceId"], common.clone_safe(r)) for r in stack_resources])1027 for key, resource in resources.items():1028 resource["Properties"] = resource.get("Properties", common.clone_safe(resource))1029 resource["ResourceType"] = resource.get("ResourceType") or resource.get("Type")1030 for resource_id, resource in resources.items():1031 # TODO: cache condition value in resource details on deployment and use cached value here1032 if evaluate_resource_condition(resource, stack_name, resources):1033 delete_resource(resource_id, resources, stack_name)1034 self.stack.set_resource_status(resource_id, "DELETE_COMPLETE")1035 # update status1036 self.stack.set_stack_status("DELETE_COMPLETE")1037 # ----------------------------1038 # DEPENDENCY RESOLUTION UTILS1039 # ----------------------------1040 def is_deployable_resource(self, resource):1041 resource_type = get_resource_type(resource)1042 entry = get_deployment_config(resource_type)1043 if entry is None and resource_type not in ["Parameter", None]:1044 # fall back to moto resource creation (TODO: remove in the future)1045 long_res_type = canonical_resource_type(resource_type)1046 if long_res_type in parsing.MODEL_MAP:1047 return True1048 LOG.warning('Unable to deploy resource type "%s": %s' % (resource_type, resource))1049 return bool(entry and entry.get(ACTION_CREATE))1050 def is_deployed(self, resource):1051 resource_status = {}1052 resource_id = resource["LogicalResourceId"]1053 details = retrieve_resource_details(1054 resource_id, resource_status, self.resources, self.stack_name1055 )1056 return bool(details)1057 def is_updateable(self, resource):1058 """Return whether the given resource can be updated or not."""1059 if not self.is_deployable_resource(resource) or not self.is_deployed(resource):1060 return False1061 resource_type = get_resource_type(resource)1062 return resource_type in UPDATEABLE_RESOURCES1063 def all_resource_dependencies_satisfied(self, resource):1064 unsatisfied = self.get_unsatisfied_dependencies(resource)1065 return not unsatisfied1066 def get_unsatisfied_dependencies(self, resource):1067 res_deps = self.get_resource_dependencies(resource)1068 return self.get_unsatisfied_dependencies_for_resources(res_deps, resource)1069 def get_unsatisfied_dependencies_for_resources(1070 self, resources, depending_resource=None, return_first=True1071 ):1072 result = {}1073 for resource_id, resource in iteritems(resources):1074 if self.is_deployable_resource(resource):1075 if not self.is_deployed(resource):1076 LOG.debug(1077 "Dependency for resource %s not yet deployed: %s %s"1078 % (depending_resource, resource_id, resource)1079 )1080 result[resource_id] = resource1081 if return_first:1082 break1083 return result1084 def get_resource_dependencies(self, resource):1085 result = {}1086 # Note: using the original, unmodified template here to preserve Ref's ...1087 raw_resources = self.stack.template_original["Resources"]1088 raw_resource = raw_resources[resource["LogicalResourceId"]]1089 dumped = json.dumps(common.json_safe(raw_resource))1090 for other_id, other in raw_resources.items():1091 if resource != other:1092 # TODO: traverse dict instead of doing string search!1093 search1 = '{"Ref": "%s"}' % other_id1094 search2 = '{"Fn::GetAtt": ["%s", ' % other_id1095 if search1 in dumped or search2 in dumped:1096 result[other_id] = other1097 if other_id in resource.get("DependsOn", []):1098 result[other_id] = other1099 return result1100 # -----------------1101 # DEPLOYMENT UTILS1102 # -----------------1103 def add_default_resource_props(self, resources=None):1104 resources = resources or self.resources1105 for resource_id, resource in resources.items():1106 add_default_resource_props(1107 resource, self.stack_name, resource_id=resource_id, existing_resources=resources1108 )1109 def init_resource_status(self, resources=None, stack=None, action="CREATE"):1110 resources = resources or self.resources1111 stack = stack or self.stack1112 for resource_id, resource in resources.items():1113 stack.set_resource_status(resource_id, "%s_IN_PROGRESS" % action)1114 def update_resource_details(self, resource_id, result, stack=None, action="CREATE"):1115 stack = stack or self.stack1116 # update resource state1117 update_resource_details(stack, resource_id, result, action)1118 # update physical resource id1119 resource = stack.resources[resource_id]1120 physical_id = resource.get("PhysicalResourceId")1121 physical_id = physical_id or determine_resource_physical_id(resource_id, stack=stack)1122 if not resource.get("PhysicalResourceId") or action == "UPDATE":1123 if physical_id:1124 resource["PhysicalResourceId"] = physical_id1125 # set resource status1126 stack.set_resource_status(resource_id, "%s_COMPLETE" % action, physical_res_id=physical_id)1127 return physical_id1128 def get_change_config(self, action, resource, change_set_id=None):1129 return {1130 "Type": "Resource",1131 "ResourceChange": {1132 "Action": action,1133 "LogicalResourceId": resource.get("LogicalResourceId"),1134 "PhysicalResourceId": resource.get("PhysicalResourceId"),1135 "ResourceType": resource.get("Type"),1136 "Replacement": "False",1137 "ChangeSetId": change_set_id,1138 },1139 }1140 def resource_config_differs(self, resource_new):1141 """Return whether the given resource properties differ from the existing config (for stack updates)."""1142 resource_id = resource_new["LogicalResourceId"]1143 resource_old = self.resources[resource_id]1144 props_old = resource_old["Properties"]1145 props_new = resource_new["Properties"]1146 ignored_keys = ["LogicalResourceId", "PhysicalResourceId"]1147 old_keys = set(props_old.keys()) - set(ignored_keys)1148 new_keys = set(props_new.keys()) - set(ignored_keys)1149 if old_keys != new_keys:1150 return True1151 for key in old_keys:1152 if props_old[key] != props_new[key]:1153 return True1154 old_status = self.stack.resource_states.get(resource_id) or {}1155 previous_state = (1156 old_status.get("PreviousResourceStatus") or old_status.get("ResourceStatus") or ""1157 )1158 if old_status and "DELETE" in previous_state:1159 return True1160 def merge_properties(self, resource_id, old_stack, new_stack):1161 old_resources = old_stack.template["Resources"]1162 new_resources = new_stack.template["Resources"]1163 new_resource = new_resources[resource_id]1164 old_resource = old_resources[resource_id] = old_resources.get(resource_id) or {}1165 for key, value in new_resource.items():1166 if key == "Properties":1167 continue1168 old_resource[key] = old_resource.get(key, value)1169 old_res_props = old_resource["Properties"] = old_resource.get("Properties", {})1170 for key, value in new_resource["Properties"].items():1171 old_res_props[key] = value1172 # overwrite original template entirely1173 old_stack.template_original["Resources"][resource_id] = new_stack.template_original[1174 "Resources"1175 ][resource_id]1176 def resolve_param(1177 self, logical_id: str, param_type: str, default_value: Optional[str] = None1178 ) -> Optional[str]:1179 if param_type == "AWS::SSM::Parameter::Value<String>":1180 ssm_client = aws_stack.connect_to_service("ssm")1181 param = ssm_client.get_parameter(Name=default_value)1182 return param["Parameter"]["Value"]1183 return None1184 def apply_parameter_changes(self, old_stack, new_stack) -> None:1185 parameters = {1186 p["ParameterKey"]: p1187 for p in old_stack.metadata["Parameters"] # go through current parameter values1188 }1189 for logical_id, value in new_stack.template["Parameters"].items():1190 default = value.get("Default")1191 provided_param_value = parameters.get(logical_id)1192 param = {1193 "ParameterKey": logical_id,1194 "ParameterValue": provided_param_value if default is None else default,1195 }1196 if default is not None:1197 resolved_value = self.resolve_param(logical_id, value.get("Type"), default)1198 if resolved_value is not None:1199 param["ResolvedValue"] = resolved_value1200 parameters[logical_id] = param1201 parameters.update({p["ParameterKey"]: p for p in new_stack.metadata["Parameters"]})1202 for change_set in new_stack.change_sets:1203 parameters.update({p["ParameterKey"]: p for p in change_set.metadata["Parameters"]})1204 # TODO: unclear/undocumented behavior in implicitly updating old_stack parameter here1205 old_stack.metadata["Parameters"] = [v for v in parameters.values() if v]1206 # TODO: fix circular import with cloudformation_api.py when importing Stack here1207 def construct_changes(1208 self,1209 existing_stack,1210 new_stack,1211 initialize=False,1212 change_set_id=None,1213 append_to_changeset=False,1214 ):1215 from localstack.services.cloudformation.cloudformation_api import StackChangeSet1216 old_resources = existing_stack.template["Resources"]1217 new_resources = new_stack.template["Resources"]1218 deletes = [val for key, val in old_resources.items() if key not in new_resources]1219 adds = [val for key, val in new_resources.items() if initialize or key not in old_resources]1220 modifies = [val for key, val in new_resources.items() if key in old_resources]1221 changes = []1222 for action, items in (("Remove", deletes), ("Add", adds), ("Modify", modifies)):1223 for item in items:1224 item["Properties"] = item.get("Properties", {})1225 change = self.get_change_config(action, item, change_set_id=change_set_id)1226 changes.append(change)1227 # append changes to change set1228 if append_to_changeset and isinstance(new_stack, StackChangeSet):1229 new_stack.changes.extend(changes)1230 return changes1231 def apply_changes(1232 self,1233 existing_stack,1234 new_stack,1235 stack_name,1236 change_set_id=None,1237 initialize=False,1238 action=None,1239 ):1240 old_resources = existing_stack.template["Resources"]1241 new_resources = new_stack.template["Resources"]1242 action = action or "CREATE"1243 self.init_resource_status(old_resources, action="UPDATE")1244 # apply parameter changes to existing stack1245 self.apply_parameter_changes(existing_stack, new_stack)1246 # construct changes1247 changes = self.construct_changes(1248 existing_stack,1249 new_stack,1250 initialize=initialize,1251 change_set_id=change_set_id,1252 )1253 # check if we have actual changes in the stack, and prepare properties1254 contains_changes = False1255 for change in changes:1256 res_action = change["ResourceChange"]["Action"]1257 resource = new_resources.get(change["ResourceChange"]["LogicalResourceId"])1258 if res_action != "Modify" or self.resource_config_differs(resource):1259 contains_changes = True1260 if res_action in ["Modify", "Add"]:1261 self.merge_properties(resource["LogicalResourceId"], existing_stack, new_stack)1262 if not contains_changes:1263 raise NoStackUpdates("No updates are to be performed.")1264 # merge stack outputs1265 existing_stack.template["Outputs"].update(new_stack.template.get("Outputs", {}))1266 # start deployment loop1267 return self.apply_changes_in_loop(1268 changes, existing_stack, stack_name, action=action, new_stack=new_stack1269 )1270 def apply_changes_in_loop(self, changes, stack, stack_name, action=None, new_stack=None):1271 from localstack.services.cloudformation.cloudformation_api import StackChangeSet1272 def _run(*args):1273 try:1274 self.do_apply_changes_in_loop(changes, stack, stack_name)1275 status = "%s_COMPLETE" % action1276 except Exception as e:1277 LOG.debug(1278 'Error applying changes for CloudFormation stack "%s": %s %s'1279 % (stack.stack_name, e, traceback.format_exc())1280 )1281 status = "%s_FAILED" % action1282 stack.set_stack_status(status)1283 if isinstance(new_stack, StackChangeSet):1284 new_stack.metadata["Status"] = status1285 new_stack.metadata["ExecutionStatus"] = (1286 "EXECUTE_FAILED" if "FAILED" in status else "EXECUTE_COMPLETE"1287 )1288 new_stack.metadata["StatusReason"] = "Deployment %s" % (1289 "failed" if "FAILED" in status else "succeeded"1290 )1291 # run deployment in background loop, to avoid client network timeouts1292 return start_worker_thread(_run)1293 def do_apply_changes_in_loop(self, changes, stack, stack_name: str):1294 # apply changes in a retry loop, to resolve resource dependencies and converge to the target state1295 changes_done = []1296 max_iters = 301297 new_resources = stack.resources1298 # apply default props before running the loop1299 for resource_id, resource in new_resources.items():1300 add_default_resource_props(1301 resource,1302 stack.stack_name,1303 resource_id=resource_id,1304 existing_resources=new_resources,1305 )1306 # start deployment loop1307 for i in range(max_iters):1308 j = 01309 updated = False1310 while j < len(changes):1311 change = changes[j]1312 res_change = change["ResourceChange"]1313 action = res_change["Action"]1314 is_add_or_modify = action in ["Add", "Modify"]1315 resource_id = res_change["LogicalResourceId"]1316 try:1317 if is_add_or_modify:1318 resource = new_resources[resource_id]1319 should_deploy = self.prepare_should_deploy_change(1320 resource_id, change, stack, new_resources1321 )1322 LOG.debug(1323 'Handling "%s" for resource "%s" (%s/%s) type "%s" in loop iteration %s (should_deploy=%s)'1324 % (1325 action,1326 resource_id,1327 j + 1,1328 len(changes),1329 res_change["ResourceType"],1330 i + 1,1331 should_deploy,1332 )1333 )1334 if not should_deploy:1335 del changes[j]1336 stack_action = get_action_name_for_resource_change(action)1337 stack.set_resource_status(resource_id, "%s_COMPLETE" % stack_action)1338 continue1339 if not self.all_resource_dependencies_satisfied(resource):1340 j += 11341 continue1342 self.apply_change(change, stack, new_resources, stack_name=stack_name)1343 changes_done.append(change)1344 del changes[j]1345 updated = True1346 except DependencyNotYetSatisfied as e:1347 LOG.debug(1348 'Dependencies for "%s" not yet satisfied, retrying in next loop: %s'1349 % (resource_id, e)1350 )1351 j += 11352 if not changes:1353 break1354 if not updated:1355 raise Exception(1356 "Resource deployment loop completed, pending resource changes: %s" % changes1357 )1358 # clean up references to deleted resources in stack1359 deletes = [c for c in changes_done if c["ResourceChange"]["Action"] == "Remove"]1360 for delete in deletes:1361 stack.template["Resources"].pop(delete["ResourceChange"]["LogicalResourceId"], None)1362 return changes_done1363 def prepare_should_deploy_change(self, resource_id, change, stack, new_resources):1364 resource = new_resources[resource_id]1365 res_change = change["ResourceChange"]1366 action = res_change["Action"]1367 # check resource condition, if present1368 if not evaluate_resource_condition(resource, stack.stack_name, new_resources):1369 LOG.debug(1370 'Skipping deployment of "%s", as resource condition evaluates to false'1371 % resource_id1372 )1373 return1374 # resolve refs in resource details1375 resolve_refs_recursively(stack.stack_name, resource, new_resources)1376 if action in ["Add", "Modify"]:1377 is_deployed = self.is_deployed(resource)1378 if action == "Modify" and not is_deployed:1379 action = res_change["Action"] = "Add"1380 if action == "Add":1381 if not self.is_deployable_resource(resource) or is_deployed:1382 return False1383 if action == "Modify" and not self.is_updateable(resource):1384 LOG.debug(1385 'Action "update" not yet implemented for CF resource type %s'1386 % resource.get("Type")1387 )1388 return False1389 return True1390 def apply_change(self, change, old_stack, new_resources, stack_name):1391 change_details = change["ResourceChange"]1392 action = change_details["Action"]1393 resource_id = change_details["LogicalResourceId"]1394 resource = new_resources[resource_id]1395 if not evaluate_resource_condition(resource, stack_name, new_resources):1396 return1397 # execute resource action1398 result = None1399 if action == "Add":1400 result = deploy_resource(resource_id, new_resources, stack_name)1401 elif action == "Remove":1402 result = delete_resource(resource_id, old_stack.resources, stack_name)1403 elif action == "Modify":1404 result = update_resource(resource_id, new_resources, stack_name)1405 # update resource status and physical resource id1406 stack_action = get_action_name_for_resource_change(action)1407 self.update_resource_details(resource_id, result, stack=old_stack, action=stack_action)...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!