Best Python code snippet using localstack_python
kms_listener.py
Source:kms_listener.py
...164 result = _generate_data_key_pair(data)165 set_response_content(response, result)166 response.status_code = 200167 return response168def generate_data_key_pair_without_plaintext(data, response):169 result = _generate_data_key_pair(data)170 result.pop("PrivateKeyPlaintext", None)171 set_response_content(response, result)172 response.status_code = 200173 return response174def _generate_data_key_pair(data):175 key_id = data.get("KeyId")176 rsa_key_sizes = {177 "RSA_2048": 2048,178 "RSA_3072": 3072,179 "RSA_4096": 4096,180 }181 key_spec = data["KeyPairSpec"]182 key_size = rsa_key_sizes.get(key_spec)183 if not key_size:184 # TODO: support other crypto/keypair types!185 LOG.warning("Unsupported KeyPairSpec specified to generate key pair: '%s'", key_spec)186 key_size = 2048187 key = rsa.generate_private_key(public_exponent=65537, key_size=key_size)188 private_key = key.private_bytes(189 crypto_serialization.Encoding.DER,190 crypto_serialization.PrivateFormat.PKCS8,191 crypto_serialization.NoEncryption(),192 )193 public_key = key.public_key().public_bytes(194 crypto_serialization.Encoding.DER, crypto_serialization.PublicFormat.PKCS1195 )196 kms = aws_stack.connect_to_service("kms")197 cipher_text = kms.encrypt(KeyId=key_id, Plaintext=private_key)["CiphertextBlob"]198 result = {199 "PrivateKeyCiphertextBlob": base64.b64encode(cipher_text),200 "PrivateKeyPlaintext": base64.b64encode(private_key),201 "PublicKey": base64.b64encode(public_key),202 "KeyId": key_id,203 "KeyPairSpec": data.get("KeyPairSpec"),204 }205 key_pairs = _get_key_pairs()206 key_pairs[key_id] = result207 return result208def _get_key_pairs():209 region_name = aws_stack.get_region()210 backend = kms_backends.get(region_name)211 key_pairs = getattr(backend, ATTR_KEY_PAIRS, {})212 setattr(backend, ATTR_KEY_PAIRS, key_pairs)213 return key_pairs214def set_key_managed(key_id) -> None:215 """216 Sets a KMS key to AWS managed217 :param key_id: ID of the KMS key218 """219 region_name = aws_stack.get_region()220 
backend = kms_backends.get(region_name)221 key_data = backend.keys.get(key_id)222 if key_data:223 key_data.key_manager = "AWS"224class ProxyListenerKMS(ProxyListener):225 def forward_request(self, method, path, data, headers):226 action = headers.get("X-Amz-Target") or ""227 action = action.split(".")[-1]228 if method == "POST" and path == "/":229 parsed_data = json.loads(to_str(data))230 if action == "CreateKey":231 descr = parsed_data.get("Description") or ""232 event_publisher.fire_event(233 EVENT_KMS_CREATE_KEY, {"k": event_publisher.get_hash(descr)}234 )235 elif action == "CreateGrant":236 return handle_create_grant(parsed_data)237 elif action == "ListGrants":238 return handle_list_grants(parsed_data)239 elif action == "RevokeGrant":240 return handle_revoke_grant(parsed_data)241 elif action == "RetireGrant":242 return handle_retire_grant(parsed_data)243 elif action == "ListRetirableGrants":244 return handle_list_retirable_grants(parsed_data)245 return True246 def return_response(self, method, path, data, headers, response):247 if method == "POST" and path == "/":248 parsed_data = json.loads(to_str(data))249 action = headers.get("X-Amz-Target") or ""250 action = action.split(".")[-1]251 if response.status_code == 501:252 if action == "GetPublicKey":253 return handle_get_public_key(parsed_data, response)254 if action == "GenerateDataKeyPair":255 return generate_data_key_pair(parsed_data, response)256 if action == "GenerateDataKeyPairWithoutPlaintext":257 return generate_data_key_pair_without_plaintext(parsed_data, response)258class KMSBackend(RegionBackend):259 # maps grant ID to grant details260 grants: Dict[str, Dict]261 # maps pagination markers to result lists262 markers: Dict[str, List]263 def __init__(self):264 self.grants = {}265 self.markers = {}266# instantiate listener...
test_kms.py
Source:test_kms.py
...58 assert len(grants_after) == len(grants_before) - 159 def test_asymmetric_keys(self, kms_client, kms_key):60 key_id = kms_key["KeyMetadata"]["KeyId"]61 # generate key pair without plaintext62 result = kms_client.generate_data_key_pair_without_plaintext(63 KeyId=key_id, KeyPairSpec="RSA_2048"64 )65 assert result.get("PrivateKeyCiphertextBlob")66 assert not result.get("PrivateKeyPlaintext")67 assert result.get("PublicKey")68 # generate key pair69 result = kms_client.generate_data_key_pair(KeyId=key_id, KeyPairSpec="RSA_2048")70 assert result.get("PrivateKeyCiphertextBlob")71 assert result.get("PrivateKeyPlaintext")72 assert result.get("PublicKey")73 # get public key74 result1 = kms_client.get_public_key(KeyId=key_id)75 assert result.get("KeyId") == result1.get("KeyId")76 assert result.get("KeySpec") == result1.get("KeySpec")...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!