How to use the get_kms_backend method in LocalStack

Best Python code snippet using localstack_python

provider.py

Source:provider.py Github

copy

Full Screen

...118 def __init__(self, message=None):119 super().__init__("ValidationError", message=message)120class KmsProvider(KmsApi):121 @staticmethod122 def get_kms_backend(context: RequestContext) -> BaseBackend:123 return kms_backends[context.account_id][context.region]124 @handler("CreateKey", expand=False)125 def create_key(126 self,127 context: RequestContext,128 create_key_request: CreateKeyRequest = None,129 ) -> CreateKeyResponse:130 result = call_moto(context)131 # generate keypair for signing, if this is a SIGN_VERIFY key132 key_usage = create_key_request.get("KeyUsage", "ENCRYPT_DECRYPT")133 key_spec = create_key_request.get("KeySpec", "SYMMETRIC_DEFAULT")134 if not (key_usage == "ENCRYPT_DECRYPT" and key_spec == "SYMMETRIC_DEFAULT"):135 create_key_request["KeyId"] = result["KeyMetadata"]["KeyId"]136 _generate_data_key_pair(create_key_request, create_cipher=False)137 return result138 @handler("CreateGrant", expand=False)139 def create_grant(140 self, context: RequestContext, create_grant_request: CreateGrantRequest141 ) -> CreateGrantResponse:142 self._validate_grant(create_grant_request)143 region_details = KMSBackend.get()144 grant = dict(create_grant_request)145 grant[GRANT_ID] = long_uid()146 grant[GRANT_TOKENS] = [long_uid()]147 if NAME not in grant:148 grant[NAME] = ""149 grant[CREATION_DATE] = time.time()150 region_details.grants[grant[GRANT_ID]] = grant151 return CreateGrantResponse(GrantId=grant[GRANT_ID], GrantToken=grant[GRANT_TOKENS][0])152 @handler("ListGrants", expand=False)153 def list_grants(154 self, context: RequestContext, list_grants_request: ListGrantsRequest155 ) -> ListGrantsResponse:156 key_id = list_grants_request.get(KEY_ID)157 if not key_id:158 raise ValidationError(f"Required input parameter '{KEY_ID}' not specified")159 region_details = KMSBackend.get()160 self._verify_key_exists(key_id)161 limit = list_grants_request.get("Limit", 50)162 if "Marker" in list_grants_request:163 filtered = 
region_details.markers.get(list_grants_request["Marker"], [])164 else:165 filtered = [166 grant167 for grant in region_details.grants.values()168 if grant[KEY_ID] == key_id169 and filter_grant_id(grant, list_grants_request)170 and filter_grantee_principal(grant, list_grants_request)171 ]172 # filter out attributes173 filtered = [remove_attributes(dict(grant), ["GrantTokens"]) for grant in filtered]174 if len(filtered) <= limit:175 return ListGrantsResponse(Grants=filtered, Truncated=False)176 in_limit = filtered[:limit]177 out_limit = filtered[limit:]178 marker_id = long_uid()179 region_details.markers[marker_id] = out_limit180 return ListGrantsResponse(Grants=in_limit, Truncated=True, NextMarker=marker_id)181 def revoke_grant(182 self, context: RequestContext, key_id: KeyIdType, grant_id: GrantIdType183 ) -> None:184 grants = KMSBackend.get().grants185 if grants[grant_id][KEY_ID] != key_id:186 raise ValidationError(f"Invalid {KEY_ID}={key_id} specified for grant {grant_id}")187 grants.pop(grant_id)188 def retire_grant(189 self,190 context: RequestContext,191 grant_token: GrantTokenType = None,192 key_id: KeyIdType = None,193 grant_id: GrantIdType = None,194 ) -> None:195 region_details = KMSBackend.get()196 grants = region_details.grants197 if grant_id and grants[grant_id][KEY_ID] == key_id:198 grants.pop(grant_id)199 elif grant_token:200 region_details.grants = {201 grant_id: grant202 for grant_id, grant in grants.items()203 if grant_token not in grant[GRANT_TOKENS]204 }205 else:206 raise InvalidGrantTokenException("Grant token OR (grant ID, key ID) must be specified")207 def list_retirable_grants(208 self,209 context: RequestContext,210 retiring_principal: PrincipalIdType,211 limit: LimitType = None,212 marker: MarkerType = None,213 ) -> ListGrantsResponse:214 region_details = KMSBackend.get()215 grants = region_details.grants216 if not retiring_principal:217 raise ValidationError(f"Required input parameter '{RETIRING_PRINCIPAL}' not specified")218 limit = limit 
or 50219 if marker:220 markers = region_details.markers221 filtered = markers.get(marker, [])222 else:223 filtered = [224 grant225 for grant in grants.values()226 if RETIRING_PRINCIPAL in grant and grant[RETIRING_PRINCIPAL] == retiring_principal227 ]228 if len(filtered) <= limit:229 return ListGrantsResponse(Grants=filtered, Truncated=False)230 markers = region_details.markers231 in_limit = filtered[:limit]232 out_limit = filtered[limit:]233 marker_id = long_uid()234 markers[marker_id] = out_limit235 return ListGrantsResponse(Grants=in_limit, Truncated=True, NextMarker=marker_id)236 @handler("GetPublicKey")237 def get_public_key(238 self, context: RequestContext, key_id: KeyIdType, grant_tokens: GrantTokenList = None239 ) -> GetPublicKeyResponse:240 region_details = KMSBackend.get()241 result = region_details.key_pairs.get(key_id)242 if not result:243 raise NotFoundException()244 attrs = [245 "KeyId",246 "PublicKey",247 "KeySpec",248 "KeyUsage",249 "EncryptionAlgorithms",250 "SigningAlgorithms",251 ]252 result = select_attributes(result, attrs)253 return GetPublicKeyResponse(**result)254 @handler("GenerateDataKeyPair", expand=False)255 def generate_data_key_pair(256 self,257 context: RequestContext,258 generate_data_key_pair_request: GenerateDataKeyPairRequest,259 ) -> GenerateDataKeyPairResponse:260 result = _generate_data_key_pair(generate_data_key_pair_request)261 attrs = [262 "PrivateKeyCiphertextBlob",263 "PrivateKeyPlaintext",264 "PublicKey",265 "KeyId",266 "KeyPairSpec",267 ]268 result = select_attributes(result, attrs)269 return GenerateDataKeyPairResponse(**result)270 @handler("GenerateDataKeyPairWithoutPlaintext", expand=False)271 def generate_data_key_pair_without_plaintext(272 self,273 context: RequestContext,274 generate_data_key_pair_without_plaintext_request: GenerateDataKeyPairWithoutPlaintextRequest,275 ) -> GenerateDataKeyPairWithoutPlaintextResponse:276 result = _generate_data_key_pair(generate_data_key_pair_without_plaintext_request)277 result = 
select_attributes(278 result, ["PrivateKeyCiphertextBlob", "PublicKey", "KeyId", "KeyPairSpec"]279 )280 return GenerateDataKeyPairResponse(**result)281 def sign(282 self,283 context: RequestContext,284 key_id: KeyIdType,285 message: PlaintextType,286 signing_algorithm: SigningAlgorithmSpec,287 message_type: MessageType = None,288 grant_tokens: GrantTokenList = None,289 ) -> SignResponse:290 region_details = KMSBackend.get()291 key_pair = region_details.key_pairs.get(key_id)292 if not key_pair:293 raise NotFoundException(f"Key ID {key_id} not found for signing")294 kwargs = {}295 if signing_algorithm.startswith("RSA"):296 if "PKCS" in signing_algorithm:297 kwargs["padding"] = padding.PKCS1v15()298 elif "PSS" in signing_algorithm:299 kwargs["padding"] = padding.PSS(300 mgf=padding.MGF1(hashes.SHA256()), salt_length=padding.PSS.MAX_LENGTH301 )302 else:303 LOG.warning("Unsupported padding in SigningAlgorithm '%s'", signing_algorithm)304 if "SHA_256" in signing_algorithm:305 kwargs["algorithm"] = hashes.SHA256()306 elif "SHA_384" in signing_algorithm:307 kwargs["algorithm"] = hashes.SHA384()308 elif "SHA_512" in signing_algorithm:309 kwargs["algorithm"] = hashes.SHA512()310 else:311 LOG.warning("Unsupported hash type in SigningAlgorithm '%s'", signing_algorithm)312 if signing_algorithm.startswith("ECDSA"):313 kwargs["signature_algorithm"] = ec.ECDSA(algorithm=kwargs.pop("algorithm", None))314 # generate signature315 signature = key_pair["_key_"].sign(data=message, **kwargs)316 result = {317 "KeyId": key_id,318 "Signature": signature,319 "SigningAlgorithm": signing_algorithm,320 }321 return SignResponse(**result)322 def encrypt(323 self,324 context: RequestContext,325 key_id: KeyIdType,326 plaintext: PlaintextType,327 encryption_context: EncryptionContextType = None,328 grant_tokens: GrantTokenList = None,329 encryption_algorithm: EncryptionAlgorithmSpec = None,330 ) -> EncryptResponse:331 # check if we have imported custom key material for this key332 matching = [key 
for key in KMSBackend.get().imports.values() if key.key_id == key_id]333 if not matching:334 return call_moto(context)335 key_obj = self.get_kms_backend(context).keys.get(key_id)336 ciphertext_blob = encrypt(key_obj.key_material, plaintext)337 return EncryptResponse(338 CiphertextBlob=ciphertext_blob, KeyId=key_id, EncryptionAlgorithm=encryption_algorithm339 )340 def decrypt(341 self,342 context: RequestContext,343 ciphertext_blob: CiphertextType,344 encryption_context: EncryptionContextType = None,345 grant_tokens: GrantTokenList = None,346 key_id: KeyIdType = None,347 encryption_algorithm: EncryptionAlgorithmSpec = None,348 ) -> DecryptResponse:349 # check if we have imported custom key material for this key350 matching = [key for key in KMSBackend.get().imports.values() if key.key_id == key_id]351 if not matching:352 return call_moto(context)353 key_obj = self.get_kms_backend(context).keys.get(key_id)354 plaintext = decrypt(key_obj.key_material, ciphertext_blob)355 return DecryptResponse(356 KeyId=key_id, Plaintext=plaintext, EncryptionAlgorithm=encryption_algorithm357 )358 def get_parameters_for_import(359 self,360 context: RequestContext,361 key_id: KeyIdType,362 wrapping_algorithm: AlgorithmSpec,363 wrapping_key_spec: WrappingKeySpec,364 ) -> GetParametersForImportResponse:365 key = _generate_data_key_pair(366 {"KeySpec": wrapping_key_spec}, create_cipher=False, add_to_keys=False367 )368 import_token = short_uid()369 import_state = KeyImportState(370 key_id=key_id,371 import_token=import_token,372 private_key=key["PrivateKeyPlaintext"],373 public_key=key["PublicKey"],374 wrapping_algo=wrapping_algorithm,375 key_obj=key["_key_"],376 )377 KMSBackend.get().imports[import_token] = import_state378 expiry_date = datetime.datetime.now() + datetime.timedelta(days=100)379 return GetParametersForImportResponse(380 KeyId=key_id,381 ImportToken=to_bytes(import_state.import_token),382 PublicKey=import_state.public_key,383 ParametersValidTo=expiry_date,384 )385 def 
import_key_material(386 self,387 context: RequestContext,388 key_id: KeyIdType,389 import_token: CiphertextType,390 encrypted_key_material: CiphertextType,391 valid_to: DateType = None,392 expiration_model: ExpirationModelType = None,393 ) -> ImportKeyMaterialResponse:394 import_token = to_str(import_token)395 import_state = KMSBackend.get().imports.get(import_token)396 if not import_state:397 raise NotFoundException(f"Unable to find key import token '{import_token}'")398 key_obj = self.get_kms_backend(context).keys.get(key_id)399 if not key_obj:400 raise NotFoundException(f"Unable to find key '{key_id}'")401 if import_state.wrapping_algo == AlgorithmSpec.RSAES_PKCS1_V1_5:402 decrypt_padding = padding.PKCS1v15()403 elif import_state.wrapping_algo == AlgorithmSpec.RSAES_OAEP_SHA_1:404 decrypt_padding = padding.OAEP(padding.MGF1(hashes.SHA1()), hashes.SHA1(), None)405 elif import_state.wrapping_algo == AlgorithmSpec.RSAES_OAEP_SHA_256:406 decrypt_padding = padding.OAEP(padding.MGF1(hashes.SHA256()), hashes.SHA256(), None)407 else:408 raise KMSInvalidStateException(409 f"Unsupported padding, requested wrapping algorithm:'{import_state.wrapping_algo}'"410 )411 key_material = import_state.key_obj.decrypt(encrypted_key_material, decrypt_padding)412 key_obj.key_material = key_material413 return ImportKeyMaterialResponse()414 def list_aliases(415 self,416 context: RequestContext,417 key_id: KeyIdType = None,418 limit: LimitType = None,419 marker: MarkerType = None,420 ) -> ListAliasesResponse:421 if key_id is None:422 return call_moto(context)423 response_aliases = PaginatedList()424 backend = self.get_kms_backend(context)425 if backend.keys.get(key_id) is None:426 raise NotFoundException(f"Unable to find key '{key_id}'")427 aliases_of_key = backend.get_all_aliases().get(key_id) or []428 for alias_name in aliases_of_key:429 response_aliases.append(430 AliasListEntry(431 AliasArn=kms_alias_arn(alias_name, region_name=context.region),432 AliasName=alias_name,433 
TargetKeyId=key_id,434 )435 )436 page, nxt = response_aliases.get_page(437 lambda a: a["AliasName"], next_token=marker, page_size=limit438 )...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. It covers everything from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful