Best Python code snippet using localstack_python
generic_proxy.py
Source:generic_proxy.py
...482 data = data_bytes483 original_request = RoutingRequest(method=method, path=path, data=data, headers=headers)484 def is_full_url(url):485 return re.match(r"[a-zA-Z]+://.+", url)486 def get_proxy_backend_url(_path, original_url=None, run_listeners=False):487 if is_full_url(_path):488 _path = _path.split("://", 1)[1]489 _path = "/%s" % (_path.split("/", 1)[1] if "/" in _path else "")490 base_url = forward_base_url or original_url491 result = update_path_in_url(base_url, _path)492 if run_listeners:493 for listener in listeners_inbound:494 result = listener.get_forward_url(method, path, data, headers) or result495 return result496 target_url = path497 if not is_full_url(target_url):498 target_url = "%s%s" % (forward_base_url, target_url)499 # update original "Host" header (moto s3 relies on this behavior)500 if not headers.get("Host"):501 headers["host"] = urlparse(target_url).netloc502 headers["X-Forwarded-For"] = build_x_forwarded_for(headers, client_address, server_address)503 response = None504 handler_chain_request = original_request.copy()505 modified_request_to_backend = None506 # run inbound handlers (pre-invocation)507 for listener in listeners_inbound:508 try:509 listener_result = listener.forward_request(510 method=handler_chain_request.method,511 path=handler_chain_request.path,512 data=handler_chain_request.data,513 headers=handler_chain_request.headers,514 )515 except HTTPException as e:516 # TODO: implement properly using exception handlers517 return http_exception_to_response(e)518 if isinstance(listener, MessageModifyingProxyListener):519 if isinstance(listener_result, RoutingRequest):520 # update the modified request details, then call next listener521 handler_chain_request.method = (522 listener_result.method or handler_chain_request.method523 )524 handler_chain_request.path = listener_result.path or handler_chain_request.path525 if listener_result.data is not None:526 handler_chain_request.data = listener_result.data527 if listener_result.headers is not None:528 handler_chain_request.headers = listener_result.headers529 continue530 if isinstance(listener_result, Response):531 response = listener_result532 break533 if isinstance(listener_result, LambdaResponse):534 response = listener_result535 break536 if isinstance(listener_result, dict):537 response = Response()538 response._content = json.dumps(json_safe(listener_result))539 response.headers["Content-Type"] = APPLICATION_JSON540 response.status_code = 200541 break542 elif isinstance(listener_result, Request):543 # TODO: unify modified_request_to_backend (requests.Request) and544 # handler_chain_request (ls.routing.Request)545 modified_request_to_backend = listener_result546 break547 elif http2_server.get_async_generator_result(listener_result):548 return listener_result549 elif listener_result is not True:550 # get status code from response, or use Bad Gateway status code551 code = listener_result if isinstance(listener_result, int) else 503552 response = Response()553 response.status_code = code554 response._content = ""555 response.headers["Content-Length"] = "0"556 append_cors_headers(request_headers=headers, response=response)557 return response558 # perform the actual invocation of the backend service559 headers_to_send = None560 data_to_send = None561 method_to_send = None562 if response is None:563 headers_to_send = handler_chain_request.headers564 headers_to_send["Connection"] = headers_to_send.get("Connection") or "close"565 data_to_send = handler_chain_request.data566 method_to_send = handler_chain_request.method567 request_url = get_proxy_backend_url(handler_chain_request.path, run_listeners=True)568 if modified_request_to_backend:569 if modified_request_to_backend.url:570 request_url = get_proxy_backend_url(571 modified_request_to_backend.url, original_url=request_url572 )573 data_to_send = modified_request_to_backend.data574 if modified_request_to_backend.method:575 method_to_send = modified_request_to_backend.method576 # make sure we drop "chunked" transfer encoding from the headers to be forwarded577 headers_to_send.pop("Transfer-Encoding", None)578 response = requests.request(579 method_to_send,580 request_url,581 data=data_to_send,582 headers=headers_to_send,583 stream=True,584 verify=False,...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!