Best Python code snippet using lisa_python
features2strmatrix.py
Source:features2strmatrix.py
1# import itertools as it2import os.path3import re4# import nltk.corpus as corpus5import numpy as np6import pandas as pd7import sklearn.linear_model as ln8# from nltk import PorterStemmer9from pandas import DataFrame10from sklearn import cross_validation11from src.algos.utils import RMSE_NORMALIZED12from src.features.sets import blank13from src.features.sets import boolean_columns14# from src.features.sets import get_syn15# from src.features.sets import stop_words16from src.features.extract_count_features import process_str17from src.features.extract_count_features import load_features118def to_set(x):19 if type(x) is set: return x20 return blank21def to_str(x):22 if type(x) is str: return x23 if type(x) is unicode: return x24 return u''25def column_transformer(x, combine=True):26 if x in boolean_columns: return 'combined_boolean'27 c = str(x).lower()28 c = c.decode('utf-8', 'ignore')29 if combine:30 if re.search(31 '(:?width|height|depth|length|size|thickness|capacity|diameter|\(in\.\)|\(ft\.\)|\(mm\))',32 c) is not None:33 return 'combined_size'34 if re.search('watt', c) is not None:35 return 'combined_watt'36 if re.search('volt', c) is not None:37 return 'combined_volt'38 if re.search('(:?weight|\(lb\.\))', c) is not None:39 return 'combined_weight'40 if re.search('brand', c) is not None:41 return 'combined_brand'42 if re.search('(:?color|rgb value)', c) is not None:43 return 'combined_color'44 if re.search('material', c) is not None:45 return 'combined_material'46 if re.search('temperature', c) is not None:47 return 'combined_temperature'48 if re.search('type', c) is not None:49 return 'combined_type'50 if re.search('(:?time|\(hours\)|\(min\.\))', c) is not None:51 return 'combined_time'52 if re.search('(:?number of|count|# of)', c) is not None:53 return 'combined_count'54 if re.search('(?:degree|angle|\(deg\))', c) is not None:55 return 'combined_angle'56 if re.search('(:?\(sq\.? ft\.?\)|square foot|\(sq\. 
in\.\))', c) is not None:57 return 'combined_area'58 if re.search('(?:\(\w?hz\)|frequency|\(rpm\)|\(gpm\)|\(cfm\)|\(mph\)|speed|/hour|/min|per min)', c) is not None:59 return 'combined_speed'60 if re.search('(?:\(db\)|noise)', c) is not None:61 return 'combined_noise'62 if re.search('\([^\(\)]+\)$', c) is not None:63 return 'combined_measurements'64 if re.search('/', c) is not None:65 return 'combined_type'66 c = re.sub('bullet\d+', 'bullet', c)67 return c68def value_transformer(c, v, skip_stop_words=True, enable_stemming=True):69 if c in boolean_columns:70 v = str(v).lower()71 if v.startswith('y'):72 v = c73 else:74 return ''75 av = process_str(s=v, stemming=enable_stemming, skip_stop_words=skip_stop_words)76 return av77def product2attrs(product_to_trace=None, combine=True, skip_stop_words=True, enable_stemming=True):78 if product_to_trace is None: product_to_trace = {}79 file_name = './dataset/product_to_attributes.'80 if combine: file_name += 'combined.'81 if skip_stop_words: file_name += 'stop.'82 if enable_stemming: file_name += 'stemming.'83 file_name += 'csv'84 if os.path.isfile(file_name):85 print 'load data from file'86 rs = pd.read_csv(file_name, encoding='utf-8', index_col=0)87 print 'loaded', file_name, '->', rs.shape88 return rs89 attrs = pd.read_csv('./dataset/attributes.csv')90 attrs = attrs[attrs['product_uid'] == attrs['product_uid']]91 descrs = pd.read_csv('./dataset/product_descriptions.csv')92 descrs = descrs[descrs['product_uid'] == descrs['product_uid']]93 print 'attributes:', attrs.shape94 colls = attrs['name'].apply(lambda c: column_transformer(c, combine)).unique()95 print 'attributes columns:', len(colls)96 # noinspection PyUnresolvedReferences97 product_ids = [int(x) for x in pd.concat([attrs['product_uid'], descrs['product_uid']]).unique()]98 print 'unique ids:', len(product_ids)99 rs = DataFrame(index=product_ids, columns=np.hstack(colls))100 rs.index.names = ['product_uid']101 print 'process attrs'102 for index, row in 
attrs.iterrows():103 if index % 100000 == 0: print index,104 id = int(row['product_uid'])105 cc = column_transformer(row['name'], combine)106 is_trace_enabled = id in product_to_trace107 if is_trace_enabled:108 print109 print row['name'], id, '->', row['value']110 cv = value_transformer(row['name'], row['value'], skip_stop_words, enable_stemming)111 current = rs.at[id, cc]112 rs.at[id, cc] = (to_str(current) + ' ' + cv).strip()113 if is_trace_enabled: print cc, id, '->', rs.at[id, cc]114 print115 print 'descriptions :', descrs.shape116 for index, row in descrs.iterrows():117 if index % 10000 == 0: print index,118 id = int(row['product_uid'])119 if id not in rs.index: continue120 is_trace_enabled = id in product_to_trace121 if is_trace_enabled:122 print123 print 'product_description', id, '->', row['product_description']124 current = rs.at[id, 'bullet']125 rs.at[id, 'bullet'] = (to_str(current) + ' ' +126 value_transformer('bullet', row['product_description'],127 skip_stop_words, enable_stemming)).strip()128 if is_trace_enabled: print 'bullet', id, '->', rs.at[id, 'bullet']129 print130 print 'store data into file'131 rs.to_csv(file_name, encoding='utf-8')132 print 'result:', rs.shape, '->', file_name133 return rs134def count_words(data, search):135 if type(data) is not unicode: return 0136 return len(set(data.split(' ')) & search)137def count_words_vectorized(s):138 if type(s[0]) is not unicode: return 0139 if type(s[1]) is not unicode: return 0140 return len(set(s[0].split(' ')) & set(s[1].split(' ')))141def search_in_title(s):142 if type(s[0]) is not unicode: return 0143 if type(s[1]) is not unicode: return 0144 return float(s[0] in s[1])145def search_in_bullet(s, p_to_a):146 if type(s[1]) is not unicode: return 0147 return float(s[1] in p_to_a.loc[int(s[0]), 'bullet'])148def safe_len(s):149 if type(s) is unicode: return len(s.split(' '))150 return 0151def prepare_word_set(data_file='train', product_to_trace=None, skip_stop_words=True, enable_stemming=True):152 if 
product_to_trace is None: product_to_trace = set([])153 file_name = './dataset/word_set.' + data_file + '.'154 if skip_stop_words:155 file_name += 'stop.'156 if enable_stemming:157 file_name += 'stemming.'158 file_name += 'csv'159 if os.path.isfile(file_name):160 print 'load', data_file, 'data from file'161 data = pd.read_csv(file_name, encoding='utf-8')162 print 'loaded', data.shape, '->', file_name163 return data164 data = pd.read_csv('./dataset/' + data_file + '.csv')165 columns = ['id', 'product_uid', 'product_title', 'search_set']166 if 'relevance' in data.columns: columns.append('relevance')167 x = DataFrame(columns=columns)168 x['id'] = data['id']169 x['product_uid'] = data['product_uid']170 if 'relevance' in data.columns:171 x['relevance'] = data['relevance']172 for index, row in data.iterrows():173 if index % 10000 == 0: print index,174 pid = int(row['product_uid'])175 oid = int(row['id'])176 is_trace_enabled = pid in product_to_trace177 if is_trace_enabled:178 print179 print 'search term', pid, '(', oid, ')', '[', row['search_term'], ']'180 x.at[index, 'search_set'] = value_transformer('search_set', row['search_term'],181 skip_stop_words, enable_stemming)182 if is_trace_enabled: print 'search set', pid, '(', oid, ')', x.at[index, 'search_set']183 if is_trace_enabled: print 'product title', pid, '(', oid, ')', '[', row['product_title'], ']'184 x.at[index, 'product_title'] = value_transformer('product_title', row['product_title'],185 skip_stop_words, enable_stemming)186 if is_trace_enabled: print 'product title', pid, '(', oid, ')', '[', x.at[index, 'product_title'], ']'187 print188 print 'store word set'189 x.to_csv(file_name, encoding='utf-8', index=None)190 print 'stored', x.shape, '->', file_name191 return x192def ration(s):193 if s[1] == 0: return 0194 return s[0] / s[1]195def add_ration(features):196 print 'process ration:', 'product_title',197 features['product_title/ratio/'] = features[['product_title', 'search_len']].apply(ration, axis=1,198 
reduce=True, raw=True)199 print 'search_in_title',200 features['search_in_title/ratio/'] = features[['product_title', 'search_len']].apply(ration, axis=1,201 reduce=True, raw=True)202 print 'search_in_bullet',203 features['search_in_bullet/ratio/'] = features[['product_title', 'search_len']].apply(ration, axis=1,204 reduce=True, raw=True)205 print 'bullet'206 features['bullet/ratio/'] = features[['product_title', 'search_len']].apply(ration, axis=1,207 reduce=True, raw=True)208 print 'ratio finish'209def add_indicators(features):210 indicators = ['combined_size', 'combined_material', 'combined_brand', 'combined_color', 'combined_weight',211 'combined_watt', 'combined_volt', 'combined_type']212 print 'process indicators:',213 for ind in indicators:214 print ind,215 features[ind + '/indicator/'] = features[ind].apply(lambda x: float(x > 0))216def integrate_with_anton(features):217 print 'integrate with anton'218 anton_train = pd.read_csv('./dataset/good_ft_2_train.csv', index_col='id')219 anton_test = pd.read_csv('./dataset/good_ft_2_test.csv', index_col='id')220 if 'relevance' in features.columns:221 anton = anton_train222 features = features[features.columns[features.columns != 'relevance']]223 else:224 anton = anton_test225 features = pd.merge(anton, features, right_on='id', left_index=True)226 print 'finish integrating'227 return features228def match_features(p_to_a=None, data_file='train'):229 file_name = './dataset/raw_features.' 
+ data_file + '.csv'230 if os.path.isfile(file_name):231 print 'load', data_file, 'data from file'232 features = pd.read_csv(file_name)233 print 'loaded', features.shape, '->', file_name234 return features235 if p_to_a is None: p_to_a = product2attrs()236 data = prepare_word_set(data_file)237 print 'build start data frame'238 attrs = p_to_a.columns239 features = DataFrame(columns=attrs)240 features['id'] = data['id']241 if 'relevance' in data.columns: features['relevance'] = data['relevance']242 print 'calculate search_len'243 features['search_len'] = data['search_set'].apply(safe_len)244 print 'calculate product_title'245 features['product_title'] = data[['product_title', 'search_set']].apply(count_words_vectorized, axis=1,246 reduce=True, raw=True)247 print 'calculate search_in_title'248 features['search_in_title'] = data[['search_set', 'product_title']].apply(search_in_title, axis=1,249 reduce=True, raw=True)250 print 'calculate search_in_bullet'251 features['search_in_bullet'] = data[['product_uid', 'search_set']].apply(lambda s: search_in_bullet(s, p_to_a)252 , axis=1, reduce=True, raw=True)253 print 'process attributes'254 features = features.fillna(0.0)255 tmp = np.zeros((data.shape[0], len(attrs)))256 for index, row in data.iterrows():257 if index % 10000 == 0: print index,258 pid = row['product_uid']259 search_set = row['search_set']260 if type(search_set) is not unicode: continue261 search_set = set(search_set.split(' '))262 values = p_to_a.loc[pid]263 tmp[index] = values.apply(lambda d: count_words(d, search_set))264 print265 print 'integrate features with attributes'266 features[attrs] = tmp267 add_ration(features)268 add_indicators(features)269 features = integrate_with_anton(features)270 print 'store features'271 features.to_csv(file_name, index=None)272 print 'stored', features.shape, '->', file_name273 return features274def features_to_x(features):275 columns = features.columns276 columns = columns[np.all([columns[:] != 'relevance',277 columns[:] != 
'id'], axis=0)]278 x = features[columns]279 return x280def zero_normalization(features, merge=True):281 file_name = './dataset/zero_normalization.csv'282 if os.path.isfile(file_name):283 print 'load', file_name, 'data from file'284 indexes = pd.Series.from_csv(file_name)285 if 'relevance' not in features.columns:286 indexes = indexes[indexes.index != 'relevance']287 print 'loaded', indexes.shape, '->', file_name288 else:289 if 'relevance' not in features.columns: raise Exception('process train features before test')290 indexes = features.apply(np.sum, axis=0) > 0291 print 'store indexes'292 indexes.to_csv(file_name)293 print 'stored', indexes.shape, '->', file_name294 if merge:295 features = features.copy(deep=True)296 features['bullet'] += features[features.columns[indexes == False]].apply(np.sum, axis=1)297 features = features[features.columns[indexes]]298 print 'zero normalized', features.shape299 return features300def fill_start_mask(mask, start_mask):301 if start_mask is None: return mask302 if type(start_mask) is str:303 file_name = './dataset/mask.' + start_mask + '.csv'304 if not os.path.isfile(file_name): raise Exception('can not find start mask')305 print 'load', file_name, 'data from file'306 start_mask = pd.Series.from_csv(file_name)307 print 'loaded', start_mask.shape, '->', file_name308 for col in mask.index:309 if col in start_mask.index:310 mask[col] = start_mask[col]311 print 'start mask applied'312 return mask313 for ix in mask.index:314 if ix in start_mask: mask[ix] = 'F'315 return mask316def select_features(mask_name, features, cls=ln.LinearRegression(normalize=True), allow_merge=False, start_mask=None):317 features = features.copy(deep=True)318 file_name = './dataset/mask.' 
+ mask_name + '.csv'319 changes = 0320 if os.path.isfile(file_name):321 print 'load', file_name, 'data from file'322 mask = pd.Series.from_csv(file_name)323 if 'relevance' not in features.columns:324 mask = mask[mask.index != 'relevance']325 print 'loaded', mask.shape, '->', file_name326 col_to_merge = features.columns[mask == 'M']327 if len(col_to_merge) > 0:328 features['bullet'] += features[col_to_merge].apply(np.sum, axis=1)329 result_features = features[features.columns[mask == 'F']]330 return result_features331 print 'source', features.shape332 mask = features.loc[features.index[0]].apply(lambda x: 'D')333 if 'relevance' in mask.index: mask['relevance'] = 'F'334 if 'id' in mask.index: mask['id'] = 'F'335 mask = fill_start_mask(mask, start_mask)336 y = features['relevance']337 if len(features_to_x(features[features.columns[mask == 'F']]).columns) > 0:338 score = cross_validation.cross_val_score(cls, features_to_x(features[features.columns[mask == 'F']])339 , y, scoring=RMSE_NORMALIZED, cv=5).mean()340 else:341 score = -100000342 print 'start score', score343 for i, feature in enumerate(mask.index[mask == 'D']):344 print 'add', feature,345 mask[feature] = 'F'346 filtered = features[features.columns[mask == 'F']]347 print filtered.shape348 s = cross_validation.cross_val_score(cls, features_to_x(filtered), y, scoring=RMSE_NORMALIZED, cv=5).mean()349 print 'calculated score', s,350 if s > score:351 score = s352 changes += 1353 print 'accept feature', feature354 else:355 mask[feature] = 'D'356 print 'reject feature', feature357 print 'remove features', score358 for feature in mask.index[mask == 'F']:359 if feature in {'relevance', 'id'}: continue360 print 'remove', feature,361 mask[feature] = 'D'362 filtered = features[features.columns[mask == 'F']]363 print filtered.shape364 s = cross_validation.cross_val_score(cls, features_to_x(filtered), y, scoring=RMSE_NORMALIZED, cv=5).mean()365 print 'calculated score', s,366 if s > score:367 score = s368 changes += 1369 
print 'reject feature', feature370 else:371 mask[feature] = 'F'372 print 'rollback feature', feature373 if allow_merge:374 print 'merge features', score375 unmerged = {'relevance', 'id', 'bullet', 'search_len', 'synonyms'}376 for i, feature in enumerate(mask.index):377 if feature in unmerged: continue378 print 'merge', feature,379 backup = mask[feature]380 mask[feature] = 'M'381 features['bullet'] += features[feature]382 filtered = features[features.columns[mask == 'F']]383 print filtered.shape384 s = cross_validation.cross_val_score(cls, features_to_x(filtered), y, scoring=RMSE_NORMALIZED, cv=5).mean()385 print 'calculated score', s,386 if s > score:387 score = s388 print 'merge feature', feature389 elif (s == score) and (backup == 'D'):390 print 'merge feature', feature391 else:392 mask[feature] = backup393 features['bullet'] -= features[feature]394 print 'rollback feature', feature395 result_features = features[features.columns[mask == 'F']]396 print 'store', mask.shape, '->', file_name397 mask.to_csv(file_name)398 print 'result score', score, result_features.shape, 'changes', changes399 return result_features400def apply_search_len(s):401 if s['search_len'] == 0: return 0402 return float(s[0]) / s['search_len']403def normalize_search_len(features):404 features = features.copy()405 print 'normalization by search_len', features.shape, '->',406 for i, col in enumerate(features.columns):407 if col in {'id', 'relevance', 'search_len'}: continue408 features[col + '/slennorm/'] = features[[col, 'search_len']].apply(apply_search_len, axis=1)409 print features.shape...
test_logger.py
Source:test_logger.py
...89 self.assertTrue(self.test_logger.is_error_enabled())90 self.assertTrue(self.test_logger.is_warning_enabled())91 self.assertTrue(self.test_logger.is_info_enabled())92 self.assertTrue(self.test_logger.is_debug_enabled())93 self.assertTrue(self.test_logger.is_trace_enabled())9495 self.test_logger.set_level( Logger.TRACE )96 self.assertTrue(self.test_logger.is_fatal_enabled())97 self.assertTrue(self.test_logger.is_error_enabled())98 self.assertTrue(self.test_logger.is_warning_enabled())99 self.assertTrue(self.test_logger.is_info_enabled())100 self.assertTrue(self.test_logger.is_debug_enabled())101 self.assertTrue(self.test_logger.is_trace_enabled())102 103 self.test_logger.set_level( Logger.DEBUG )104 self.assertTrue(self.test_logger.is_fatal_enabled())105 self.assertTrue(self.test_logger.is_error_enabled())106 self.assertTrue(self.test_logger.is_warning_enabled())107 self.assertTrue(self.test_logger.is_info_enabled())108 self.assertTrue(self.test_logger.is_debug_enabled())109 self.assertFalse(self.test_logger.is_trace_enabled())110 111 self.test_logger.set_level( Logger.INFO )112 self.assertTrue(self.test_logger.is_fatal_enabled())113 self.assertTrue(self.test_logger.is_error_enabled())114 self.assertTrue(self.test_logger.is_warning_enabled())115 self.assertTrue(self.test_logger.is_info_enabled())116 self.assertFalse(self.test_logger.is_debug_enabled())117 self.assertFalse(self.test_logger.is_trace_enabled())118 119 self.test_logger.set_level( Logger.WARNING )120 self.assertTrue(self.test_logger.is_fatal_enabled())121 self.assertTrue(self.test_logger.is_error_enabled())122 self.assertTrue(self.test_logger.is_warning_enabled())123 self.assertFalse(self.test_logger.is_info_enabled())124 self.assertFalse(self.test_logger.is_debug_enabled())125 self.assertFalse(self.test_logger.is_trace_enabled())126 127 self.test_logger.set_level( Logger.ERROR )128 self.assertTrue(self.test_logger.is_fatal_enabled())129 self.assertTrue(self.test_logger.is_error_enabled())130 
self.assertFalse(self.test_logger.is_warning_enabled())131 self.assertFalse(self.test_logger.is_info_enabled())132 self.assertFalse(self.test_logger.is_debug_enabled())133 self.assertFalse(self.test_logger.is_trace_enabled())134 135 self.test_logger.set_level( Logger.FATAL )136 self.assertTrue(self.test_logger.is_fatal_enabled())137 self.assertFalse(self.test_logger.is_error_enabled())138 self.assertFalse(self.test_logger.is_warning_enabled())139 self.assertFalse(self.test_logger.is_info_enabled())140 self.assertFalse(self.test_logger.is_debug_enabled())141 self.assertFalse(self.test_logger.is_trace_enabled())142 143 self.test_logger.set_level( Logger.OFF )144 self.assertFalse(self.test_logger.is_fatal_enabled())145 self.assertFalse(self.test_logger.is_error_enabled())146 self.assertFalse(self.test_logger.is_warning_enabled())147 self.assertFalse(self.test_logger.is_info_enabled())148 self.assertFalse(self.test_logger.is_debug_enabled())149 self.assertFalse(self.test_logger.is_trace_enabled())150 151 # Now test to see if a change to the logging level in one log affects other logs152 new_logger = ESAPI.logger("test_num2" )153 self.test_logger.set_level( Logger.OFF )154 new_logger.set_level( Logger.INFO )155 self.assertFalse(self.test_logger.is_fatal_enabled())156 self.assertFalse(self.test_logger.is_error_enabled())157 self.assertFalse(self.test_logger.is_warning_enabled())158 self.assertFalse(self.test_logger.is_info_enabled())159 self.assertFalse(self.test_logger.is_debug_enabled())160 self.assertFalse(self.test_logger.is_trace_enabled())161 162 self.assertTrue(new_logger.is_fatal_enabled())163 self.assertTrue(new_logger.is_error_enabled())164 self.assertTrue(new_logger.is_warning_enabled())165 self.assertTrue(new_logger.is_info_enabled())166 self.assertFalse(new_logger.is_debug_enabled())167 self.assertFalse(new_logger.is_trace_enabled())168 169 def test_info(self):170 """171 Test of info method, of class esapi.Logger.172 """173 
self.test_logger.info(Logger.SECURITY_SUCCESS, "test message")174 self.test_logger.info(Logger.SECURITY_SUCCESS, "test message", None)175 self.test_logger.info(Logger.SECURITY_SUCCESS, "%3escript%3f test message", None)176 self.test_logger.info(Logger.SECURITY_SUCCESS, "<script> test message", None)177 178 def test_trace(self):179 """180 Test of trace method, of class esapi.Logger.181 """
...
websocket_transport.py
Source:websocket_transport.py
import json
import logging
from asyncio import ensure_future
from typing import Any, List

from lime_python import SessionCompression, SessionEncryption, Transport
from websockets.client import WebSocketClientProtocol, connect
from websockets.exceptions import ConnectionClosed


class WebSocketTransport(Transport):
    """WebSocket transport implementation."""

    def __init__(self, is_trace_enabled: bool = False) -> None:
        """Create a closed transport.

        Args:
            is_trace_enabled (bool): when true, sent and received envelopes
                are written to the trace logger.
        """
        super().__init__(SessionCompression.NONE, SessionEncryption.NONE)
        self.is_trace_enabled = is_trace_enabled
        # `self.logger` is always invoked as `self.logger(message)`, so it has
        # to be a callable. A Logger instance is not callable; bind its
        # `debug` method instead of storing the Logger itself.
        logger = print
        if logging.root.level <= logging.DEBUG:
            logger = logging.getLogger().debug
        self.logger = logger
        self.websocket: WebSocketClientProtocol = None

    async def open_async(self, uri: str = None) -> None:  # noqa: D102
        if self.websocket and self.websocket.open:
            err = ValueError('Cannot open an already open connection')
            self.on_error(err)
            raise err
        if uri is None:
            # The default is None; fail with the same error style as above
            # rather than an AttributeError from `None.startswith`.
            err = ValueError('The uri is required to open a connection')
            self.on_error(err)
            raise err

        if uri.startswith('wss://'):
            self.encryption = SessionEncryption.TLS
        else:
            self.encryption = SessionEncryption.NONE
        self.compression = SessionCompression.NONE

        self.websocket = await connect(uri, subprotocols=['lime'])
        self.on_open()
        # Receive loop runs in the background for the life of the connection.
        ensure_future(self.__message_handler_async())

    async def close_async(self) -> None:  # noqa: D102
        await self.websocket.close()
        self.on_close()

    def send(self, envelope: dict) -> None:  # noqa: D102
        self.__ensure_is_open()
        envelope_str = json.dumps(envelope)
        if self.is_trace_enabled:
            self.logger(f'WebSocket SEND: {envelope_str}')
        # Scheduled, not awaited: `send` itself is synchronous.
        ensure_future(self.websocket.send(envelope_str))

    def get_supported_compression(self) -> List[str]:  # noqa: D102
        return [SessionCompression.NONE]

    def set_compression(self, compression: str) -> None:  # noqa: D102
        pass

    def get_supported_encryption(self) -> List[str]:  # noqa: D102
        return [SessionEncryption.TLS, SessionEncryption.NONE]

    def set_encryption(self, encryption: str) -> None:  # noqa: D102
        pass

    def on_envelope(self, envelope: dict) -> None:  # noqa: D102
        pass

    def on_open(self) -> None:
        """Handle on websocket open callback."""
        pass

    def on_close(self) -> None:  # noqa: WPS123
        """Handle on websocket close callback."""
        pass

    def on_error(self, err: Any) -> None:
        """Handle on websocket error callback.

        Args:
            err (Any): the exception
        """
        pass

    def __ensure_is_open(self) -> None:
        """Raise ValueError (after notifying on_error) unless the socket is open."""
        if not self.websocket or not self.websocket.open:
            err = ValueError('The connection is not open')
            self.on_error(err)
            raise err

    async def __message_handler_async(self) -> None:
        """Receive messages until the connection closes, passing each to __on_envelope."""
        try:
            while True:  # noqa: WPS457
                message = await self.websocket.recv()
                self.__on_envelope(message)
        except ConnectionClosed:
            if self.is_trace_enabled:
                self.logger(
                    'Stopped receiving messages due to closed connection'
                )

    def __on_envelope(self, envelope: str) -> None:
        """Trace a received raw message."""
        if self.is_trace_enabled:
            self.logger(f'WebSocket RECEIVE: {envelope}')
        # NOTE(review): the source listing is truncated here; the message is
        # presumably parsed and forwarded to `on_envelope` - confirm against
        # the full file.
Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!