Best Python code snippet using locust
formparser.pyi
Source:formparser.pyi
# Type stub describing the public surface of the form-data parser module:
# the module-level helpers, ``FormDataParser`` and ``MultiPartParser``.
# Every ``...`` default means "a default exists; its value is not part of
# the stub".
from typing import (
    Any,
    Optional,
    Text,
    Tuple,
    Callable,
    Iterable,
    TypeVar,
    NoReturn,
    Protocol,
    IO,
    Generator,
    Dict,
    Mapping,
    Union,
)
from wsgiref.types import WSGIEnvironment

from .datastructures import Headers

# Container type for parsed form fields / files; deliberately left untyped.
_Dict = Any

# Signature shared by the entries of ``FormDataParser.parse_functions``:
# (stream, mimetype, content_length, options) -> (stream, form, files).
# NOTE(review): the parameter meanings are inferred from ``FormDataParser.parse``,
# which has the same shape -- confirm against the implementation.
_ParseFunc = Callable[
    [IO[bytes], str, Optional[int], Mapping[str, str]],
    Tuple[IO[bytes], _Dict, _Dict],
]

# Lets ``exhaust_stream`` be typed as returning the same callable type it receives.
_F = TypeVar("_F", bound=Callable[..., Any])


class _StreamFactory(Protocol):
    # Structural type: any callable with this signature that returns a
    # binary stream is accepted wherever a stream factory is expected.
    def __call__(
        self,
        total_content_length: Optional[int],
        filename: str,
        content_type: str,
        content_length: Optional[int] = ...,
    ) -> IO[bytes]: ...


def default_stream_factory(
    total_content_length: Optional[int],
    filename: str,
    content_type: str,
    content_length: Optional[int] = ...,
) -> IO[bytes]: ...


def parse_form_data(
    environ: WSGIEnvironment,
    stream_factory: Optional[_StreamFactory] = ...,
    charset: Text = ...,
    errors: Text = ...,
    max_form_memory_size: Optional[int] = ...,
    max_content_length: Optional[int] = ...,
    cls: Optional[Callable[[], _Dict]] = ...,
    silent: bool = ...,
) -> Tuple[IO[bytes], _Dict, _Dict]: ...


def exhaust_stream(f: _F) -> _F: ...


class FormDataParser(object):
    stream_factory: _StreamFactory
    charset: Text
    errors: Text
    max_form_memory_size: Optional[int]
    max_content_length: Optional[int]
    cls: Callable[[], _Dict]
    silent: bool

    def __init__(
        self,
        stream_factory: Optional[_StreamFactory] = ...,
        charset: Text = ...,
        errors: Text = ...,
        max_form_memory_size: Optional[int] = ...,
        max_content_length: Optional[int] = ...,
        cls: Optional[Callable[[], _Dict]] = ...,
        silent: bool = ...,
    ) -> None: ...
    def get_parse_func(self, mimetype: str, options: Any) -> Optional[_ParseFunc]: ...
    def parse_from_environ(self, environ: WSGIEnvironment) -> Tuple[IO[bytes], _Dict, _Dict]: ...
    def parse(
        self,
        stream: IO[bytes],
        mimetype: Text,
        content_length: Optional[int],
        options: Optional[Mapping[str, str]] = ...,
    ) -> Tuple[IO[bytes], _Dict, _Dict]: ...

    parse_functions: Dict[Text, _ParseFunc]


def is_valid_multipart_boundary(boundary: str) -> bool: ...


def parse_multipart_headers(iterable: Iterable[Union[Text, bytes]]) -> Headers: ...


class MultiPartParser(object):
    charset: Text
    errors: Text
    max_form_memory_size: Optional[int]
    stream_factory: _StreamFactory
    cls: Callable[[], _Dict]
    buffer_size: int

    def __init__(
        self,
        stream_factory: Optional[_StreamFactory] = ...,
        charset: Text = ...,
        errors: Text = ...,
        max_form_memory_size: Optional[int] = ...,
        cls: Optional[Callable[[], _Dict]] = ...,
        buffer_size: int = ...,
    ) -> None: ...
    def fail(self, message: Text) -> NoReturn: ...
    def get_part_encoding(self, headers: Mapping[str, str]) -> Optional[str]: ...
    def get_part_charset(self, headers: Mapping[str, str]) -> Text: ...
    def start_file_streaming(
        self,
        filename: Union[Text, bytes],
        headers: Mapping[str, str],
        total_content_length: Optional[int],
    ) -> Tuple[Text, IO[bytes]]: ...
    def in_memory_threshold_reached(self, bytes: Any) -> NoReturn: ...
    def validate_boundary(self, boundary: Optional[str]) -> None: ...
    def parse_lines(
        self,
        file: Any,
        boundary: bytes,
        content_length: int,
        cap_at_buffer: bool = ...,
    ) -> Generator[Tuple[str, Any], None, None]: ...
    def parse_parts(
        self, file: Any, boundary: bytes, content_length: int
    ) -> Generator[Tuple[str, Any], None, None]: ...
    # NOTE(review): the listing this stub was recovered from is cut off after
    # ``parse_parts``; any members declared after it are not shown here.
file_streamer.py
Source:file_streamer.py
"""Minimal Flask app that streams an uploaded request body through a custom
Werkzeug ``stream_factory`` and logs each chunk instead of storing the file."""
import time

import werkzeug.formparser
from flask import Flask, Response, request


class DummyWerkzeugFile:
    """File-like sink that only reports the size of each chunk written to it."""

    def write(self, b: bytes):
        """Print the size of the received chunk; the data itself is discarded."""
        print('reading file parts: size=%s' % len(b))

    def seek(self, *args, **kwargs):
        """Accept any seek arguments and always report position 0."""
        # Hack: this is how we know we've finished reading the request file.
        return 0


def stream_factory(total_content_length, content_type, filename, content_length=None):
    """Log the upload metadata and return a fresh ``DummyWerkzeugFile`` sink.

    Passed to ``werkzeug.formparser.parse_form_data`` as ``stream_factory``.

    NOTE(review): ``content_type`` comes before ``filename`` here, whereas
    Werkzeug's ``default_stream_factory`` declares ``filename`` first. Confirm
    how the installed Werkzeug version passes these arguments (positionally or
    by keyword) before relying on the logged labels.
    """
    print(
        'Start writing to stream: '
        'total_content_length=%s, content_type=%s, filename=%s, content_length=%s' % (
            total_content_length, content_type, filename, content_length
        )
    )
    # Here we can return anything with a write(b: bytes) and seek() method, like a file.
    return DummyWerkzeugFile()


def main():
    """Create the app, register the ``/upload`` route, and serve on 0.0.0.0:5001."""
    app = Flask('file-streamer')

    @app.route('/upload', methods=['POST'])
    def upload():
        """Parse the POSTed form data via ``stream_factory`` and log the timing."""
        print('Starting to read request')
        start = time.time()
        # The returned stream is not used; only the parsed form and files are logged.
        _stream, form, files = werkzeug.formparser.parse_form_data(
            request.environ, stream_factory=stream_factory
        )
        end = time.time()
        print('Finished reading request: time=%s' % (end - start))
        print('Form: %s' % form)
        print('Files: %s' % files)
        return Response(status=200)

    app.run('0.0.0.0', 5001)


if __name__ == '__main__':
    # NOTE(review): the original listing is truncated right after this guard;
    # main() is the only entry point the script defines.
    main()
Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!