Best Python code snippet using localstack_python
aws_responses.py
Source:aws_responses.py
...147 parsed_response["Error"]["AWSAccessKeyId"] = aws_access_token148 parsed_response["Error"]["StringToSign"] = string_to_sign149 parsed_response["Error"]["SignatureProvided"] = signature150 parsed_response["Error"]["StringToSignBytes"] = "{}".format(bytes_signature.decode("utf-8"))151 set_response_content(response, xmltodict.unparse(parsed_response))152 if expires and code_string == "AccessDenied":153 server_time = datetime.datetime.utcnow().isoformat()[:-4]154 expires_isoformat = datetime.datetime.fromtimestamp(int(expires)).isoformat()[:-4]155 parsed_response["Error"]["Code"] = code_string156 parsed_response["Error"]["Expires"] = "{}Z".format(expires_isoformat)157 parsed_response["Error"]["ServerTime"] = "{}Z".format(server_time)158 set_response_content(response, xmltodict.unparse(parsed_response))159 if not signature and not expires and code_string == "AccessDenied":160 set_response_content(response, xmltodict.unparse(parsed_response))161 if response._content:162 return response163def flask_error_response_xml(164 message: str,165 code: Optional[int] = 500,166 code_string: Optional[str] = "InternalFailure",167 service: Optional[str] = None,168 xmlns: Optional[str] = None,169):170 response = requests_error_response_xml(171 message, code=code, code_string=code_string, service=service, xmlns=xmlns172 )173 return requests_to_flask_response(response)174def requests_error_response(175 req_headers: Dict,176 message: Union[str, bytes],177 code: int = 500,178 error_type: str = "InternalFailure",179 service: str = None,180 xmlns: str = None,181):182 is_json = is_json_request(req_headers)183 if is_json:184 return requests_error_response_json(message=message, code=code, error_type=error_type)185 return requests_error_response_xml(186 message, code=code, code_string=error_type, service=service, xmlns=xmlns187 )188def is_json_request(req_headers: Dict) -> bool:189 ctype = req_headers.get("Content-Type", "")190 accept = req_headers.get("Accept", "")191 return "json" in ctype or "json" in accept192def is_invalid_html_response(headers, content) -> bool:193 content_type = headers.get("Content-Type", "")194 return "text/html" in content_type and not str_startswith_ignore_case(content, "<!doctype html")195def raise_exception_if_error_response(response):196 if not is_response_obj(response):197 return198 if response.status_code < 400:199 return200 content = "..."201 try:202 content = truncate(to_str(response.content or ""))203 except Exception:204 pass # ignore if content has non-printable bytes205 raise Exception("Received error response (code %s): %s" % (response.status_code, content))206def is_response_obj(result, include_lambda_response=False):207 types = (RequestsResponse, FlaskResponse)208 if include_lambda_response:209 types += (LambdaResponse,)210 return isinstance(result, types)211def get_response_payload(response, as_json=False):212 result = (213 response.content214 if isinstance(response, RequestsResponse)215 else response.data216 if isinstance(response, FlaskResponse)217 else None218 )219 result = "" if result is None else result220 if as_json:221 result = result or "{}"222 result = json.loads(to_str(result))223 return result224def requests_response(content, status_code=200, headers=None):225 if headers is None:226 headers = {}227 resp = RequestsResponse()228 headers = CaseInsensitiveDict(dict(headers or {}))229 if isinstance(content, dict):230 content = json.dumps(content)231 if not headers.get(HEADER_CONTENT_TYPE):232 headers[HEADER_CONTENT_TYPE] = APPLICATION_JSON233 resp._content = content234 resp.status_code = int(status_code)235 # Note: update headers (instead of assigning directly), to ensure we're using a case-insensitive dict236 resp.headers.update(headers)237 return resp238def request_response_stream(stream, status_code=200, headers=None):239 if headers is None:240 headers = {}241 resp = RequestsResponse()242 resp.raw = stream243 resp.status_code = int(status_code)244 # Note: update headers (instead of assigning directly), to ensure we're using a case-insensitive dict245 resp.headers.update(headers or {})246 return resp247def flask_to_requests_response(r):248 return requests_response(r.data, status_code=r.status_code, headers=r.headers)249def requests_to_flask_response(r):250 return FlaskResponse(r.content, status=r.status_code, headers=dict(r.headers))251def flask_not_found_error(msg=None):252 msg = msg or "The specified resource doesnt exist."253 return flask_error_response_json(msg, code=404, error_type="ResourceNotFoundException")254def response_regex_replace(response, search, replace):255 content = re.sub(search, replace, to_str(response._content), flags=re.DOTALL | re.MULTILINE)256 set_response_content(response, content)257def set_response_content(response, content, headers=None):258 if isinstance(content, dict):259 content = json.dumps(json_safe(content))260 elif isinstance(content, RequestsResponse):261 response.status_code = content.status_code262 content = content.content263 response._content = content or ""264 response.headers.update(headers or {})265 response.headers["Content-Length"] = str(len(response._content))266def make_requests_error(*args, **kwargs):267 return flask_to_requests_response(flask_error_response_xml(*args, **kwargs))268def make_error(*args, **kwargs):269 return flask_error_response_xml(*args, **kwargs)270def create_sqs_system_attributes(headers):271 system_attributes = {}...
kms_listener.py
Source:kms_listener.py
...168 "EncryptionAlgorithms",169 "SigningAlgorithms",170 ]171 result = select_attributes(result, attrs)172 set_response_content(response, result)173 response.status_code = 200174 return response175def generate_data_key_pair(data, response):176 result = _generate_data_key_pair(data)177 set_response_content(response, result)178 response.status_code = 200179 return response180def generate_data_key_pair_without_plaintext(data, response):181 result = _generate_data_key_pair(data)182 result.pop("PrivateKeyPlaintext", None)183 set_response_content(response, result)184 response.status_code = 200185 return response186def _generate_data_key_pair(data, create_cipher=True):187 region_details = KMSBackend.get()188 kms = aws_stack.connect_to_service("kms")189 key_id = data.get("KeyId")190 key_spec = data.get("KeyPairSpec") or data.get("KeySpec") or data.get("CustomerMasterKeySpec")191 key = None192 public_format = None193 if key_spec.startswith("RSA"):194 rsa_key_sizes = {195 "RSA_2048": 2048,196 "RSA_3072": 3072,197 "RSA_4096": 4096,198 }199 key_size = rsa_key_sizes.get(key_spec)200 key = rsa.generate_private_key(public_exponent=65537, key_size=key_size)201 public_format = crypto_serialization.PublicFormat.PKCS1202 if key_spec.startswith("ECC"):203 curve = None204 if key_spec == "ECC_NIST_P256":205 curve = ec.BrainpoolP256R1()206 elif key_spec == "ECC_NIST_P384":207 curve = ec.BrainpoolP384R1()208 elif key_spec == "ECC_NIST_P521":209 curve = ec.BrainpoolP512R1()210 elif key_spec == "ECC_SECG_P256K1":211 curve = ec.SECP256K1()212 key = ec.generate_private_key(curve)213 public_format = crypto_serialization.PublicFormat.SubjectPublicKeyInfo214 private_key = key.private_bytes(215 crypto_serialization.Encoding.DER,216 crypto_serialization.PrivateFormat.PKCS8,217 crypto_serialization.NoEncryption(),218 )219 public_key = key.public_key().public_bytes(crypto_serialization.Encoding.DER, public_format)220 cipher_text_blob = None221 if create_cipher:222 cipher_text = kms.encrypt(KeyId=key_id, Plaintext=private_key)["CiphertextBlob"]223 cipher_text_blob = base64.b64encode(cipher_text)224 region = region_details.get_current_request_region()225 result = {226 "PrivateKeyCiphertextBlob": cipher_text_blob,227 "PrivateKeyPlaintext": base64.b64encode(private_key),228 "PublicKey": base64.b64encode(public_key),229 "KeyId": key_id,230 "KeyPairSpec": key_spec,231 "KeySpec": key_spec,232 "KeyUsage": "SIGN_VERIFY",233 "Policy": data.get("Policy"),234 "Region": region,235 "Description": data.get("Description"),236 "Arn": aws_stack.kms_key_arn(key_id),237 "_key_": key,238 }239 region_details.key_pairs[key_id] = result240 key = Key("", result["KeyUsage"], key_spec, result["Description"], region)241 key.id = key_id242 return {**key.to_dict()["KeyMetadata"], **result}243def set_key_managed(key_id: str) -> None:244 """245 Sets a KMS key to AWS managed246 :param key_id: ID of the KMS key247 """248 region_name = aws_stack.get_region()249 backend = kms_backends.get(region_name)250 key_data = backend.keys.get(key_id)251 if key_data:252 key_data.key_manager = "AWS"253def create_key(data: Dict):254 key_usage = data.get("KeyUsage")255 if key_usage != "SIGN_VERIFY" or KMS_PROVIDER == "local-kms":256 return257 data["KeyId"] = long_uid()258 result = _generate_data_key_pair(data, create_cipher=False)259 result = {"KeyMetadata": json_safe(result)}260 return result261def sign(data, response):262 region_details = KMSBackend.get()263 response.status_code = 200264 algo = data.get("SigningAlgorithm")265 key_id = data.get("KeyId")266 message = base64.b64decode(to_bytes(data.get("Message")))267 key_pair = region_details.key_pairs.get(key_id)268 kwargs = {}269 if algo.startswith("RSA"):270 if "PKCS" in algo:271 kwargs["padding"] = padding.PKCS1v15()272 elif "PSS" in algo:273 kwargs["padding"] = padding.PSS(274 mgf=padding.MGF1(hashes.SHA256()), salt_length=padding.PSS.MAX_LENGTH275 )276 else:277 LOG.warning("Unsupported padding in SigningAlgorithm '%s'", algo)278 if "SHA_256" in algo:279 kwargs["algorithm"] = hashes.SHA256()280 elif "SHA_384" in algo:281 kwargs["algorithm"] = hashes.SHA384()282 elif "SHA_512" in algo:283 kwargs["algorithm"] = hashes.SHA512()284 else:285 LOG.warning("Unsupported hash type in SigningAlgorithm '%s'", algo)286 if algo.startswith("ECDSA"):287 kwargs["signature_algorithm"] = ec.ECDSA(algorithm=kwargs.pop("algorithm", None))288 # generate signature289 signature = key_pair["_key_"].sign(data=message, **kwargs)290 result = {291 "KeyId": key_id,292 "Signature": to_str(base64.b64encode(signature)),293 "SigningAlgorithm": algo,294 }295 set_response_content(response, json.dumps(result))296 return response297def search_key_pair(data, response):298 key_pairs = KMSBackend.get().key_pairs299 key = key_pairs.get(data.get("KeyId"))300 if not key:301 return response302 key_object = Key(303 key["Policy"], key["KeyUsage"], key["KeySpec"], key["Description"], key["Region"]304 )305 key_object.id = key["KeyId"]306 response.status_code = 200307 set_response_content(response, json.dumps(key_object.to_dict()))308 return response309def add_key_pairs(response):310 key_pairs = KMSBackend.get().key_pairs311 response.status_code = 200312 content = json.loads(to_str(response.content))313 prev_keys = content["Keys"]314 for id in key_pairs:315 prev_keys.append(316 {317 "KeyId": key_pairs[id]["KeyId"],318 "KeyArn": key_pairs[id]["Arn"],319 }320 )321 content["Keys"] = prev_keys322 set_response_content(response, json.dumps(content))323 return response324class ProxyListenerKMS(ProxyListener):325 def forward_request(self, method, path, data, headers):326 action = headers.get("X-Amz-Target") or ""327 action = action.split(".")[-1]328 if method == "POST" and path == "/":329 parsed_data = json.loads(to_str(data))330 if action == "CreateKey":331 descr = parsed_data.get("Description") or ""332 event_publisher.fire_event(333 EVENT_KMS_CREATE_KEY, {"k": event_publisher.get_hash(descr)}334 )335 result = create_key(parsed_data)336 if result is not None:...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!