Best Python code snippet using playwright-python
server.py
Source:server.py
...48 self.content = self.rfile.read(length) if length > 0 else ''49 return self.content.decode('utf-8') if decode else self.content50 def is_valid_content_type(self):51 """Checks if the set content type is valid"""52 return self.is_json_content_type() or self.is_form_urlencoded_data_content_type() or \53 self.is_multipart_form_data_content_type()54 def is_json_content_type(self):55 """Checks if the set content type is application/json"""56 return 'application/json' in self.headers.get('content-type', 'text/plain').lower()57 def is_form_urlencoded_data_content_type(self):58 """Checks if the set content type is form url encoded"""59 return 'application/x-www-form-urlencoded' in self.headers.get('content-type', 'text/plain').lower()60 def is_multipart_form_data_content_type(self):61 """Checks if the set content type is multipart/form-data"""62 return 'multipart/form-data' in self.headers.get('content-type', 'text/plain').lower()63 def is_valid_json(self):64 """Checks if the body content is a valid JSON"""65 data = self.get_data()66 return data is not None67 def get_multipart_boundary(self):68 """Returns the multipart boundary"""69 parts = self.headers.get('content-type', '').split('boundary=')70 return parts[1] if len(parts) > 1 else ''71 def get_data(self):72 """73 Returns the JSON data converted to a dict depending of the content-type sent. 
Only if data format is correct,74 returns the dict, otherwise, returns None75 """76 if self.data is None:77 if self.is_json_content_type():78 try:79 self.data = json.loads(self.get_content())80 except ValueError:81 self.data = None82 elif self.is_form_urlencoded_data_content_type():83 parsed_data = parse_qs(self.get_content(), keep_blank_values=True)84 self.data = dict(map(85 lambda t: (t[0], t[1][0] if type(t[1]) == list and len(t[1]) == 1 else t[1]), parsed_data.items()86 ))87 elif self.is_multipart_form_data_content_type():88 ctype, pdict = cgi.parse_header(self.headers.get('content-type'))89 if 'boundary' in pdict:90 pdict['boundary'] = pdict['boundary'].encode()91 content = self.get_content(decode=False)92 filenames = re.findall(r'filename="(.*?)"', str(content), re.IGNORECASE | re.DOTALL | re.MULTILINE)93 parsed_data = cgi.parse_multipart(BytesIO(content), pdict)94 self.data = {}95 for key in parsed_data:96 parsed_item = parsed_data[key]97 if type(parsed_item) == list:98 for content in parsed_item:99 try:100 self.data[key] = parsed_item[0].decode('utf-8')101 except UnicodeDecodeError as e:102 # we assume that are files103 try:104 filename = filenames.pop(0)105 except IndexError as e:106 filename = None107 self.data[key] = self.save_as_file(content, filename)108 else:109 self.data[key] = parsed_item110 return self.data111 def get_resource_parts(self):112 """113 Returns a list of resource parts: if URL is 'API_PATH/foo/bar' it returns ['foo', 'bar']114 If not is a valid API_REQUEST, returns an empty list115 """116 if not self.is_api_request():117 return []118 parts_list = list(filter(lambda x: x.replace(' ', '') != '', self.path.split(API_PATH)))119 if len(parts_list) <= 0:120 return []121 return list(filter(lambda x: x.replace(' ', '') != '' and x[0] != '?', parts_list[0].split('/')))122 def write_response(self, data, code=200):123 """124 Writes the response to the request125 :param data: dict with data which will be converted to JSON126 :param code: optional 
integer with an HTTP response code127 """128 self.send_response(code)129 self.send_header("Content-Type", 'application/json')130 self.end_headers()131 self.wfile.write(json.dumps(data).encode())132 def process_non_api_request(self):133 """Process a non api request serving the static file requested"""134 f = self.send_head()135 if f:136 try:137 self.copyfile(f, self.wfile)138 finally:139 f.close()140 def process_get_list_resource_request(self, resource):141 """Look for the resource and returns its content or a 404 Not found HTTP response if it doesn't exist"""142 resource_path = os.path.join(API_DATA_PATH, resource)143 if not os.path.exists(resource_path):144 self.write_not_found_response(resource)145 else:146 response_items = []147 files = os.listdir(resource_path)148 for file_name in files:149 if not is_int(file_name):150 continue151 file_path = os.path.join(resource_path, file_name)152 try:153 fp = open(file_path, 'r')154 item = json.load(fp)155 fp.close()156 response_items.append(item)157 except IOError as e:158 if e.errno == errno.EACCES:159 self.write_no_access_permission_to_file_response(file_path)160 else: # Not a permission error.161 raise162 query_parts = self.path.split('?')163 if len(query_parts) > 1:164 query = parse_qs(query_parts[1], keep_blank_values=True)165 # filtering results166 for key in query:167 if key in (FIELDS_TO_RECOVER, ORDERING_FIELDS):168 continue169 value = query[key]170 if type(value) == list and len(value) == 1:171 filter_value = value[0]172 response_items = list(filter(lambda x: x.get(key, None) == filter_value, response_items))173 # fields to recover174 fields_to_recover = list(filter(lambda x: x.strip() != '', query.get(FIELDS_TO_RECOVER, '').split(',')))175 if len(fields_to_recover) > 0:176 response_items = map(lambda x: {field: x[field] for field in fields_to_recover}, response_items)177 # order178 possible_fields = query.get(ORDERING_FIELDS, list())179 if len(possible_fields) > 0:180 ordering_fields = list(filter(lambda x: 
x.strip() != '', possible_fields[0].split(',')))181 for field in ordering_fields:182 if field[0] == "-":183 reverse = True184 field = field[1:]185 else:186 reverse = False187 response_items = sorted(response_items, key=lambda x: x.get(field, ''), reverse=reverse)188 self.write_response(response_items, 200)189 def process_get_detail_resource_request(self, resource, resource_id):190 """Look for the resource and returns its content or a 404 Not found HTTP response if it doesn't exist"""191 resource_path = os.path.join(API_DATA_PATH, resource, resource_id)192 if not os.path.exists(resource_path):193 self.write_not_found_response(resource, resource_id)194 else:195 try:196 fp = open(resource_path, 'r')197 item = json.load(fp)198 fp.close()199 self.write_response(item, 200)200 except IOError as e:201 if e.errno == errno.EACCES:202 self.write_no_access_permission_to_file_response(resource_path)203 else: # Not a permission error.204 raise205 def write_invalid_api_uri_format_response(self):206 """Writes the HTTP 406 Invalid API URI format error response"""207 self.write_response({208 406: 'Invalid API URI format. Expected format: {0}<resource> or {0}<resource>/'.format(API_PATH)209 }, 406)210 def write_method_not_allowed_response(self):211 """Writes the HTTP 405 Method not allowed error response"""212 self.write_response({405: 'Method not allowed'}, 405)213 def write_invalid_content_type_response(self):214 """Writes the HTTP 400 Bad request error response to set the content-type header to a valid content type"""215 self.write_response({400: 'Review your Content-Type header. 
I only speak: application/json, '216 'application/x-www-form-urlencoded or multipart/form-data bro.'}, 400)217 def write_no_access_permission_to_file_response(self, file_path):218 """Writes the HTTP 403 Forbidden error response when trying to accessing a file without permission"""219 self.write_response({220 403: 'Server process owner has no access to {0}'.format(file_path)221 }, 403)222 def write_not_found_response(self, resource=None, resource_id=None):223 """Writes the HTTP 404 Not found"""224 if resource is not None and resource_id is not None:225 self.write_response({404: 'Resource {0}/{1} not found'.format(resource, resource_id)}, 404)226 elif resource is not None:227 self.write_response({404: 'Resource {0} not found'.format(resource)}, 404)228 else:229 self.write_response({404: 'Resource not found'}, 404)230 def save_as_file(self, content, filename=None):231 """Saves the content as a file in the uploads folder and returns the whole URL"""232 try:233 extension = u"." + filename.split('.')[-1] if filename and len(filename) > 0 else ''234 filename = str(uuid1()) + extension235 file_path = os.path.join(UPLOADS_PATH, filename)236 if not os.path.exists(UPLOADS_PATH):237 try:238 os.makedirs(UPLOADS_PATH)239 except IOError as e:240 if e.errno == errno.EACCES:241 self.write_no_access_permission_to_file_response(file_path)242 else: # Not a permission error.243 raise244 fp = open(file_path, 'wb')245 fp.write(content)246 fp.close()247 return "/{0}/{1}".format(UPLOADS_PATH, filename)248 except IOError as e:249 if e.errno == errno.EACCES:250 self.write_no_access_permission_to_file_response(file_path)251 else: # Not a permission error.252 raise253 def do_GET(self):254 """Process a GET request"""255 if self.is_api_request():256 resource_parts = self.get_resource_parts()257 if resource_parts is None:258 self.write_method_not_allowed_response()259 elif len(resource_parts) == 1:260 self.process_get_list_resource_request(resource_parts[0])261 elif len(resource_parts) == 2:262 
self.process_get_detail_resource_request(resource_parts[0], resource_parts[1])263 else:264 self.write_invalid_api_uri_format_response()265 else:266 self.process_non_api_request()267 def do_POST(self):268 """Process a POST request"""269 resource_parts = self.get_resource_parts()270 if resource_parts is None:271 self.write_method_not_allowed_response()272 elif not self.is_valid_content_type():273 self.write_invalid_content_type_response()274 elif self.is_json_content_type() and not self.is_valid_json():275 self.write_invalid_content_type_response()276 elif len(resource_parts) != 1:277 self.write_invalid_api_uri_format_response()278 else:279 resource_path = os.path.join(API_DATA_PATH, resource_parts[0])280 if not os.path.exists(resource_path):281 try:282 os.makedirs(resource_path)283 except IOError as e:284 if e.errno == errno.EACCES:285 self.write_no_access_permission_to_file_response(resource_path)286 else: # Not a permission error.287 raise288 files = os.listdir(resource_path)289 max_id = 0290 for file_name in files:291 try:292 int_value = int(file_name)293 max_id = int_value if int_value > max_id else max_id294 except ValueError:295 pass296 data = self.get_data()297 try:298 if type(data) == dict:299 data = [data]300 for item in data:301 max_id += 1302 item[API_ID_FIELD] = max_id303 new_file_name = os.path.join(resource_path, str(max_id))304 fp = open(new_file_name, 'w')305 json.dump(item, fp)306 fp.close()307 self.write_response(data[0] if len(data) == 1 else data, 201)308 except IOError as e:309 if e.errno == errno.EACCES:310 self.write_no_access_permission_to_file_response(resource_path)311 else: # Not a permission error.312 raise313 def do_PUT(self):314 """315 Process a PUT request316 """317 resource_parts = self.get_resource_parts()318 if len(resource_parts) != 2:319 self.write_invalid_api_uri_format_response()320 elif not self.is_valid_content_type():321 self.write_invalid_content_type_response()322 elif self.is_json_content_type() and not 
self.is_valid_json():323 self.write_invalid_content_type_response()324 else:325 resource = resource_parts[0]326 resource_id = resource_parts[1]327 resource_path = os.path.join(API_DATA_PATH, resource, resource_id)328 if not os.path.exists(resource_path):329 self.write_not_found_response(resource, resource_id)330 else:331 try:332 data = self.get_data()333 data[API_ID_FIELD] = resource_id334 fp = open(resource_path, 'w')335 json.dump(data, fp)336 fp.close()337 self.write_response(data, 200)338 except IOError as e:339 if e.errno == errno.EACCES:340 self.write_no_access_permission_to_file_response(resource_path)341 else: # Not a permission error.342 raise343 def do_PATCH(self):344 """345 Process a PATCH request346 """347 resource_parts = self.get_resource_parts()348 if len(resource_parts) != 2:349 self.write_invalid_api_uri_format_response()350 elif not self.is_valid_content_type():351 self.write_invalid_content_type_response()352 elif self.is_json_content_type() and not self.is_valid_json():353 self.write_invalid_content_type_response()354 else:355 resource = resource_parts[0]356 resource_id = resource_parts[1]357 resource_path = os.path.join(API_DATA_PATH, resource, resource_id)358 if not os.path.exists(resource_path):359 self.write_not_found_response(resource, resource_id)360 else:361 try:362 data = self.get_data()363 fp = open(resource_path, 'r')364 item = json.load(fp)365 fp.close()366 item.update(data)...
test_utils.py
Source:test_utils.py
...309 def test_parse_suffix(self):310 info = parse_content_type('application/hal+json;charset=utf-8')311 self.assertEqual(('application', 'hal', 'json', 'charset=utf-8'), info)312 def test_json_content_type(self):313 self.assertTrue(is_json_content_type('application/json'))314 self.assertTrue(is_json_content_type('application/hal+json'))315 self.assertTrue(is_json_content_type('application/other+json'))316 self.assertTrue(is_json_content_type(317 'application/schema+json;schema=http://example.com/schema-2'))318 self.assertTrue(is_json_content_type('application/json;charset=utf-8'))319 self.assertFalse(is_json_content_type('text/json'))320 self.assertFalse(is_json_content_type('text/plain'))321 self.assertFalse(is_json_content_type('json'))...
utils.py
Source:utils.py
...169 else:170 return (maintype, subtype, None, parameters)171 else:172 return (types, None, None, parameters)173def is_json_content_type(value: str) -> bool:174 """175 Check if a content type is JSON-parseable176 >>> is_json_content_type('text/plain')177 False178 >>> is_json_content_type('application/schema+json')179 True180 >>> is_json_content_type('application/json')181 True182 """183 type, subtype, suffix, _ = parse_content_type(value.lower())184 return type == 'application' and (subtype == 'json' or suffix == 'json')185def fully_qualified_class_name(obj):186 module = inspect.getmodule(obj)187 if module is not None and module.__name__ != "__main__":188 return module.__name__ + "." + obj.__class__.__name__189 else:190 return obj.__class__.__name__191def package_version(package_name):192 try:193 import pkg_resources194 except ImportError:...
__init__.py
Source:__init__.py
...21 try:22 if (len(self.request.body) > 0):23 headers = self.request.headers24 body = self.request.body.decode('utf-8', 'replace')25 is_json = is_json_content_type(headers.get('Content-Type', ''))26 if is_json and request_tab["method"] == "POST":27 request_tab["POST"] = json.loads(body)28 else:29 request_tab["POST"] = self.request.body_arguments30 except Exception:31 pass32 event.add_tab("request", request_tab)33 if bugsnag.configure().send_environment:34 env = WSGIContainer.environ(self.request)35 event.add_tab("environment", env)36 def _handle_request_exception(self, exc: BaseException):37 options = {38 "user": {"id": self.request.remote_ip},39 "context": self._get_context(),...
LambdaTest’s Playwright tutorial gives you a broad overview of the Playwright automation framework, its unique features, and its use cases, with examples to deepen your understanding of Playwright testing. The tutorial provides A-to-Z guidance, from installing the Playwright framework to best practices and advanced concepts.
Get 100 minutes of automation testing FREE!!