How to use _generate_data_key_pair method in localstack

Best Python code snippet using localstack_python

provider.py

Source:provider.py Github

copy

Full Screen

...125 # generate keypair for signing, if this is a SIGN_VERIFY key126 key_usage = create_key_request.get("KeyUsage")127 if key_usage == "SIGN_VERIFY":128 create_key_request["KeyId"] = result["KeyMetadata"]["KeyId"]129 _generate_data_key_pair(create_key_request, create_cipher=False)130 return result131 @handler("CreateGrant", expand=False)132 def create_grant(133 self, context: RequestContext, create_grant_request: CreateGrantRequest134 ) -> CreateGrantResponse:135 self._validate_grant(create_grant_request)136 region_details = KMSBackend.get()137 grant = dict(create_grant_request)138 grant[GRANT_ID] = long_uid()139 grant[GRANT_TOKENS] = [long_uid()]140 if NAME not in grant:141 grant[NAME] = ""142 grant[CREATION_DATE] = time.time()143 region_details.grants[grant[GRANT_ID]] = grant144 return CreateGrantResponse(GrantId=grant[GRANT_ID], GrantToken=grant[GRANT_TOKENS][0])145 @handler("ListGrants", expand=False)146 def list_grants(147 self, context: RequestContext, list_grants_request: ListGrantsRequest148 ) -> ListGrantsResponse:149 key_id = list_grants_request.get(KEY_ID)150 if not key_id:151 raise ValidationError(f"Required input parameter '{KEY_ID}' not specified")152 region_details = KMSBackend.get()153 self._verify_key_exists(key_id)154 limit = list_grants_request.get("Limit", 50)155 if "Marker" in list_grants_request:156 filtered = region_details.markers.get(list_grants_request["Marker"], [])157 else:158 filtered = [159 grant160 for grant in region_details.grants.values()161 if grant[KEY_ID] == key_id162 and filter_grant_id(grant, list_grants_request)163 and filter_grantee_principal(grant, list_grants_request)164 ]165 # filter out attributes166 filtered = [remove_attributes(dict(grant), ["GrantTokens"]) for grant in filtered]167 if len(filtered) <= limit:168 return ListGrantsResponse(Grants=filtered, Truncated=False)169 in_limit = filtered[:limit]170 out_limit = filtered[limit:]171 marker_id = long_uid()172 region_details.markers[marker_id] = out_limit173 return ListGrantsResponse(Grants=in_limit, Truncated=True, NextMarker=marker_id)174 def revoke_grant(175 self, context: RequestContext, key_id: KeyIdType, grant_id: GrantIdType176 ) -> None:177 grants = KMSBackend.get().grants178 if grants[grant_id][KEY_ID] != key_id:179 raise ValidationError(f"Invalid {KEY_ID}={key_id} specified for grant {grant_id}")180 grants.pop(grant_id)181 def retire_grant(182 self,183 context: RequestContext,184 grant_token: GrantTokenType = None,185 key_id: KeyIdType = None,186 grant_id: GrantIdType = None,187 ) -> None:188 region_details = KMSBackend.get()189 grants = region_details.grants190 if grant_id and grants[grant_id][KEY_ID] == key_id:191 grants.pop(grant_id)192 elif grant_token:193 region_details.grants = {194 grant_id: grant195 for grant_id, grant in grants.items()196 if grant_token not in grant[GRANT_TOKENS]197 }198 else:199 raise InvalidGrantTokenException("Grant token OR (grant ID, key ID) must be specified")200 def list_retirable_grants(201 self,202 context: RequestContext,203 retiring_principal: PrincipalIdType,204 limit: LimitType = None,205 marker: MarkerType = None,206 ) -> ListGrantsResponse:207 region_details = KMSBackend.get()208 grants = region_details.grants209 if not retiring_principal:210 raise ValidationError(f"Required input parameter '{RETIRING_PRINCIPAL}' not specified")211 limit = limit or 50212 if marker:213 markers = region_details.markers214 filtered = markers.get(marker, [])215 else:216 filtered = [217 grant218 for grant in grants.values()219 if RETIRING_PRINCIPAL in grant and grant[RETIRING_PRINCIPAL] == retiring_principal220 ]221 if len(filtered) <= limit:222 return ListGrantsResponse(Grants=filtered, Truncated=False)223 markers = region_details.markers224 in_limit = filtered[:limit]225 out_limit = filtered[limit:]226 marker_id = long_uid()227 markers[marker_id] = out_limit228 return ListGrantsResponse(Grants=in_limit, Truncated=True, NextMarker=marker_id)229 def get_public_key(230 self, context: RequestContext, key_id: KeyIdType, grant_tokens: GrantTokenList = None231 ) -> GetPublicKeyResponse:232 region_details = KMSBackend.get()233 result = region_details.key_pairs.get(key_id)234 if not result:235 raise NotFoundException()236 attrs = [237 "KeyId",238 "PublicKey",239 "KeySpec",240 "KeyUsage",241 "EncryptionAlgorithms",242 "SigningAlgorithms",243 ]244 result = select_attributes(result, attrs)245 return GetPublicKeyResponse(**result)246 @handler("GenerateDataKeyPair", expand=False)247 def generate_data_key_pair(248 self,249 context: RequestContext,250 generate_data_key_pair_request: GenerateDataKeyPairRequest,251 ) -> GenerateDataKeyPairResponse:252 result = _generate_data_key_pair(generate_data_key_pair_request)253 attrs = [254 "PrivateKeyCiphertextBlob",255 "PrivateKeyPlaintext",256 "PublicKey",257 "KeyId",258 "KeyPairSpec",259 ]260 result = select_attributes(result, attrs)261 return GenerateDataKeyPairResponse(**result)262 @handler("GenerateDataKeyPairWithoutPlaintext", expand=False)263 def generate_data_key_pair_without_plaintext(264 self,265 context: RequestContext,266 generate_data_key_pair_without_plaintext_request: GenerateDataKeyPairWithoutPlaintextRequest,267 ) -> GenerateDataKeyPairWithoutPlaintextResponse:268 result = _generate_data_key_pair(generate_data_key_pair_without_plaintext_request)269 result = select_attributes(270 result, ["PrivateKeyCiphertextBlob", "PublicKey", "KeyId", "KeyPairSpec"]271 )272 return GenerateDataKeyPairResponse(**result)273 def sign(274 self,275 context: RequestContext,276 key_id: KeyIdType,277 message: PlaintextType,278 signing_algorithm: SigningAlgorithmSpec,279 message_type: MessageType = None,280 grant_tokens: GrantTokenList = None,281 ) -> SignResponse:282 region_details = KMSBackend.get()283 key_pair = region_details.key_pairs.get(key_id)284 if not key_pair:285 raise NotFoundException(f"Key ID {key_id} not found for signing")286 kwargs = {}287 if signing_algorithm.startswith("RSA"):288 if "PKCS" in signing_algorithm:289 kwargs["padding"] = padding.PKCS1v15()290 elif "PSS" in signing_algorithm:291 kwargs["padding"] = padding.PSS(292 mgf=padding.MGF1(hashes.SHA256()), salt_length=padding.PSS.MAX_LENGTH293 )294 else:295 LOG.warning("Unsupported padding in SigningAlgorithm '%s'", signing_algorithm)296 if "SHA_256" in signing_algorithm:297 kwargs["algorithm"] = hashes.SHA256()298 elif "SHA_384" in signing_algorithm:299 kwargs["algorithm"] = hashes.SHA384()300 elif "SHA_512" in signing_algorithm:301 kwargs["algorithm"] = hashes.SHA512()302 else:303 LOG.warning("Unsupported hash type in SigningAlgorithm '%s'", signing_algorithm)304 if signing_algorithm.startswith("ECDSA"):305 kwargs["signature_algorithm"] = ec.ECDSA(algorithm=kwargs.pop("algorithm", None))306 # generate signature307 signature = key_pair["_key_"].sign(data=message, **kwargs)308 result = {309 "KeyId": key_id,310 "Signature": signature,311 "SigningAlgorithm": signing_algorithm,312 }313 return SignResponse(**result)314 def encrypt(315 self,316 context: RequestContext,317 key_id: KeyIdType,318 plaintext: PlaintextType,319 encryption_context: EncryptionContextType = None,320 grant_tokens: GrantTokenList = None,321 encryption_algorithm: EncryptionAlgorithmSpec = None,322 ) -> EncryptResponse:323 # check if we have imported custom key material for this key324 matching = [key for key in KMSBackend.get().imports.values() if key.key_id == key_id]325 if not matching:326 return call_moto(context)327 key_obj = kms_backends[context.region].keys.get(key_id)328 ciphertext_blob = encrypt(key_obj.key_material, plaintext)329 return EncryptResponse(330 CiphertextBlob=ciphertext_blob, KeyId=key_id, EncryptionAlgorithm=encryption_algorithm331 )332 def decrypt(333 self,334 context: RequestContext,335 ciphertext_blob: CiphertextType,336 encryption_context: EncryptionContextType = None,337 grant_tokens: GrantTokenList = None,338 key_id: KeyIdType = None,339 encryption_algorithm: EncryptionAlgorithmSpec = None,340 ) -> DecryptResponse:341 # check if we have imported custom key material for this key342 matching = [key for key in KMSBackend.get().imports.values() if key.key_id == key_id]343 if not matching:344 return call_moto(context)345 key_obj = kms_backends[context.region].keys.get(key_id)346 plaintext = decrypt(key_obj.key_material, ciphertext_blob)347 return DecryptResponse(348 KeyId=key_id, Plaintext=plaintext, EncryptionAlgorithm=encryption_algorithm349 )350 def get_parameters_for_import(351 self,352 context: RequestContext,353 key_id: KeyIdType,354 wrapping_algorithm: AlgorithmSpec,355 wrapping_key_spec: WrappingKeySpec,356 ) -> GetParametersForImportResponse:357 key = _generate_data_key_pair(358 {"KeySpec": wrapping_key_spec}, create_cipher=False, add_to_keys=False359 )360 import_token = short_uid()361 import_state = KeyImportState(362 key_id=key_id,363 import_token=import_token,364 private_key=key["PrivateKeyPlaintext"],365 public_key=key["PublicKey"],366 wrapping_algo=wrapping_algorithm,367 key_obj=key["_key_"],368 )369 KMSBackend.get().imports[import_token] = import_state370 expiry_date = datetime.datetime.now() + datetime.timedelta(days=100)371 return GetParametersForImportResponse(372 KeyId=key_id,373 ImportToken=to_bytes(import_state.import_token),374 PublicKey=import_state.public_key,375 ParametersValidTo=expiry_date,376 )377 def import_key_material(378 self,379 context: RequestContext,380 key_id: KeyIdType,381 import_token: CiphertextType,382 encrypted_key_material: CiphertextType,383 valid_to: DateType = None,384 expiration_model: ExpirationModelType = None,385 ) -> ImportKeyMaterialResponse:386 import_token = to_str(import_token)387 import_state = KMSBackend.get().imports.get(import_token)388 if not import_state:389 raise NotFoundException(f"Unable to find key import token '{import_token}'")390 key_obj = kms_backends[context.region].keys.get(key_id)391 if not key_obj:392 raise NotFoundException(f"Unable to find key '{key_id}'")393 key_material = import_state.key_obj.decrypt(encrypted_key_material, padding.PKCS1v15())394 key_obj.key_material = key_material395 return ImportKeyMaterialResponse()396 def _verify_key_exists(self, key_id):397 try:398 kms_backends[aws_stack.get_region()].describe_key(key_id)399 except Exception:400 raise ValidationError(f"Invalid key ID '{key_id}'")401 def _validate_grant(self, data: Dict):402 if KEY_ID not in data or GRANTEE_PRINCIPAL not in data or OPERATIONS not in data:403 raise ValidationError("Grant ID, key ID and grantee principal must be specified")404 for operation in data[OPERATIONS]:405 if operation not in VALID_OPERATIONS:406 raise ValidationError(407 f"Value {[OPERATIONS]} at 'operations' failed to satisfy constraint: Member must satisfy"408 f" constraint: [Member must satisfy enum value set: {VALID_OPERATIONS}]"409 )410 self._verify_key_exists(data[KEY_ID])411# ---------------412# UTIL FUNCTIONS413# ---------------414def _generate_data_key_pair(data, create_cipher=True, add_to_keys=True):415 region_details = KMSBackend.get()416 kms = aws_stack.connect_to_service("kms")417 key_id = data.get("KeyId")418 key_spec = data.get("KeyPairSpec") or data.get("KeySpec") or data.get("CustomerMasterKeySpec")419 key = None420 public_format = None421 if key_spec.startswith("RSA"):422 rsa_key_sizes = {423 "RSA_2048": 2048,424 "RSA_3072": 3072,425 "RSA_4096": 4096,426 }427 key_size = rsa_key_sizes.get(key_spec)428 key = rsa.generate_private_key(public_exponent=65537, key_size=key_size)...

Full Screen

Full Screen

kms_listener.py

Source:kms_listener.py Github

copy

Full Screen

...172 set_response_content(response, result)173 response.status_code = 200174 return response175def generate_data_key_pair(data, response):176 result = _generate_data_key_pair(data)177 set_response_content(response, result)178 response.status_code = 200179 return response180def generate_data_key_pair_without_plaintext(data, response):181 result = _generate_data_key_pair(data)182 result.pop("PrivateKeyPlaintext", None)183 set_response_content(response, result)184 response.status_code = 200185 return response186def _generate_data_key_pair(data, create_cipher=True):187 region_details = KMSBackend.get()188 kms = aws_stack.connect_to_service("kms")189 key_id = data.get("KeyId")190 key_spec = data.get("KeyPairSpec") or data.get("KeySpec") or data.get("CustomerMasterKeySpec")191 key = None192 public_format = None193 if key_spec.startswith("RSA"):194 rsa_key_sizes = {195 "RSA_2048": 2048,196 "RSA_3072": 3072,197 "RSA_4096": 4096,198 }199 key_size = rsa_key_sizes.get(key_spec)200 key = rsa.generate_private_key(public_exponent=65537, key_size=key_size)201 public_format = crypto_serialization.PublicFormat.PKCS1202 if key_spec.startswith("ECC"):203 curve = None204 if key_spec == "ECC_NIST_P256":205 curve = ec.BrainpoolP256R1()206 elif key_spec == "ECC_NIST_P384":207 curve = ec.BrainpoolP384R1()208 elif key_spec == "ECC_NIST_P521":209 curve = ec.BrainpoolP512R1()210 elif key_spec == "ECC_SECG_P256K1":211 curve = ec.SECP256K1()212 key = ec.generate_private_key(curve)213 public_format = crypto_serialization.PublicFormat.SubjectPublicKeyInfo214 private_key = key.private_bytes(215 crypto_serialization.Encoding.DER,216 crypto_serialization.PrivateFormat.PKCS8,217 crypto_serialization.NoEncryption(),218 )219 public_key = key.public_key().public_bytes(crypto_serialization.Encoding.DER, public_format)220 cipher_text_blob = None221 if create_cipher:222 cipher_text = kms.encrypt(KeyId=key_id, Plaintext=private_key)["CiphertextBlob"]223 cipher_text_blob = base64.b64encode(cipher_text)224 region = region_details.get_current_request_region()225 result = {226 "PrivateKeyCiphertextBlob": cipher_text_blob,227 "PrivateKeyPlaintext": base64.b64encode(private_key),228 "PublicKey": base64.b64encode(public_key),229 "KeyId": key_id,230 "KeyPairSpec": key_spec,231 "KeySpec": key_spec,232 "KeyUsage": "SIGN_VERIFY",233 "Policy": data.get("Policy"),234 "Region": region,235 "Description": data.get("Description"),236 "Arn": aws_stack.kms_key_arn(key_id),237 "_key_": key,238 }239 region_details.key_pairs[key_id] = result240 key = Key("", result["KeyUsage"], key_spec, result["Description"], region)241 key.id = key_id242 return {**key.to_dict()["KeyMetadata"], **result}243def set_key_managed(key_id: str) -> None:244 """245 Sets a KMS key to AWS managed246 :param key_id: ID of the KMS key247 """248 region_name = aws_stack.get_region()249 backend = kms_backends.get(region_name)250 key_data = backend.keys.get(key_id)251 if key_data:252 key_data.key_manager = "AWS"253def create_key(data: Dict):254 key_usage = data.get("KeyUsage")255 if key_usage != "SIGN_VERIFY" or KMS_PROVIDER == "local-kms":256 return257 data["KeyId"] = long_uid()258 result = _generate_data_key_pair(data, create_cipher=False)259 result = {"KeyMetadata": json_safe(result)}260 return result261def sign(data, response):262 region_details = KMSBackend.get()263 response.status_code = 200264 algo = data.get("SigningAlgorithm")265 key_id = data.get("KeyId")266 message = base64.b64decode(to_bytes(data.get("Message")))267 key_pair = region_details.key_pairs.get(key_id)268 kwargs = {}269 if algo.startswith("RSA"):270 if "PKCS" in algo:271 kwargs["padding"] = padding.PKCS1v15()272 elif "PSS" in algo:...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful