Best Python code snippet using localstack_python
provider.py
Source:provider.py
...81 },82 "UpdateDate": "2019-05-20T18:22:18+00:00",83 }84}85def get_iam_backend(context: RequestContext) -> IAMBackend:86 return iam_backends[context.account_id]["global"]87class IamProvider(IamApi):88 def __init__(self):89 apply_patches()90 @handler("CreateRole", expand=False)91 def create_role(92 self, context: RequestContext, request: CreateRoleRequest93 ) -> CreateRoleResponse:94 result = call_moto(context)95 if not request.get("MaxSessionDuration") and result["Role"].get("MaxSessionDuration"):96 backend = get_iam_backend(context)97 role = backend.get_role(request["RoleName"])98 role.max_session_duration = None99 result["Role"].pop("MaxSessionDuration")100 if "RoleLastUsed" in result["Role"] and not result["Role"]["RoleLastUsed"]:101 # not part of the AWS response if it's empty102 # FIXME: RoleLastUsed did not seem well supported when this check was added103 result["Role"].pop("RoleLastUsed")104 return result105 @staticmethod106 def build_evaluation_result(107 action_name: ActionNameType, resource_name: ResourceNameType, policy_statements: List[Dict]108 ) -> EvaluationResult:109 eval_res = EvaluationResult()110 eval_res["EvalActionName"] = action_name111 eval_res["EvalResourceName"] = resource_name112 eval_res["EvalDecision"] = PolicyEvaluationDecisionType.explicitDeny113 for statement in policy_statements:114 # TODO Implement evaluation logic here115 if (116 action_name in statement["Action"]117 and resource_name in statement["Resource"]118 and statement["Effect"] == "Allow"119 ):120 eval_res["EvalDecision"] = PolicyEvaluationDecisionType.allowed121 eval_res["MatchedStatements"] = [] # TODO: add support for statement compilation.122 return eval_res123 def simulate_principal_policy(124 self,125 context: RequestContext,126 policy_source_arn: arnType,127 action_names: ActionNameListType,128 policy_input_list: SimulationPolicyListType = None,129 permissions_boundary_policy_input_list: SimulationPolicyListType = None,130 resource_arns: ResourceNameListType = 
None,131 resource_policy: policyDocumentType = None,132 resource_owner: ResourceNameType = None,133 caller_arn: ResourceNameType = None,134 context_entries: ContextEntryListType = None,135 resource_handling_option: ResourceHandlingOptionType = None,136 max_items: maxItemsType = None,137 marker: markerType = None,138 ) -> SimulatePolicyResponse:139 backend = get_iam_backend(context)140 policy = backend.get_policy(policy_source_arn)141 policy_version = backend.get_policy_version(policy_source_arn, policy.default_version_id)142 try:143 policy_statements = json.loads(policy_version.document).get("Statement", [])144 except Exception:145 raise NoSuchEntityException("Policy not found")146 evaluations = [147 self.build_evaluation_result(action_name, resource_arn, policy_statements)148 for action_name in action_names149 for resource_arn in resource_arns150 ]151 response = SimulatePolicyResponse()152 response["IsTruncated"] = False153 response["EvaluationResults"] = evaluations154 return response155 def delete_policy(self, context: RequestContext, policy_arn: arnType) -> None:156 backend = get_iam_backend(context)157 if backend.managed_policies.get(policy_arn):158 backend.managed_policies.pop(policy_arn, None)159 else:160 raise NoSuchEntityException("Policy {0} was not found.".format(policy_arn))161 def detach_role_policy(162 self, context: RequestContext, role_name: roleNameType, policy_arn: arnType163 ) -> None:164 backend = get_iam_backend(context)165 try:166 role = backend.get_role(role_name)167 policy = role.managed_policies[policy_arn]168 policy.detach_from(role)169 except KeyError:170 raise NoSuchEntityException("Policy {0} was not found.".format(policy_arn))171 @staticmethod172 def moto_role_to_role_type(moto_role: MotoRole) -> Role:173 role = Role()174 role["Path"] = moto_role.path175 role["RoleName"] = moto_role.name176 role["RoleId"] = moto_role.id177 role["Arn"] = moto_role.arn178 role["CreateDate"] = moto_role.create_date179 if 
moto_role.assume_role_policy_document:180 role["AssumeRolePolicyDocument"] = moto_role.assume_role_policy_document181 if moto_role.description:182 role["Description"] = moto_role.description183 if moto_role.max_session_duration:184 role["MaxSessionDuration"] = moto_role.max_session_duration185 if moto_role.permissions_boundary:186 role["PermissionsBoundary"] = moto_role.permissions_boundary187 if moto_role.tags:188 role["Tags"] = [Tag(Key=k, Value=v) for k, v in moto_role.tags.items()]189 # role["RoleLastUsed"]: # TODO: add support190 return role191 def list_roles(192 self,193 context: RequestContext,194 path_prefix: pathPrefixType = None,195 marker: markerType = None,196 max_items: maxItemsType = None,197 ) -> ListRolesResponse:198 backend = get_iam_backend(context)199 moto_roles = backend.roles.values()200 if path_prefix:201 moto_roles = filter_items_with_path_prefix(path_prefix, moto_roles)202 moto_roles = sorted(moto_roles, key=lambda role: role.id)203 response_roles = []204 for moto_role in moto_roles:205 response_role = self.moto_role_to_role_type(moto_role)206 response_roles.append(response_role)207 if (208 path_prefix209 ): # TODO: this is consistent with the patch it migrates, but should add tests for this.210 response_role["AssumeRolePolicyDocument"] = quote(211 json.dumps(moto_role.assume_role_policy_document or {})212 )213 return ListRolesResponse(Roles=response_roles, IsTruncated=False)214 def update_group(215 self,216 context: RequestContext,217 group_name: groupNameType,218 new_path: pathType = None,219 new_group_name: groupNameType = None,220 ) -> None:221 new_group_name = new_group_name or group_name222 backend = get_iam_backend(context)223 group = backend.get_group(group_name)224 group.path = new_path225 group.name = new_group_name226 backend.groups[new_group_name] = backend.groups.pop(group_name)227 def list_instance_profile_tags(228 self,229 context: RequestContext,230 instance_profile_name: instanceProfileNameType,231 marker: markerType = 
None,232 max_items: maxItemsType = None,233 ) -> ListInstanceProfileTagsResponse:234 backend = get_iam_backend(context)235 profile = backend.get_instance_profile(instance_profile_name)236 response = ListInstanceProfileTagsResponse()237 response["Tags"] = [Tag(Key=k, Value=v) for k, v in profile.tags.items()]238 return response239 def tag_instance_profile(240 self,241 context: RequestContext,242 instance_profile_name: instanceProfileNameType,243 tags: tagListType,244 ) -> None:245 backend = get_iam_backend(context)246 profile = backend.get_instance_profile(instance_profile_name)247 value_by_key = {tag["Key"]: tag["Value"] for tag in tags}248 profile.tags.update(value_by_key)249 def untag_instance_profile(250 self,251 context: RequestContext,252 instance_profile_name: instanceProfileNameType,253 tag_keys: tagKeyListType,254 ) -> None:255 backend = get_iam_backend(context)256 profile = backend.get_instance_profile(instance_profile_name)257 for tag in tag_keys:258 profile.tags.pop(tag, None)259 def create_service_linked_role(260 self,261 context: RequestContext,262 aws_service_name: groupNameType,263 description: roleDescriptionType = None,264 custom_suffix: customSuffixType = None,265 ) -> CreateServiceLinkedRoleResponse:266 # TODO: test267 # TODO: how to support "CustomSuffix" API request parameter?268 policy_doc = json.dumps(269 {270 "Version": "2012-10-17",271 "Statement": [272 {273 "Effect": "Allow",274 "Principal": {"Service": aws_service_name},275 "Action": "sts:AssumeRole",276 }277 ],278 }279 )280 path = f"{SERVICE_LINKED_ROLE_PATH_PREFIX}/{aws_service_name}"281 role_name = f"r-{short_uid()}"282 backend = get_iam_backend(context)283 role = backend.create_role(284 role_name=role_name,285 assume_role_policy_document=policy_doc,286 path=path,287 permissions_boundary="",288 description=description,289 tags={},290 max_session_duration=3600,291 )292 role.service_linked_role_arn = "arn:aws:iam::{0}:role/aws-service-role/{1}/{2}".format(293 context.account_id, 
aws_service_name, role.name294 )295 res_role = self.moto_role_to_role_type(role)296 return CreateServiceLinkedRoleResponse(Role=res_role)297 def delete_service_linked_role(298 self, context: RequestContext, role_name: roleNameType299 ) -> DeleteServiceLinkedRoleResponse:300 # TODO: test301 backend = get_iam_backend(context)302 backend.delete_role(role_name)303 return DeleteServiceLinkedRoleResponse(DeletionTaskId=short_uid())304 def get_service_linked_role_deletion_status(305 self, context: RequestContext, deletion_task_id: DeletionTaskIdType306 ) -> GetServiceLinkedRoleDeletionStatusResponse:307 # TODO: test308 return GetServiceLinkedRoleDeletionStatusResponse(Status=DeletionTaskStatusType.SUCCEEDED)309 def put_user_permissions_boundary(310 self, context: RequestContext, user_name: userNameType, permissions_boundary: arnType311 ) -> None:312 if user := get_iam_backend(context).users.get(user_name):313 user.permissions_boundary = permissions_boundary314 else:315 raise NoSuchEntityException()316 def delete_user_permissions_boundary(317 self, context: RequestContext, user_name: userNameType318 ) -> None:319 if user := get_iam_backend(context).users.get(user_name):320 if hasattr(user, "permissions_boundary"):321 delattr(user, "permissions_boundary")322 else:323 raise NoSuchEntityException()324 def create_user(325 self,326 context: RequestContext,327 user_name: userNameType,328 path: pathType = None,329 permissions_boundary: arnType = None,330 tags: tagListType = None,331 ) -> CreateUserResponse:332 response = call_moto(context=context)333 user = get_iam_backend(context).get_user(user_name)334 if permissions_boundary:335 user.permissions_boundary = permissions_boundary336 response["User"]["PermissionsBoundary"] = AttachedPermissionsBoundary(337 PermissionsBoundaryArn=permissions_boundary,338 PermissionsBoundaryType="Policy",339 )340 return response341 def get_user(342 self, context: RequestContext, user_name: existingUserNameType = None343 ) -> GetUserResponse:344 
response = call_moto(context=context)345 moto_user_name = response["User"]["UserName"]346 moto_user = get_iam_backend(context).users.get(moto_user_name)347 # if the user does not exist or is no user348 if not moto_user and not user_name:349 access_key_id = extract_access_key_id_from_auth_header(context.request.headers)350 sts_client = aws_stack.connect_to_service(351 "sts",352 aws_access_key_id=access_key_id,353 aws_secret_access_key=INTERNAL_AWS_SECRET_ACCESS_KEY,354 )355 caller_identity = sts_client.get_caller_identity()356 caller_arn = caller_identity["Arn"]357 if caller_arn.endswith(":root"):358 return GetUserResponse(359 User=User(360 UserId=context.account_id,...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!