Best Python code snippet using localstack_python
generic_proxy.py
Source:generic_proxy.py
...165 headers[ACL_EXPOSE_HEADERS] = ",".join(CORS_EXPOSE_HEADERS)166 for header in ALLOWED_CORS_RESPONSE_HEADERS:167 if headers.get(header) == "":168 del headers[header]169def http_exception_to_response(e: HTTPException):170 """Convert a werkzeug HTTP exception to a requests.Response object"""171 response = Response()172 response.status_code = e.code173 response.headers.update(dict(e.get_headers()))174 body = e.get_body()175 response.headers["Content-Length"] = str(len(str(body or "")))176 response._content = body177 return response178def cors_error_response():179 response = Response()180 response.status_code = 403181 return response182def is_cors_origin_allowed(headers, allowed_origins=None):183 """Returns true if origin is allowed to perform cors requests, false otherwise"""184 allowed_origins = ALLOWED_CORS_ORIGINS if allowed_origins is None else allowed_origins185 origin = headers.get("origin")186 referer = headers.get("referer")187 if origin:188 return origin in allowed_origins189 elif referer:190 referer_uri = "{uri.scheme}://{uri.netloc}".format(uri=urlparse(referer))191 return referer_uri in allowed_origins192 # If both headers are not set, let it through (awscli etc. 
do not send these headers)193 return True194def should_enforce_self_managed_service(method, path, headers, data):195 if config.DISABLE_CUSTOM_CORS_S3 and config.DISABLE_CUSTOM_CORS_APIGATEWAY:196 return True197 # allow only certain api calls without checking origin198 import localstack.services.edge199 api, _ = localstack.services.edge.get_api_from_custom_rules(method, path, data, headers) or (200 "",201 None,202 )203 if not config.DISABLE_CUSTOM_CORS_S3 and api == "s3":204 return False205 if not config.DISABLE_CUSTOM_CORS_APIGATEWAY and api == "apigateway":206 return False207 return True208def modify_and_forward(209 method=None,210 path=None,211 data_bytes=None,212 headers=None,213 forward_base_url=None,214 listeners=None,215 request_handler=None,216 client_address=None,217 server_address=None,218):219 """This is the central function that coordinates the incoming/outgoing messages220 with the proxy listeners (message interceptors)."""221 # Check origin / referer header before anything else happens.222 if (223 not config.DISABLE_CORS_CHECKS224 and should_enforce_self_managed_service(method, path, headers, data_bytes)225 and not is_cors_origin_allowed(headers)226 ):227 LOG.info(228 "Blocked cors request from forbidden origin %s",229 headers.get("origin") or headers.get("referer"),230 )231 return cors_error_response()232 listeners = ProxyListener.DEFAULT_LISTENERS + (listeners or [])233 listeners = [lis for lis in listeners if lis]234 data = data_bytes235 def is_full_url(url):236 return re.match(r"[a-zA-Z]+://.+", url)237 if is_full_url(path):238 path = path.split("://", 1)[1]239 path = "/%s" % (path.split("/", 1)[1] if "/" in path else "")240 proxy_url = "%s%s" % (forward_base_url, path)241 for listener in listeners:242 proxy_url = listener.get_forward_url(method, path, data, headers) or proxy_url243 target_url = path244 if not is_full_url(target_url):245 target_url = "%s%s" % (forward_base_url, target_url)246 # update original "Host" header (moto s3 relies on this 
behavior)247 if not headers.get("Host"):248 headers["host"] = urlparse(target_url).netloc249 headers["X-Forwarded-For"] = build_x_forwarded_for(headers, client_address, server_address)250 response = None251 modified_request = None252 # update listener (pre-invocation)253 for listener in listeners:254 try:255 listener_result = listener.forward_request(256 method=method, path=path, data=data, headers=headers257 )258 except HTTPException as e:259 # TODO: implement properly using exception handlers260 return http_exception_to_response(e)261 if isinstance(listener_result, Response):262 response = listener_result263 break264 if isinstance(listener_result, LambdaResponse):265 response = listener_result266 break267 if isinstance(listener_result, dict):268 response = Response()269 response._content = json.dumps(json_safe(listener_result))270 response.headers["Content-Type"] = APPLICATION_JSON271 response.status_code = 200272 break273 elif isinstance(listener_result, Request):274 modified_request = listener_result...
applications.py
Source:applications.py
1from __future__ import annotations2import dataclasses3import inspect4import traceback5from typing import Any, Awaitable, Callable, List, Optional, TypeVar, Union6from typing_extensions import Literal, TypeAlias7from .exceptions import HTTPException, http_exception_to_response8from .requests import Request9from .responses import Response10from .routing import Router11from .types import Receive, Scope, Send12class Soie:13 def __init__(14 self,15 *,16 debug: bool = False,17 on_startup: List[LifeSpanHook] = None,18 on_shutdown: List[LifeSpanHook] = None,19 router: Router = None,20 exception_handlers: Optional[ExceptionHandlers] = None,21 ):22 self.debug = debug23 self.lifespan = LifeSpan(on_startup or [], on_shutdown or [])24 if router is None:25 router = Router()26 self.router = router27 if exception_handlers is None:28 exception_handlers = {}29 self._exception_handlers: ExceptionHandlers = {HTTPException: http_exception_to_response} | exception_handlers30 async def app(self, scope: Scope, receive: Receive, send: Send) -> None:31 scope_type: Literal["lifespan", "http", "websocket"] = scope["type"]32 return await getattr(self, scope_type)(scope, receive, send)33 async def http(self, scope: Scope, receive: Receive, send: Send) -> None:34 async with ASGIContextManager(scope, receive, send, self._exception_handlers) as request:35 route = self.router.get_route(request)36 response = await route.endpoint(request)37 await response(scope, receive, send)38 async def websocket(self, scope: Scope, receive: Receive, send: Send) -> None:39 raise NotImplementedError40 async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:41 scope["app"] = self42 await self.app(scope, receive, send)43 def add_exception_handler(self, exc_type: type[BaseException], handler: ExceptionHandler) -> None:44 self._exception_handlers[exc_type] = handler45 def exception_handler(self, exc_type: type[BaseException]) -> Callable[[ExceptionHandler], ExceptionHandler]:46 def 
decorator(handler: ExceptionHandler) -> ExceptionHandler:47 self.add_exception_handler(exc_type, handler)48 return handler49 return decorator50LifeSpanHook: TypeAlias = Union[Callable[[], Any], Callable[[], Awaitable[Any]]]51@dataclasses.dataclass52class LifeSpan:53 on_startup: List[LifeSpanHook] = dataclasses.field(default_factory=list)54 on_shutdown: List[LifeSpanHook] = dataclasses.field(default_factory=list)55 async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:56 message = await receive()57 assert message["type"] == "lifespan.startup"58 try:59 for handler in self.on_startup:60 result = handler()61 if inspect.isawaitable(result):62 await result63 except BaseException:64 msg = traceback.format_exc()65 await send({"type": "lifespan.startup.failed", "message": msg})66 raise67 await send({"type": "lifespan.startup.completed"})68 message = await receive()69 assert message["type"] == "lifespan.shutdown"70 try:71 for handler in self.on_shutdown:72 result = handler()73 if inspect.isawaitable(result):74 await result75 except BaseException:76 msg = traceback.format_exc()77 await send({"type": "lifespan.shutdown.failed", "message": msg})78 raise79 await send({"type": "lifespan.shutdown.completed"})80E = TypeVar("E", bound=BaseException)81ExceptionHandler = Callable[[Request, E], Awaitable[Response]]82ExceptionHandlers = dict[type[BaseException], ExceptionHandler[BaseException]]83class ASGIContextManager:84 def __init__(85 self,86 scope: Scope,87 receive: Receive,88 send: Send,89 exception_handlers: ExceptionHandlers,90 ):91 self._scope = scope92 self._receive = receive93 self._send = send94 self.request = Request(scope, receive)95 self.exception_handlers = exception_handlers96 async def __aenter__(self) -> Request:97 return self.request98 async def __aexit__(self, exc_type, exc_val, exc_tb) -> bool:99 if exc_type is None:100 return True101 convertor = self.exception_handlers.get(exc_type)102 if convertor is None:103 return False104 response = 
await convertor(self.request, exc_val)105 await response(self._scope, self._receive, self._send)...
exceptions.py
Source:exceptions.py
...17 if message is None:18 message = HTTPStatus(status_code).description19 self.message = message20 super().__init__(status_code, message)21async def http_exception_to_response(_, exc: HTTPException) -> Response:22 return JSONResponse(content=exc.message, status_code=exc.status_code, headers=exc.headers)23class ParamNotMatched(SoieException):...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. From setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios, LambdaTest Learning Hubs compile a list of step-by-step guides to help you become proficient with different test automation frameworks such as Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!