Best Python code snippet using localstack_python
test_kms.py
Source:test_kms.py
...98 assert decrypted["Plaintext"] == result["PrivateKeyPlaintext"]99 @pytest.mark.parametrize("key_type", ["rsa", "ecc"])100 def test_sign(self, kms_client, key_type, kms_create_key):101 key_spec = "RSA_2048" if key_type == "rsa" else "ECC_NIST_P256"102 result = kms_create_key(KeyUsage="SIGN_VERIFY", KeySpec=key_spec)103 key_id = result["KeyId"]104 message = b"test message 123 !%$@"105 algo = "RSASSA_PSS_SHA_256" if key_type == "rsa" else "ECDSA_SHA_384"106 result = kms_client.sign(107 KeyId=key_id, Message=message, MessageType="RAW", SigningAlgorithm=algo108 )109 def _verify(signature):110 kwargs = {}111 if key_type == "rsa":112 kwargs["padding"] = padding.PSS(113 mgf=padding.MGF1(hashes.SHA256()), salt_length=padding.PSS.MAX_LENGTH114 )115 kwargs["algorithm"] = hashes.SHA256()116 else:117 kwargs["signature_algorithm"] = ec.ECDSA(algorithm=hashes.SHA384())118 public_key.verify(signature=signature, data=message, **kwargs)119 public_key_data = kms_client.get_public_key(KeyId=key_id)["PublicKey"]120 public_key = serialization.load_der_public_key(public_key_data)121 _verify(result["Signature"])122 with pytest.raises(InvalidSignature):123 _verify(result["Signature"] + b"foobar")124 @pytest.mark.aws_validated125 def test_get_and_list_sign_key(self, kms_client, kms_create_key):126 response = kms_create_key(KeyUsage="SIGN_VERIFY", CustomerMasterKeySpec="ECC_NIST_P256")127 key_id = response["KeyId"]128 describe_response = kms_client.describe_key(KeyId=key_id)["KeyMetadata"]129 assert describe_response["KeyId"] == key_id130 list_response = kms_client.list_keys()131 found = False132 for keyData in list_response["Keys"]:133 if keyData["KeyId"] == key_id:134 found = True135 break136 assert found is True137 def test_import_key(self, kms_client, kms_key):138 key_id = kms_key["KeyId"]139 # get key import params140 params = kms_client.get_parameters_for_import(141 KeyId=key_id, WrappingAlgorithm="RSAES_PKCS1_V1_5", WrappingKeySpec="RSA_2048"142 )143 assert params["KeyId"] == 
key_id144 assert params["ImportToken"]145 assert params["PublicKey"]146 assert isinstance(params["ParametersValidTo"], datetime)147 # create 256 bit symmetric key (import_key_material(..) works with symmetric keys, as per the docs)148 symmetric_key = bytes(getrandbits(8) for _ in range(32))149 assert len(symmetric_key) == 32150 # import symmetric key (key material) into KMS151 public_key = load_der_public_key(params["PublicKey"])152 encrypted_key = public_key.encrypt(symmetric_key, PKCS1v15())153 kms_client.import_key_material(154 KeyId=key_id,155 ImportToken=params["ImportToken"],156 EncryptedKeyMaterial=encrypted_key,157 ExpirationModel="KEY_MATERIAL_DOES_NOT_EXPIRE",158 )159 # use key to encrypt/decrypt data160 plaintext = b"test content 123 !#"161 encrypt_result = kms_client.encrypt(Plaintext=plaintext, KeyId=key_id)162 encrypted = encrypt(symmetric_key, plaintext)163 assert encrypt_result["CiphertextBlob"] == encrypted164 api_decrypted = kms_client.decrypt(165 CiphertextBlob=encrypt_result["CiphertextBlob"], KeyId=key_id166 )167 assert api_decrypted["Plaintext"] == plaintext168 @pytest.mark.aws_validated169 def test_list_aliases_of_key(self, kms_client, kms_create_key):170 aliased_key = kms_create_key()171 comparison_key = kms_create_key()172 alias_name = f"alias/{short_uid()}"173 kms_client.create_alias(AliasName=alias_name, TargetKeyId=aliased_key["KeyId"])174 response = kms_client.list_aliases(KeyId=aliased_key["KeyId"])175 assert len(response["Aliases"]) == 1176 response = kms_client.list_aliases(KeyId=comparison_key["KeyId"])...
kms.py
Source:kms.py
...
def kms_list_keys(client=None, region=None):
    """Return the ``KeyId`` of each entry in a ``list_keys`` response.

    NOTE(review): only a single ``client.list_keys()`` call is made; if the
    service paginates the result, later pages are not fetched -- confirm.
    """
    response = client.list_keys()
    return [key['KeyId'] for key in response['Keys']]


@boto_client('kms')
def kms_create_key(description, policy=None, bypass_policy_lockout_safety_check=False,
                   key_usage='ENCRYPT_DECRYPT', origin='AWS_KMS', tags=None,
                   region=None, client=None):
    """Create a KMS key and return its ``KeyMetadata``.

    Args:
        description: Sent as ``Description``.
        policy: Optional key policy; only sent when truthy.
        bypass_policy_lockout_safety_check: Sent as
            ``BypassPolicyLockoutSafetyCheck``.
        key_usage: Sent as ``KeyUsage``.
        origin: Sent as ``Origin``.
        tags: Sent as ``Tags``. ``None`` (the default) sends an empty list.
        region: Not used in the body; presumably consumed by the
            ``boto_client`` decorator -- confirm.
        client: KMS client; presumably injected by ``boto_client`` -- confirm.

    Returns:
        The ``KeyMetadata`` entry of the ``create_key`` response, or ``None``
        if the response has no such entry.
    """
    create_key_params = {
        'Description': description,
        'BypassPolicyLockoutSafetyCheck': bypass_policy_lockout_safety_check,
        'KeyUsage': key_usage,
        'Origin': origin,
        # A fresh list per call: a ``tags=[]`` default would be one list
        # object shared by every call of this function.
        'Tags': [] if tags is None else tags,
    }
    if policy:
        create_key_params['Policy'] = policy
    logger.debug({'create_key_params': create_key_params})
    return client.create_key(**create_key_params).get('KeyMetadata')


@boto_client('kms')
def kms_create_alias(alias_name, key_id, region=None, client=None):
    """Create an alias pointing at ``key_id``.

    The alias name is first passed through ``__alias_name``.

    Returns:
        ``True`` when ``create_alias`` succeeds; ``False`` when it raises
        (the error is logged, not re-raised).
    """
    alias_name = __alias_name(alias_name)
    create_alias_params = {
        'AliasName': alias_name,
        'TargetKeyId': key_id
    }
    logger.debug({'create_alias_params': create_alias_params})
    try:
        client.create_alias(**create_alias_params)
    except Exception as e:
        # Best effort: callers only get a success flag.
        logger.error(str(e))
        return False
    return True


# NOTE(review): `tags=[]` below is a shared mutable default; it is left
# unchanged because the remainder of this function is not visible here.
def kms_ensure_key(alias_name, description=None, policy=None, bypass_policy_lockout_safety_check=False,
                   key_usage='ENCRYPT_DECRYPT', origin='AWS_KMS', tags=[], region=None):
    alias_name = __alias_name(alias_name)
    key_alias = get_alias_attr(alias_name, region=region)
    if not key_alias:
        logger.debug('[kms_ensure_key] key does not exist... creating it...')
        if description is None:
            description = alias_name
        key = kms_create_key(
            description=description,
            policy=policy,
            bypass_policy_lockout_safety_check=bypass_policy_lockout_safety_check,
            key_usage=key_usage,
            origin=origin,
            tags=tags,
            region=region
        )
        if kms_create_alias(alias_name, key['KeyId'], region=region):
            # need to get the new alias arn
            key_alias = get_alias_attr(alias_name, region=region)
...
Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!