How to use the enable_gzip option in Python

Example Python code snippets using the enable_gzip option (from influxdb-client-python and pyhotvect)

_base.py

Source:_base.py Github

copy

Full Screen

1"""Commons function for Sync and Async client."""2from __future__ import absolute_import3import base644import codecs5import configparser6import csv7import os8from datetime import datetime, timedelta9from typing import Iterator, List, Generator, Any, Union, Iterable, AsyncGenerator10from urllib3 import HTTPResponse11from influxdb_client import Configuration, Dialect, Query, OptionStatement, VariableAssignment, Identifier, \12 Expression, BooleanLiteral, IntegerLiteral, FloatLiteral, DateTimeLiteral, UnaryExpression, DurationLiteral, \13 Duration, StringLiteral, ArrayExpression, ImportDeclaration, MemberExpression, MemberAssignment, File, \14 WriteService, QueryService, DeleteService, DeletePredicateRequest15from influxdb_client.client.flux_csv_parser import FluxResponseMetadataMode, FluxCsvParser, FluxSerializationMode16from influxdb_client.client.flux_table import FluxTable, FluxRecord17from influxdb_client.client.util.date_utils import get_date_helper18from influxdb_client.client.util.helpers import get_org_query_param19from influxdb_client.client.write.dataframe_serializer import DataframeSerializer20from influxdb_client.rest import _UTF_8_encoding21try:22 import dataclasses23 _HAS_DATACLASS = True24except ModuleNotFoundError:25 _HAS_DATACLASS = False26# noinspection PyMethodMayBeStatic27class _BaseClient(object):28 def __init__(self, url, token, debug=None, timeout=10_000, enable_gzip=False, org: str = None,29 default_tags: dict = None, **kwargs) -> None:30 self.url = url31 self.token = token32 self.org = org33 self.default_tags = default_tags34 self.conf = _Configuration()35 if self.url.endswith("/"):36 self.conf.host = self.url[:-1]37 else:38 self.conf.host = self.url39 self.conf.enable_gzip = enable_gzip40 self.conf.debug = debug41 self.conf.verify_ssl = kwargs.get('verify_ssl', True)42 self.conf.ssl_ca_cert = kwargs.get('ssl_ca_cert', None)43 self.conf.proxy = kwargs.get('proxy', None)44 self.conf.proxy_headers = kwargs.get('proxy_headers', None)45 
self.conf.connection_pool_maxsize = kwargs.get('connection_pool_maxsize', self.conf.connection_pool_maxsize)46 self.conf.timeout = timeout47 auth_token = self.token48 self.auth_header_name = "Authorization"49 self.auth_header_value = "Token " + auth_token50 auth_basic = kwargs.get('auth_basic', False)51 if auth_basic:52 self.auth_header_value = "Basic " + base64.b64encode(token.encode()).decode()53 self.retries = kwargs.get('retries', False)54 self.profilers = kwargs.get('profilers', None)55 pass56 def _version(self, response) -> str:57 if response is not None and len(response) >= 3:58 if 'X-Influxdb-Version' in response[2]:59 return response[2]['X-Influxdb-Version']60 return "unknown"61 @classmethod62 def _from_config_file(cls, config_file: str = "config.ini", debug=None, enable_gzip=False):63 config = configparser.ConfigParser()64 config.read(config_file)65 def config_value(key: str):66 return config['influx2'][key].strip('"')67 url = config_value('url')68 token = config_value('token')69 timeout = None70 if config.has_option('influx2', 'timeout'):71 timeout = config_value('timeout')72 org = None73 if config.has_option('influx2', 'org'):74 org = config_value('org')75 verify_ssl = True76 if config.has_option('influx2', 'verify_ssl'):77 verify_ssl = config_value('verify_ssl')78 ssl_ca_cert = None79 if config.has_option('influx2', 'ssl_ca_cert'):80 ssl_ca_cert = config_value('ssl_ca_cert')81 connection_pool_maxsize = None82 if config.has_option('influx2', 'connection_pool_maxsize'):83 connection_pool_maxsize = config_value('connection_pool_maxsize')84 auth_basic = False85 if config.has_option('influx2', 'auth_basic'):86 auth_basic = config_value('auth_basic')87 default_tags = None88 if config.has_section('tags'):89 tags = {k: v.strip('"') for k, v in config.items('tags')}90 default_tags = dict(tags)91 profilers = None92 if config.has_option('influx2', 'profilers'):93 profilers = [x.strip() for x in config_value('profilers').split(',')]94 proxy = None95 if 
config.has_option('influx2', 'proxy'):96 proxy = config_value('proxy')97 return cls(url, token, debug=debug, timeout=_to_int(timeout), org=org, default_tags=default_tags,98 enable_gzip=enable_gzip, verify_ssl=_to_bool(verify_ssl), ssl_ca_cert=ssl_ca_cert,99 connection_pool_maxsize=_to_int(connection_pool_maxsize), auth_basic=_to_bool(auth_basic),100 profilers=profilers, proxy=proxy)101 @classmethod102 def _from_env_properties(cls, debug=None, enable_gzip=False):103 url = os.getenv('INFLUXDB_V2_URL', "http://localhost:8086")104 token = os.getenv('INFLUXDB_V2_TOKEN', "my-token")105 timeout = os.getenv('INFLUXDB_V2_TIMEOUT', "10000")106 org = os.getenv('INFLUXDB_V2_ORG', "my-org")107 verify_ssl = os.getenv('INFLUXDB_V2_VERIFY_SSL', "True")108 ssl_ca_cert = os.getenv('INFLUXDB_V2_SSL_CA_CERT', None)109 connection_pool_maxsize = os.getenv('INFLUXDB_V2_CONNECTION_POOL_MAXSIZE', None)110 auth_basic = os.getenv('INFLUXDB_V2_AUTH_BASIC', "False")111 prof = os.getenv("INFLUXDB_V2_PROFILERS", None)112 profilers = None113 if prof is not None:114 profilers = [x.strip() for x in prof.split(',')]115 default_tags = dict()116 for key, value in os.environ.items():117 if key.startswith("INFLUXDB_V2_TAG_"):118 default_tags[key[16:].lower()] = value119 return cls(url, token, debug=debug, timeout=_to_int(timeout), org=org, default_tags=default_tags,120 enable_gzip=enable_gzip, verify_ssl=_to_bool(verify_ssl), ssl_ca_cert=ssl_ca_cert,121 connection_pool_maxsize=_to_int(connection_pool_maxsize), auth_basic=_to_bool(auth_basic),122 profilers=profilers)123# noinspection PyMethodMayBeStatic124class _BaseQueryApi(object):125 default_dialect = Dialect(header=True, delimiter=",", comment_prefix="#",126 annotations=["datatype", "group", "default"], date_time_format="RFC3339")127 def __init__(self, influxdb_client, query_options=None):128 from influxdb_client.client.query_api import QueryOptions129 self._query_options = QueryOptions() if query_options is None else query_options130 
self._influxdb_client = influxdb_client131 self._query_api = QueryService(influxdb_client.api_client)132 """Base implementation for Queryable API."""133 def _to_tables(self, response, query_options=None, response_metadata_mode:134 FluxResponseMetadataMode = FluxResponseMetadataMode.full) -> List[FluxTable]:135 """136 Parse HTTP response to FluxTables.137 :param response: HTTP response from an HTTP client. Expected type: `urllib3.response.HTTPResponse`.138 """139 _parser = self._to_tables_parser(response, query_options, response_metadata_mode)140 list(_parser.generator())141 return _parser.table_list()142 async def _to_tables_async(self, response, query_options=None, response_metadata_mode:143 FluxResponseMetadataMode = FluxResponseMetadataMode.full) -> List[FluxTable]:144 """145 Parse HTTP response to FluxTables.146 :param response: HTTP response from an HTTP client. Expected type: `aiohttp.client_reqrep.ClientResponse`.147 """148 async with self._to_tables_parser(response, query_options, response_metadata_mode) as parser:149 async for _ in parser.generator_async():150 pass151 return parser.table_list()152 def _to_csv(self, response: HTTPResponse) -> Iterator[List[str]]:153 """Parse HTTP response to CSV."""154 return csv.reader(codecs.iterdecode(response, _UTF_8_encoding))155 def _to_flux_record_stream(self, response, query_options=None,156 response_metadata_mode: FluxResponseMetadataMode = FluxResponseMetadataMode.full) -> \157 Generator[FluxRecord, Any, None]:158 """159 Parse HTTP response to FluxRecord stream.160 :param response: HTTP response from an HTTP client. 
Expected type: `urllib3.response.HTTPResponse`.161 """162 _parser = self._to_flux_record_stream_parser(query_options, response, response_metadata_mode)163 return _parser.generator()164 async def _to_flux_record_stream_async(self, response, query_options=None, response_metadata_mode:165 FluxResponseMetadataMode = FluxResponseMetadataMode.full) -> \166 AsyncGenerator['FluxRecord', None]:167 """168 Parse HTTP response to FluxRecord stream.169 :param response: HTTP response from an HTTP client. Expected type: `aiohttp.client_reqrep.ClientResponse`.170 """171 _parser = self._to_flux_record_stream_parser(query_options, response, response_metadata_mode)172 return (await _parser.__aenter__()).generator_async()173 def _to_data_frame_stream(self, data_frame_index, response, query_options=None,174 response_metadata_mode: FluxResponseMetadataMode = FluxResponseMetadataMode.full):175 """176 Parse HTTP response to DataFrame stream.177 :param response: HTTP response from an HTTP client. Expected type: `urllib3.response.HTTPResponse`.178 """179 _parser = self._to_data_frame_stream_parser(data_frame_index, query_options, response, response_metadata_mode)180 return _parser.generator()181 async def _to_data_frame_stream_async(self, data_frame_index, response, query_options=None, response_metadata_mode:182 FluxResponseMetadataMode = FluxResponseMetadataMode.full):183 """184 Parse HTTP response to DataFrame stream.185 :param response: HTTP response from an HTTP client. 
Expected type: `aiohttp.client_reqrep.ClientResponse`.186 """187 _parser = self._to_data_frame_stream_parser(data_frame_index, query_options, response, response_metadata_mode)188 return (await _parser.__aenter__()).generator_async()189 def _to_tables_parser(self, response, query_options, response_metadata_mode):190 return FluxCsvParser(response=response, serialization_mode=FluxSerializationMode.tables,191 query_options=query_options, response_metadata_mode=response_metadata_mode)192 def _to_flux_record_stream_parser(self, query_options, response, response_metadata_mode):193 return FluxCsvParser(response=response, serialization_mode=FluxSerializationMode.stream,194 query_options=query_options, response_metadata_mode=response_metadata_mode)195 def _to_data_frame_stream_parser(self, data_frame_index, query_options, response, response_metadata_mode):196 return FluxCsvParser(response=response, serialization_mode=FluxSerializationMode.dataFrame,197 data_frame_index=data_frame_index, query_options=query_options,198 response_metadata_mode=response_metadata_mode)199 def _to_data_frames(self, _generator):200 """Parse stream of DataFrames into expected type."""201 from ..extras import pd202 if isinstance(_generator, list):203 _dataFrames = _generator204 else:205 _dataFrames = list(_generator)206 if len(_dataFrames) == 0:207 return pd.DataFrame(columns=[], index=None)208 elif len(_dataFrames) == 1:209 return _dataFrames[0]210 else:211 return _dataFrames212 def _org_param(self, org):213 return get_org_query_param(org=org, client=self._influxdb_client)214 def _get_query_options(self):215 if self._query_options and self._query_options.profilers:216 return self._query_options217 elif self._influxdb_client.profilers:218 from influxdb_client.client.query_api import QueryOptions219 return QueryOptions(profilers=self._influxdb_client.profilers)220 def _create_query(self, query, dialect=default_dialect, params: dict = None):221 query_options = self._get_query_options()222 profilers = 
query_options.profilers if query_options is not None else None223 q = Query(query=query, dialect=dialect, extern=_BaseQueryApi._build_flux_ast(params, profilers))224 if profilers:225 print("\n===============")226 print("Profiler: query")227 print("===============")228 print(query)229 return q230 @staticmethod231 def _params_to_extern_ast(params: dict) -> List['OptionStatement']:232 statements = []233 for key, value in params.items():234 expression = _BaseQueryApi._parm_to_extern_ast(value)235 if expression is None:236 continue237 statements.append(OptionStatement("OptionStatement",238 VariableAssignment("VariableAssignment", Identifier("Identifier", key),239 expression)))240 return statements241 @staticmethod242 def _parm_to_extern_ast(value) -> Union[Expression, None]:243 if value is None:244 return None245 if isinstance(value, bool):246 return BooleanLiteral("BooleanLiteral", value)247 elif isinstance(value, int):248 return IntegerLiteral("IntegerLiteral", str(value))249 elif isinstance(value, float):250 return FloatLiteral("FloatLiteral", value)251 elif isinstance(value, datetime):252 value = get_date_helper().to_utc(value)253 return DateTimeLiteral("DateTimeLiteral", value.strftime('%Y-%m-%dT%H:%M:%S.%fZ'))254 elif isinstance(value, timedelta):255 _micro_delta = int(value / timedelta(microseconds=1))256 if _micro_delta < 0:257 return UnaryExpression("UnaryExpression", argument=DurationLiteral("DurationLiteral", [258 Duration(magnitude=-_micro_delta, unit="us")]), operator="-")259 else:260 return DurationLiteral("DurationLiteral", [Duration(magnitude=_micro_delta, unit="us")])261 elif isinstance(value, str):262 return StringLiteral("StringLiteral", str(value))263 elif isinstance(value, Iterable):264 return ArrayExpression("ArrayExpression",265 elements=list(map(lambda it: _BaseQueryApi._parm_to_extern_ast(it), value)))266 else:267 return value268 @staticmethod269 def _build_flux_ast(params: dict = None, profilers: List[str] = None):270 imports = []271 body = 
[]272 if profilers is not None and len(profilers) > 0:273 imports.append(ImportDeclaration(274 "ImportDeclaration",275 path=StringLiteral("StringLiteral", "profiler")))276 elements = []277 for profiler in profilers:278 elements.append(StringLiteral("StringLiteral", value=profiler))279 member = MemberExpression(280 "MemberExpression",281 object=Identifier("Identifier", "profiler"),282 _property=Identifier("Identifier", "enabledProfilers"))283 prof = OptionStatement(284 "OptionStatement",285 assignment=MemberAssignment(286 "MemberAssignment",287 member=member,288 init=ArrayExpression(289 "ArrayExpression",290 elements=elements)))291 body.append(prof)292 if params is not None:293 body.extend(_BaseQueryApi._params_to_extern_ast(params))294 return File(package=None, name=None, type=None, imports=imports, body=body)295class _BaseWriteApi(object):296 def __init__(self, influxdb_client, point_settings=None):297 self._influxdb_client = influxdb_client298 self._point_settings = point_settings299 self._write_service = WriteService(influxdb_client.api_client)300 if influxdb_client.default_tags:301 for key, value in influxdb_client.default_tags.items():302 self._point_settings.add_default_tag(key, value)303 def _append_default_tag(self, key, val, record):304 from influxdb_client import Point305 if isinstance(record, bytes) or isinstance(record, str):306 pass307 elif isinstance(record, Point):308 record.tag(key, val)309 elif isinstance(record, dict):310 record.setdefault("tags", {})311 record.get("tags")[key] = val312 elif isinstance(record, Iterable):313 for item in record:314 self._append_default_tag(key, val, item)315 def _append_default_tags(self, record):316 if self._point_settings.defaultTags and record is not None:317 for key, val in self._point_settings.defaultTags.items():318 self._append_default_tag(key, val, record)319 def _serialize(self, record, write_precision, payload, **kwargs):320 from influxdb_client import Point321 if isinstance(record, bytes):322 
payload[write_precision].append(record)323 elif isinstance(record, str):324 self._serialize(record.encode(_UTF_8_encoding), write_precision, payload, **kwargs)325 elif isinstance(record, Point):326 precision_from_point = kwargs.get('precision_from_point', True)327 precision = record.write_precision if precision_from_point else write_precision328 self._serialize(record.to_line_protocol(precision=precision), precision, payload, **kwargs)329 elif isinstance(record, dict):330 self._serialize(Point.from_dict(record, write_precision=write_precision, **kwargs),331 write_precision, payload, **kwargs)332 elif 'DataFrame' in type(record).__name__:333 serializer = DataframeSerializer(record, self._point_settings, write_precision, **kwargs)334 self._serialize(serializer.serialize(), write_precision, payload, **kwargs)335 elif hasattr(record, "_asdict"):336 # noinspection PyProtectedMember337 self._serialize(record._asdict(), write_precision, payload, **kwargs)338 elif _HAS_DATACLASS and dataclasses.is_dataclass(record):339 self._serialize(dataclasses.asdict(record), write_precision, payload, **kwargs)340 elif isinstance(record, Iterable):341 for item in record:342 self._serialize(item, write_precision, payload, **kwargs)343# noinspection PyMethodMayBeStatic344class _BaseDeleteApi(object):345 def __init__(self, influxdb_client):346 self._influxdb_client = influxdb_client347 self._service = DeleteService(influxdb_client.api_client)348 def _prepare_predicate_request(self, start, stop, predicate):349 date_helper = get_date_helper()350 if isinstance(start, datetime):351 start = date_helper.to_utc(start)352 if isinstance(stop, datetime):353 stop = date_helper.to_utc(stop)354 predicate_request = DeletePredicateRequest(start=start, stop=stop, predicate=predicate)355 return predicate_request356class _Configuration(Configuration):357 def __init__(self):358 Configuration.__init__(self)359 self.enable_gzip = False360 def update_request_header_params(self, path: str, params: dict):361 
super().update_request_header_params(path, params)362 if self.enable_gzip:363 # GZIP Request364 if path == '/api/v2/write':365 params["Content-Encoding"] = "gzip"366 params["Accept-Encoding"] = "identity"367 pass368 # GZIP Response369 if path == '/api/v2/query':370 # params["Content-Encoding"] = "gzip"371 params["Accept-Encoding"] = "gzip"372 pass373 pass374 pass375 def update_request_body(self, path: str, body):376 _body = super().update_request_body(path, body)377 if self.enable_gzip:378 # GZIP Request379 if path == '/api/v2/write':380 import gzip381 if isinstance(_body, bytes):382 return gzip.compress(data=_body)383 else:384 return gzip.compress(bytes(_body, _UTF_8_encoding))385 return _body386def _to_bool(bool_value):387 return str(bool_value).lower() in ("yes", "true")388def _to_int(int_value):...

Full Screen

Full Screen

pyhotvect.py

Source:pyhotvect.py Github

copy

Full Screen

import json
import logging
import os
from datetime import datetime, tzinfo, timedelta
from shutil import copyfile
from typing import Dict, List, Any

import hotvect.mlutils as mlutils
import pandas as pd
from hotvect.utils import trydelete, runshell, read_json, to_zip_archive, ensure_file_exists, clean_dir, \
    ensure_dir_exists

logging.basicConfig(level=logging.WARNING)


# Courtesy of https://stackoverflow.com/a/23705687/234901
# Under CC BY-SA 3.0
class SimpleUTC(tzinfo):
    """Minimal UTC tzinfo (fixed zero offset)."""

    # FIX: the tzinfo contract is tzname(self, dt); the previous `**kwargs`
    # signature raised TypeError when datetime.tzname() passed dt positionally.
    def tzname(self, dt):
        return "UTC"

    def utcoffset(self, dt):
        return timedelta(0)


class Hotvect:
    """Driver for a hotvect train/predict/evaluate pipeline.

    Orchestrates the Java hotvect CLI (state generation, encoding, prediction)
    and Vowpal Wabbit training, writing metadata and artifacts under
    per-algorithm, per-run directories.
    """

    def __init__(self,
                 hotvect_util_jar_path: str,
                 algorithm_jar_path: str,
                 metadata_base_path: str,
                 output_base_path: str,
                 train_data_path: str,
                 validation_data_path: str,
                 algorithm_definition: Dict[str, Any],
                 state_source_base_path: str = None,
                 run_id: str = "default",
                 enable_gzip: bool = True,
                 jvm_options: List[str] = None
                 ):
        self.algorithm_definition = algorithm_definition
        self.run_id: str = run_id
        # Timezone-aware ISO timestamp identifying this run.
        self.ran_at: str = datetime.utcnow().replace(tzinfo=SimpleUTC()).isoformat()
        # Utilities
        self.hotvect_util_jar_path = hotvect_util_jar_path
        # Algorithm classes
        self.algorithm_jar_path = algorithm_jar_path
        # Metadata
        self.metadata_base_path = metadata_base_path
        # Output
        self.output_base_path = output_base_path
        # Train data
        self.train_data_path = train_data_path
        ensure_file_exists(train_data_path)
        # Validation data
        self.validation_data_location = validation_data_path
        ensure_file_exists(validation_data_path)
        # State source data
        self.state_source_base_path = state_source_base_path
        if self.state_source_base_path:
            ensure_dir_exists(self.state_source_base_path)
        # Maps state name -> generated state file path (filled by generate_states).
        self.feature_states: Dict[str, str] = {}
        # Gzip
        self.enable_gzip = enable_gzip
        # Jvm options
        if not jvm_options:
            self.jvm_options = ['-Xmx4g']
        else:
            self.jvm_options = jvm_options
        logging.info(f'Initialized:{self.__dict__}')

    def set_run_id(self, run_id: str):
        self.run_id = run_id

    def set_algorithm_definition(self, algorithm_definition: Dict[str, Any]):
        self.algorithm_definition = algorithm_definition

    def metadata_path(self) -> str:
        """Directory for this algorithm/run's metadata JSON files."""
        return os.path.join(self.metadata_base_path, self.algorithm_definition['algorithm_name'], self.run_id)

    def output_path(self) -> str:
        """Directory for this algorithm/run's generated artifacts."""
        return os.path.join(self.output_base_path, self.algorithm_definition['algorithm_name'], self.run_id)

    def state_output_path(self, state_name: str):
        state_filename = f'{state_name}'
        return os.path.join(
            self.output_path(),
            state_filename
        )

    def encoded_data_file_path(self) -> str:
        encode_suffix = 'encoded.gz' if self.enable_gzip else 'encoded'
        return os.path.join(
            self.output_path(),
            encode_suffix
        )

    def model_file_path(self) -> str:
        return os.path.join(
            self.output_path(),
            'model.parameter'
        )

    def score_file_path(self) -> str:
        return os.path.join(
            self.output_path(),
            'validation_scores.csv'
        )

    def audit_data_file_path(self) -> str:
        encode_suffix = 'audit.jsonl.gz' if self.enable_gzip else 'audit.jsonl'
        return os.path.join(
            self.output_path(),
            encode_suffix
        )

    def predict_parameter_file_path(self) -> str:
        return os.path.join(
            self.output_path(),
            f"{self.algorithm_definition['algorithm_name']}@{self.run_id}.parameters.zip"
        )

    def encode_parameter_file_path(self) -> str:
        return os.path.join(
            self.output_path(),
            'encode.parameters.zip'
        )

    def _write_algorithm_definition(self) -> str:
        """Write algorithm definition so that Java can read it"""
        algorithm_definition_path = os.path.join(
            self.metadata_path(),
            'algorithm_definition.json'
        )
        trydelete(algorithm_definition_path)
        with open(algorithm_definition_path, 'w') as fp:
            json.dump(self.algorithm_definition, fp)
        return algorithm_definition_path

    def _write_data(self, data: Dict, dest_file_name: str) -> str:
        """Write a dict as JSON into the output directory, replacing any existing file."""
        dest = os.path.join(
            self.output_path(),
            dest_file_name
        )
        trydelete(dest)
        with open(dest, 'w') as fp:
            json.dump(data, fp)
        return dest

    def clean(self) -> None:
        """Delete per-run artifacts and empty the metadata/output directories."""
        for file in [
            self.encoded_data_file_path(),
            self.model_file_path(),
            self.score_file_path()
        ]:
            trydelete(file)
        for directory in [
            self.metadata_path(),
            self.output_path(),
        ]:
            clean_dir(directory)
        logging.info('Cleaned output and metadata')

    def run_all(self, run_id=None, clean=True) -> Dict:
        """Run the full pipeline and return the per-step metadata dict."""
        if run_id:
            self.run_id = run_id
        result = {
            'algorithm_name': self.algorithm_definition['algorithm_name'],
            'run_id': self.run_id,
            'ran_at': self.ran_at,
            'algorithm_definition': self.algorithm_definition
        }
        self.clean()
        result['states'] = self.generate_states()
        result['package_encode_params'] = self.package_encode_parameters()
        result['encode'] = self.encode()
        result['train'] = self.train()
        result['package_predict_params'] = self.package_predict_parameters()
        result['predict'] = self.predict()
        result['evaluate'] = self.evaluate()
        if clean:
            self.clean_output()
        return result

    def _base_command(self, metadata_location: str) -> List[str]:
        """Assemble the common `java ... com.eshioji.hotvect.commandline.Main` invocation."""
        ret = [
            'java',
            '-cp', f"{self.hotvect_util_jar_path}",
        ]
        ret.extend(self.jvm_options)
        ret.extend(['com.eshioji.hotvect.commandline.Main',
                    '--algorithm-jar', f'{self.algorithm_jar_path}',
                    '--algorithm-definition', self._write_algorithm_definition(),
                    '--meta-data', metadata_location])
        return ret

    def generate_states(self) -> Dict:
        """Generate (or copy cached) feature states declared in the algorithm definition."""
        states = self.algorithm_definition.get('vectorizer_parameters', {}).get('feature_states', {})
        metadata = {}
        for state_name, instruction in states.items():
            metadata_path = os.path.join(self.metadata_path(), f'generate-state-{state_name}.json')
            trydelete(metadata_path)
            output_path = self.state_output_path(state_name)
            trydelete(output_path)
            source_path = os.path.join(self.state_source_base_path, instruction['source_name'])
            ensure_file_exists(source_path)
            generation_task = instruction['generation_task']
            if instruction.get('cache'):
                # A cached state is copied verbatim instead of regenerated.
                cached = os.path.join(self.state_source_base_path, instruction['cache'])
                ensure_file_exists(cached)
                logging.info(f'Using cache for state:{state_name} from {cached}')
                copyfile(cached, output_path)
                metadata[state_name] = {
                    'cache': cached
                }
            else:
                logging.info(f'No cache found for state:{state_name}, generating')
                cmd = self._base_command(metadata_path)
                cmd.extend(['--generate-state', generation_task,
                            '--training-data', self.train_data_path,
                            '--source', source_path,
                            '--dest', output_path])
                runshell(cmd)
                metadata[state_name] = read_json(metadata_path)
            self.feature_states[state_name] = output_path
        return metadata

    def package_encode_parameters(self) -> Dict:
        """Zip the generated feature states into the encode parameter package."""
        encode_parameter_package_location = self.encode_parameter_file_path()
        trydelete(encode_parameter_package_location)
        to_package = list(self.feature_states.values())
        to_zip_archive(to_package, encode_parameter_package_location)
        return {
            'sources': to_package,
            'package': encode_parameter_package_location
        }

    def encode(self) -> Dict:
        """Encode training data into the VW input format via the hotvect CLI."""
        metadata_location = os.path.join(self.metadata_path(), 'encode_metadata.json')
        trydelete(metadata_location)
        encoded_data_location = self.encoded_data_file_path()
        trydelete(encoded_data_location)
        cmd = self._base_command(metadata_location)
        cmd.append('--encode')
        if self.feature_states:
            # We have feature states
            cmd.extend(['--parameters', self.encode_parameter_file_path()])
        cmd.extend(['--source', self.train_data_path])
        cmd.extend(['--dest', encoded_data_location])
        runshell(cmd)
        return read_json(metadata_location)

    def train(self) -> Dict:
        """Train with Vowpal Wabbit on the encoded data and record the train log."""
        metadata_location = os.path.join(self.metadata_path(), 'train_metadata.json')
        trydelete(metadata_location)
        model_location = self.model_file_path()
        trydelete(model_location)
        cmd = [
            'vw', self.encoded_data_file_path(),
            '--readable_model', model_location,
            '--noconstant',
            '--loss_function', 'logistic',
        ]
        train_params = self.algorithm_definition['training_parameters']
        cmd.extend(train_params)
        train_log = runshell(cmd)
        metadata = {
            'training_parameters': train_params,
            'train_log': train_log
        }
        with open(metadata_location, 'w') as f:
            json.dump(metadata, f)
        return metadata

    def package_predict_parameters(self) -> Dict:
        """Zip model, feature states, and algorithm parameters into the predict package."""
        predict_parameter_package_location = self.predict_parameter_file_path()
        trydelete(predict_parameter_package_location)
        # Add the model file
        to_package = [self.model_file_path()]
        # Add all the feature states
        to_package.extend(list(self.feature_states.values()))
        # Add the algo parameters
        algorithm_parameters = {
            'algorithm_name': self.algorithm_definition['algorithm_name'],
            'parameter_id': self.run_id,
            'ran_at': self.ran_at,
            'algorithm_definition': self.algorithm_definition,
            'sources': to_package,
            'package': predict_parameter_package_location
        }
        algo_parameters_path = self._write_data(algorithm_parameters, 'algorithm_parameters.json')
        to_package.append(algo_parameters_path)
        to_zip_archive(to_package, predict_parameter_package_location)
        return algorithm_parameters

    def predict(self) -> Dict:
        """Score the validation data with the packaged parameters."""
        metadata_location = os.path.join(self.metadata_path(), 'predict_metadata.json')
        trydelete(metadata_location)
        score_location = self.score_file_path()
        trydelete(score_location)
        cmd = self._base_command(metadata_location)
        cmd.append('--predict')
        cmd.extend(['--source', self.validation_data_location])
        cmd.extend(['--dest', score_location])
        cmd.extend(['--parameters', self.predict_parameter_file_path()])
        runshell(cmd)
        return read_json(metadata_location)

    def evaluate(self) -> Dict:
        """Compute bootstrapped ROC AUC bounds from the validation scores."""
        metadata_location = os.path.join(self.metadata_path(), 'evaluate_metadata.json')
        trydelete(metadata_location)
        # Column 0: score, column 1: label (headerless CSV written by predict).
        df = pd.read_csv(self.score_file_path(), header=None)
        lower_auc, mean_auc, upper_auc = mlutils.bootstrap_roc_auc(df[1], df[0])
        meta_data = {
            'upper_auc': upper_auc,
            'mean_auc': mean_auc,
            'lower_auc': lower_auc
        }
        with open(metadata_location, 'w') as f:
            json.dump(meta_data, f)
        return meta_data

    def clean_output(self) -> None:
        """Delete intermediate artifacts, keeping only metadata and parameter packages."""
        trydelete(self.encoded_data_file_path())
        trydelete(self.model_file_path())
        trydelete(self.score_file_path())
        trydelete(self.output_path())

    def audit(self) -> None:
        """Produce an audit JSONL of the encoding for manual inspection."""
        metadata_location = os.path.join(self.metadata_path(), 'audit_metadata.json')
        trydelete(metadata_location)
        audit_data_location = self.audit_data_file_path()
        trydelete(audit_data_location)
        cmd = self._base_command(metadata_location)
        cmd.append('--audit')
        if self.feature_states:
            # We have feature states
            cmd.extend(['--parameters', self.encode_parameter_file_path()])
        cmd.extend(['--source', self.train_data_path])
        cmd.extend(['--dest', audit_data_location])
        runshell(cmd)

Full Screen

Full Screen

influxdb_client.py

Source:influxdb_client.py Github

copy

Full Screen

1"""InfluxDBClient is client for API defined in https://github.com/influxdata/influxdb/blob/master/http/swagger.yml."""2from __future__ import absolute_import3import configparser4import os5from influxdb_client import Configuration, ApiClient, HealthCheck, HealthService, Ready, ReadyService6from influxdb_client.client.authorizations_api import AuthorizationsApi7from influxdb_client.client.bucket_api import BucketsApi8from influxdb_client.client.delete_api import DeleteApi9from influxdb_client.client.labels_api import LabelsApi10from influxdb_client.client.organizations_api import OrganizationsApi11from influxdb_client.client.query_api import QueryApi12from influxdb_client.client.tasks_api import TasksApi13from influxdb_client.client.users_api import UsersApi14from influxdb_client.client.write_api import WriteApi, WriteOptions, PointSettings15class InfluxDBClient(object):16 """InfluxDBClient is client for InfluxDB v2."""17 def __init__(self, url, token, debug=None, timeout=10_000, enable_gzip=False, org: str = None,18 default_tags: dict = None, **kwargs) -> None:19 """20 Initialize defaults.21 :param url: InfluxDB server API url (ex. http://localhost:8086).22 :param token: auth token23 :param debug: enable verbose logging of http requests24 :param timeout: HTTP client timeout setting for a request specified in milliseconds.25 If one number provided, it will be total request timeout.26 It can also be a pair (tuple) of (connection, read) timeouts.27 :param enable_gzip: Enable Gzip compression for http requests. Currently only the "Write" and "Query" endpoints28 supports the Gzip compression.29 :param org: organization name (used as a default in query and write API)30 :key bool verify_ssl: Set this to false to skip verifying SSL certificate when calling API from https server.31 :key str ssl_ca_cert: Set this to customize the certificate file to verify the peer.32 :key str proxy: Set this to configure the http proxy to be used (ex. 
http://localhost:3128)33 :key int connection_pool_maxsize: Number of connections to save that can be reused by urllib3.34 Defaults to "multiprocessing.cpu_count() * 5".35 :key urllib3.util.retry.Retry retries: Set the default retry strategy that is used for all HTTP requests36 except batching writes. As a default there is no one retry strategy.37 """38 self.url = url39 self.token = token40 self.org = org41 self.default_tags = default_tags42 conf = _Configuration()43 if self.url.endswith("/"):44 conf.host = self.url[:-1]45 else:46 conf.host = self.url47 conf.enable_gzip = enable_gzip48 conf.debug = debug49 conf.verify_ssl = kwargs.get('verify_ssl', True)50 conf.ssl_ca_cert = kwargs.get('ssl_ca_cert', None)51 conf.proxy = kwargs.get('proxy', None)52 conf.connection_pool_maxsize = kwargs.get('connection_pool_maxsize', conf.connection_pool_maxsize)53 conf.timeout = timeout54 auth_token = self.token55 auth_header_name = "Authorization"56 auth_header_value = "Token " + auth_token57 retries = kwargs.get('retries', False)58 self.api_client = ApiClient(configuration=conf, header_name=auth_header_name,59 header_value=auth_header_value, retries=retries)60 def __enter__(self):61 """62 Enter the runtime context related to this object.63 It will bind this method’s return value to the target(s)64 specified in the `as` clause of the statement.65 return: self instance66 """67 return self68 def __exit__(self, exc_type, exc_value, traceback):69 """Exit the runtime context related to this object and close the client."""70 self.close()71 @classmethod72 def from_config_file(cls, config_file: str = "config.ini", debug=None, enable_gzip=False):73 """74 Configure client via configuration file. 
The configuration has to be under 'influx' section.75 The supported formats:76 - https://docs.python.org/3/library/configparser.html77 - https://toml.io/en/78 Configuration options:79 - url80 - org81 - token82 - timeout,83 - verify_ssl84 - ssl_ca_cert85 - connection_pool_maxsize86 config.ini example::87 [influx2]88 url=http://localhost:808689 org=my-org90 token=my-token91 timeout=600092 connection_pool_maxsize=2593 [tags]94 id = 132-987-65595 customer = California Miner96 data_center = ${env.data_center}97 config.toml example::98 [influx2]99 url = "http://localhost:8086"100 token = "my-token"101 org = "my-org"102 timeout = 6000103 connection_pool_maxsize = 25104 [tags]105 id = "132-987-655"106 customer = "California Miner"107 data_center = "${env.data_center}"108 """109 config = configparser.ConfigParser()110 config.read(config_file)111 def config_value(key: str):112 return config['influx2'][key].strip('"')113 url = config_value('url')114 token = config_value('token')115 timeout = None116 if config.has_option('influx2', 'timeout'):117 timeout = config_value('timeout')118 org = None119 if config.has_option('influx2', 'org'):120 org = config_value('org')121 verify_ssl = True122 if config.has_option('influx2', 'verify_ssl'):123 verify_ssl = config_value('verify_ssl')124 ssl_ca_cert = None125 if config.has_option('influx2', 'ssl_ca_cert'):126 ssl_ca_cert = config_value('ssl_ca_cert')127 connection_pool_maxsize = None128 if config.has_option('influx2', 'connection_pool_maxsize'):129 connection_pool_maxsize = config_value('connection_pool_maxsize')130 default_tags = None131 if config.has_section('tags'):132 tags = {k: v.strip('"') for k, v in config.items('tags')}133 default_tags = dict(tags)134 return cls(url, token, debug=debug, timeout=_to_int(timeout), org=org, default_tags=default_tags,135 enable_gzip=enable_gzip, verify_ssl=_to_bool(verify_ssl), ssl_ca_cert=ssl_ca_cert,136 connection_pool_maxsize=_to_int(connection_pool_maxsize))137 @classmethod138 def 
from_env_properties(cls, debug=None, enable_gzip=False):139 """140 Configure client via environment properties.141 Supported environment properties:142 - INFLUXDB_V2_URL143 - INFLUXDB_V2_ORG144 - INFLUXDB_V2_TOKEN145 - INFLUXDB_V2_TIMEOUT146 - INFLUXDB_V2_VERIFY_SSL147 - INFLUXDB_V2_SSL_CA_CERT148 - INFLUXDB_V2_CONNECTION_POOL_MAXSIZE149 """150 url = os.getenv('INFLUXDB_V2_URL', "http://localhost:8086")151 token = os.getenv('INFLUXDB_V2_TOKEN', "my-token")152 timeout = os.getenv('INFLUXDB_V2_TIMEOUT', "10000")153 org = os.getenv('INFLUXDB_V2_ORG', "my-org")154 verify_ssl = os.getenv('INFLUXDB_V2_VERIFY_SSL', "True")155 ssl_ca_cert = os.getenv('INFLUXDB_V2_SSL_CA_CERT', None)156 connection_pool_maxsize = os.getenv('INFLUXDB_V2_CONNECTION_POOL_MAXSIZE', None)157 default_tags = dict()158 for key, value in os.environ.items():159 if key.startswith("INFLUXDB_V2_TAG_"):160 default_tags[key[16:].lower()] = value161 return cls(url, token, debug=debug, timeout=_to_int(timeout), org=org, default_tags=default_tags,162 enable_gzip=enable_gzip, verify_ssl=_to_bool(verify_ssl), ssl_ca_cert=ssl_ca_cert,163 connection_pool_maxsize=_to_int(connection_pool_maxsize))164 def write_api(self, write_options=WriteOptions(), point_settings=PointSettings()) -> WriteApi:165 """166 Create a Write API instance.167 :param point_settings:168 :param write_options: write api configuration169 :return: write api instance170 """171 return WriteApi(influxdb_client=self, write_options=write_options, point_settings=point_settings)172 def query_api(self) -> QueryApi:173 """174 Create a Query API instance.175 :return: Query api instance176 """177 return QueryApi(self)178 def close(self):179 """Shutdown the client."""180 self.__del__()181 def __del__(self):182 """Shutdown the client."""183 if self.api_client:184 self.api_client.__del__()185 self.api_client = None186 def buckets_api(self) -> BucketsApi:187 """188 Create the Bucket API instance.189 :return: buckets api190 """191 return BucketsApi(self)192 def 
authorizations_api(self) -> AuthorizationsApi:193 """194 Create the Authorizations API instance.195 :return: authorizations api196 """197 return AuthorizationsApi(self)198 def users_api(self) -> UsersApi:199 """200 Create the Users API instance.201 :return: users api202 """203 return UsersApi(self)204 def organizations_api(self) -> OrganizationsApi:205 """206 Create the Organizations API instance.207 :return: organizations api208 """209 return OrganizationsApi(self)210 def tasks_api(self) -> TasksApi:211 """212 Create the Tasks API instance.213 :return: tasks api214 """215 return TasksApi(self)216 def labels_api(self) -> LabelsApi:217 """218 Create the Labels API instance.219 :return: labels api220 """221 return LabelsApi(self)222 def health(self) -> HealthCheck:223 """224 Get the health of an instance.225 :return: HealthCheck226 """227 health_service = HealthService(self.api_client)228 try:229 health = health_service.get_health()230 return health231 except Exception as e:232 return HealthCheck(name="influxdb", message=str(e), status="fail")233 def ready(self) -> Ready:234 """235 Get The readiness of the InfluxDB 2.0.236 :return: Ready237 """238 ready_service = ReadyService(self.api_client)239 return ready_service.get_ready()240 def delete_api(self) -> DeleteApi:241 """242 Get the delete metrics API instance.243 :return: delete api244 """245 return DeleteApi(self)246class _Configuration(Configuration):247 def __init__(self):248 Configuration.__init__(self)249 self.enable_gzip = False250 def update_request_header_params(self, path: str, params: dict):251 super().update_request_header_params(path, params)252 if self.enable_gzip:253 # GZIP Request254 if path == '/api/v2/write':255 params["Content-Encoding"] = "gzip"256 params["Accept-Encoding"] = "identity"257 pass258 # GZIP Response259 if path == '/api/v2/query':260 # params["Content-Encoding"] = "gzip"261 params["Accept-Encoding"] = "gzip"262 pass263 pass264 pass265 def update_request_body(self, path: str, body):266 
_body = super().update_request_body(path, body)267 if self.enable_gzip:268 # GZIP Request269 if path == '/api/v2/write':270 import gzip271 if isinstance(_body, bytes):272 return gzip.compress(data=_body)273 else:274 return gzip.compress(bytes(_body, "utf-8"))275 return _body276def _to_bool(bool_value):277 return str(bool_value).lower() in ("yes", "true")278def _to_int(int_value):...

Full Screen

Full Screen

configure_caddy.py

Source:configure_caddy.py Github

copy

Full Screen

#!/usr/bin/python
"""
Configure caddy service
"""

import os
import sys
import json
import bcrypt
import logging
import coloredlogs
import argparse
from urllib.parse import quote, urljoin
from subprocess import run, call

import functions as func

### Enable logging
logging.basicConfig(
    format='%(asctime)s [%(levelname)s] %(message)s',
    level=logging.INFO,
    stream=sys.stdout)

log = logging.getLogger(__name__)

### Enable argument parsing
parser = argparse.ArgumentParser()
parser.add_argument('--opts', type=json.loads, help='Set script arguments')
parser.add_argument('--env', type=json.loads, help='Set script environment')
parser.add_argument('--user', type=json.loads, help='Load user settings')
parser.add_argument('--settings', type=json.loads, help='Load script settings')

args, unknown = parser.parse_known_args()
if unknown:
    log.error("Unknown arguments " + str(unknown))

### Load arguments
cli_opts = args.opts
cli_env = args.env
cli_user = args.user
cli_settings = args.settings

### Set log level
verbosity = cli_opts.get("verbosity")
log.setLevel(verbosity)
# Setup colored console logs
coloredlogs.install(fmt='%(asctime)s [%(levelname)s] %(message)s', level=verbosity, logger=log)

### Get envs
proxy_base_url = cli_env.get("PROXY_BASE_URL")
caddy_virtual_port = cli_env.get("CADDY_VIRTUAL_PORT")
caddy_virtual_host = cli_env.get("CADDY_VIRTUAL_HOST")
caddy_virtual_proto = cli_env.get("CADDY_VIRTUAL_PROTO")
caddy_virtual_base_url = cli_env.get("CADDY_VIRTUAL_BASE_URL")
caddy_proxy_encodings_gzip = cli_env.get("CADDY_PROXY_ENCODINGS_GZIP")
caddy_proxy_encodings_zstd = cli_env.get("CADDY_PROXY_ENCODINGS_ZSTD")
caddy_proxy_templates = cli_env.get("CADDY_PROXY_TEMPLATES")
caddy_letsencrypt_email = cli_env.get("CADDY_LETSENCRYPT_EMAIL")
caddy_letsencrypt_endpoint = cli_env.get("CADDY_LETSENCRYPT_ENDPOINT")
caddy_http_port = cli_env.get("CADDY_HTTP_PORT")
caddy_https_port = cli_env.get("CADDY_HTTPS_PORT")
caddy_auto_https = cli_env.get("CADDY_AUTO_HTTPS")

fb_port = cli_user.get("filebrowser").get("port")
fb_base_url = cli_user.get("filebrowser").get("base_url")
vscode_bind_addr = cli_user.get("vscode").get("bind_addr")
vscode_base_url = cli_user.get("vscode").get("base_url")
app_bind_addr = cli_user.get("app").get("bind_addr")
app_base_url = cli_user.get("app").get("base_url")

### Get user settings
user_name = cli_user.get("name")
user_group = cli_user.get("group")
user_home = cli_user.get("dirs").get("home").get("path")

### Clean up envs
application = "caddy"
proxy_base_url = func.clean_url(proxy_base_url)
host_fqdn = caddy_virtual_host  # @TODO: Not reading from env
host_port = caddy_virtual_port
host_ip = "0.0.0.0"
host_proto = caddy_virtual_proto
host_base_url = func.clean_url(caddy_virtual_base_url)
# Env values arrive as strings; only the literal "true" enables a feature.
auto_https = caddy_auto_https == "true"
enable_gzip = caddy_proxy_encodings_gzip == "true"
enable_zstd = caddy_proxy_encodings_zstd == "true"
enable_templates = caddy_proxy_templates == "true"

### Set config and data paths
config_dir = os.path.join(user_home, ".config", application)
if not os.path.exists(config_dir):
    os.makedirs(config_dir)

storage = os.path.join(config_dir, "storage")
if not os.path.exists(storage):
    os.mkdir(storage)

### Set certificate endpoint
letsencrypt_staging = "https://acme-staging-v02.api.letsencrypt.org/directory"
letsencrypt_production = "https://acme-v02.api.letsencrypt.org/directory"
if caddy_letsencrypt_endpoint == "dev":
    endpoint = letsencrypt_staging
elif caddy_letsencrypt_endpoint == "prod":
    endpoint = letsencrypt_production
elif caddy_letsencrypt_endpoint == "internal":
    # @TODO: Get internal certs working
    endpoint = letsencrypt_production = "set this up"
else:
    # BUGFIX: previously only logged and fell through, crashing later with a
    # NameError because 'endpoint' was never assigned. Fail fast instead.
    log.critical(f"invalid letsencrypt endpoint: '{caddy_letsencrypt_endpoint}'")
    sys.exit(1)

### Run protocol check
if host_proto not in ['http', 'https']:
    # BUGFIX: message previously referenced undefined name 'proto'.
    log.critical(f"{application}: protocol '{host_proto}' is not valid! Exiting.")
    sys.exit(1)

### Define application route settings
servers = dict()
# BUGFIX: was `servers["automatic_https"]: auto_https` — a bare annotation
# that never assigned the key, silently dropping the auto-HTTPS setting.
servers["automatic_https"] = auto_https
servers['default'] = dict()
domains = dict()
domains[host_fqdn] = ""

# BUGFIX: each settings dict previously listed "enable_gzip" twice; the
# second key was meant to be "enable_zstd" (read below when building encodings).
vscode_settings = {
    "name": "vscode",
    "host": "localhost",
    "port": vscode_bind_addr.split(":", 1)[1],
    "proto": "http",
    "base_url": func.clean_url(vscode_base_url),
    "enable_gzip": True,
    "enable_zstd": True,
    "enable_templates": True,
}

filebrowser_settings = {
    "name": "filebrowser",
    "host": "localhost",
    "port": fb_port,
    "proto": "http",
    "base_url": func.clean_url(fb_base_url),
    "enable_gzip": True,
    "enable_zstd": True,
    "enable_templates": True,
}

app_settings = {
    "name": "app",
    "host": "localhost",
    "port": app_bind_addr.split(":", 1)[1],
    "proto": "http",
    "base_url": func.clean_url(app_base_url),
    "enable_gzip": True,
    "enable_zstd": True,
    "enable_templates": True,
}

### Create application sub-config templates
service_settings = [vscode_settings, filebrowser_settings, app_settings]
subroutes = list()
for service in service_settings:
    service_base_url = urljoin(host_base_url, service.get("base_url"))
    full_base_url = urljoin(proxy_base_url, service_base_url) if service_base_url != "/" else ""
    log.info("{name} base url: '{url}'".format(name=service.get("name"), url=full_base_url))
    # Optional caddy "encode" handler (gzip and/or zstd response compression).
    encodings = dict()
    if service.get("enable_gzip") or service.get("enable_zstd"):
        encodings = {
            "handle": [{
                "encodings": {},
                "handler": "encode"
            }]
        }
        if service.get("enable_gzip"):
            encodings["handle"][0]["encodings"]['gzip'] = dict()
        if service.get("enable_zstd"):
            encodings["handle"][0]["encodings"]['zstd'] = dict()
    # Optional caddy "templates" handler.
    templates = dict()
    if service.get("enable_templates"):
        templates = {
            "handle": [{
                "handler": "templates"
            }]
        }
    # Route: redirect the bare base url to "<base>/", then strip the prefix
    # and reverse-proxy everything under "<base>/*" to the local service.
    subroute = {
        "handler": "subroute",
        "routes": [{
            "handle": [{
                "handler": "static_response",
                "headers": {
                    "Location": [
                        f"{full_base_url}/"
                    ]
                },
                "status_code": 302
            }
            ],
            "match": [{
                "path": [
                    f"{full_base_url}"
                ]
            }
            ]
        },
            {
                "handle": [{
                    "handler": "subroute",
                    "routes": [{
                        "handle": [{
                            "handler": "rewrite",
                            "strip_path_prefix": f"{full_base_url}"
                        }]
                    },
                        {
                            "handle": [{
                                "handler": "reverse_proxy",
                                "upstreams": [{
                                    "dial": "{}:{}".format(service.get("host"), service.get("port"))
                                }]
                            }]
                        },
                        encodings,
                        templates
                    ]
                }],
                "match": [{
                    "path": [
                        f"{full_base_url}/*"
                    ]
                }]
            }]
    }
    subroutes.append(subroute)

# BUGFIX: 'route' was previously undefined when host_fqdn is None, raising a
# NameError below. Initialize it and guard its use.
route = None
if host_fqdn is not None:
    if host_fqdn == "":
        # Empty fqdn: match all hosts.
        match = []
    else:
        match = [{
            "host": [host_fqdn]
        }]
    route = {
        "match": match,
        "handle": subroutes,
        "terminal": True
    }

if route is not None:
    if servers['default'].get('routes') is None:
        servers['default']['listen'] = [f"{host_ip}:{host_port}"]
        servers['default']['routes'] = [route]
        servers['default']['logs'] = {
            "logger_names": {
                host_fqdn: "common",
            }
        }
    else:
        servers['default']['routes'].append(route)

### Create config template
config_file = {
    "admin": {
        "disabled": False,
        "listen": '',
        "enforce_origin": False,
        "origins": [''],
        "config": {
            "persist": False
        }
    },
    "logging": {
        "logs": {
            "default": {
                "exclude": [
                    "http.log.access.json",
                    "http.log.access.common",
                    "http.log.access.common_and_json"
                ]
            },
            "common": {
                "writer": {
                    "output": "stdout"
                },
                "encoder": {
                    "format": "single_field",
                    "field": "common_log"
                },
                "level": "",
                "sampling": {
                    "interval": 0,
                    "first": 0,
                    "thereafter": 0
                },
                "include": ["http.log.access.common"],
            }
        }
    },
    "storage": {
        "module": "file_system",
        "root": storage
    },
    "apps": {
        "http": {
            "http_port": int(caddy_http_port),
            "https_port": int(caddy_https_port),
            "servers": servers
        },
        "tls": {
            "automation": {
                "policies": [{
                    "subjects": list(domains.keys()),
                    "issuers": [
                        {
                            "module": "acme",
                            "ca": endpoint,
                            "email": caddy_letsencrypt_email
                        },
                        {
                            "module": "internal",
                            "ca": "",
                            "lifetime": 0,
                            "sign_with_root": False
                        }
                    ],
                    "key_type": ""
                }]
            }
        }
    }
}

### Write config file
config_path = os.path.join(config_dir, "settings.json")
config_json = json.dumps(config_file, indent=4)

with open(config_path, "w") as f:
    f.write(config_json)

# fix permissions
log.info(f"setting permissions on '{config_dir}' to '{user_name}:{user_group}'")
func.recursive_chown(config_dir, user_name, user_group)

### Display final config
log.debug(f"{application} config: '{config_path}'")

Full Screen

Full Screen

Playwright tutorial

LambdaTest’s Playwright tutorial will give you a broader idea about the Playwright automation framework, its unique features, and use cases with examples to deepen your understanding of Playwright testing. This tutorial provides A-to-Z guidance, from installing the Playwright framework to best practices and advanced concepts.

Chapters:

  1. What is Playwright : Playwright is comparatively new but has gained good popularity. Get to know some history of the Playwright with some interesting facts connected with it.
  2. How To Install Playwright : Learn in detail about what basic configuration and dependencies are required for installing Playwright and run a test. Get a step-by-step direction for installing the Playwright automation framework.
  3. Playwright Futuristic Features: Launched in 2020, Playwright quickly gained huge popularity because of compelling features such as the Playwright Test Generator and Inspector, the Playwright Reporter, and the Playwright auto-waiting mechanism. Read up on those features to master Playwright testing.
  4. What is Component Testing: Component testing in Playwright is a unique feature that allows a tester to test a single component of a web application without integrating them with other elements. Learn how to perform Component testing on the Playwright automation framework.
  5. Inputs And Buttons In Playwright: Every website has Input boxes and buttons; learn about testing inputs and buttons with different scenarios and examples.
  6. Functions and Selectors in Playwright: Learn how to launch the Chromium browser with Playwright. Also, gain a better understanding of some important functions like “BrowserContext,” which allows you to run multiple browser sessions, and “newPage” which interacts with a page.
  7. Handling Alerts and Dropdowns in Playwright : Playwright can interact with different types of alerts and pop-ups, such as simple, confirmation, and prompt alerts, as well as different types of dropdowns, such as single-select and multi-select. Get hands-on experience with handling alerts and dropdowns in Playwright testing.
  8. Playwright vs Puppeteer: Get to know the differences between the two testing frameworks — how they differ from one another, which browsers they support, and what features they provide.
  9. Run Playwright Tests on LambdaTest: Playwright testing with LambdaTest leverages test performance to the utmost. You can run multiple Playwright tests in parallel on the LambdaTest cloud grid. Get a step-by-step guide to run your Playwright test on the LambdaTest platform.
  10. Playwright Python Tutorial: The Playwright automation framework supports all major languages, such as Python, JavaScript, TypeScript, and .NET. However, there are various advantages to Python end-to-end testing with Playwright because of its versatile utility. Get the hang of Playwright Python testing with this chapter.
  11. Playwright End To End Testing Tutorial: Get your hands on with Playwright end-to-end testing and learn to use some exciting features such as TraceViewer, Debugging, Networking, Component testing, Visual testing, and many more.
  12. Playwright Video Tutorial: Watch the video tutorials on Playwright testing from experts and get a consecutive in-depth explanation of Playwright automation testing.

Run Playwright Python automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful