Best Python code snippet using playwright-python
Source: parsers.py
from abc import ABC, abstractmethod
from typing import Dict, List
from urllib.parse import urlparse

from core.parsing import utils
from core.items import CoreTableItem
from core.parsing.exceptions import InvalidTableException


class TableParser(ABC):
    """
    Abstract base class that should be subclassed for different types of tables that require parsing.
    See get_parser_from_url()
    """

    @classmethod
    @abstractmethod
    def normalize_table(cls, table) -> Dict:
        raise Exception(
            f'{cls.__class__.__name__}.normalize_table is not defined')

    @classmethod
    def get_table_title(cls, table) -> str:
        title = table.css('caption').get() or ""
        return utils.clean_whitespace(title)

    @classmethod
    def get_tables(cls, response):
        tables = response.css('table')
        for t in tables:
            # remove tables inside forms
            if utils.is_in_form(t):
                continue
            # skip over tables with subtables
            if t.css('table table'):
                continue
            yield t

    @classmethod
    @abstractmethod
    def parse_table(cls, table) -> CoreTableItem:
        raise Exception(
            f'{cls.__class__.__name__}.parse_table is not defined')


class WikitableParser(TableParser):
    __TABLE_CLASS = "wikitable"

    @classmethod
    def normalize_table(cls, table):
        all_rows = table.css('tr')
        header_values, header_index = cls.__find_header_row(table)
        rows_after_headers = all_rows[header_index + 1:]
        rows_with_cleaned_cells = [
            list(map(utils.parse_inner_text_from_html, row.css('td').extract()))
            for row in rows_after_headers if row != ''
        ]
        if len(rows_with_cleaned_cells) < 2 or len(header_values) < 2:
            raise InvalidTableException()
        return [header_values] + rows_with_cleaned_cells

    @classmethod
    def __find_header_row(cls, table):
        all_rows = table.css('tr')
        rows = map(lambda x: x.css('th').getall(), all_rows)
        header_values, header_index = None, -1
        for index, headers in enumerate(rows):
            if len(headers) != 0:
                header_values = headers
                header_index = index
        if header_values is None:
            raise InvalidTableException('No headers found on wikitable')
        header_values = list(
            map(utils.parse_inner_text_from_html, header_values))
        return header_values, header_index

    @classmethod
    def get_tables(cls, response):
        return response.css(f'table.{WikitableParser.__TABLE_CLASS}')

    @classmethod
    def parse_table(cls, table) -> CoreTableItem:
        header_values, _ = cls.__find_header_row(table)
        relation = cls.normalize_table(table)
        return CoreTableItem(
            table=relation,
            title=cls.get_table_title(table),
            markup=table.get(),
            header_position="FIRST_ROW",  # TODO: hardcoded
            headers=header_values,
            table_type="RELATION",  # TODO: hardcoded
            orientation="VERTICAL",  # TODO: hardcoded
            nb_columns=len(relation[0]),
            nb_rows=len(relation),
        )


class WellFormattedTableParser(TableParser):
    @classmethod
    def normalize_table(cls, table):
        # eliminate tables with "rowspan" or "colspan"
        if table.css('td[colspan]').getall() \
                or table.css('th[colspan]').getall() \
                or table.css('td[rowspan]').getall() \
                or table.css('th[rowspan]').getall():
            raise InvalidTableException(
                'Tables with colspan / rowspan not supported')
        # extract header
        header_values = cls.__get_headers_values(table)
        if not header_values:
            # tables without headers are almost useless for automated processing,
            # just drop them here
            raise InvalidTableException('No headers found in table')

        def extract_rows_from_body(table):
            for row in table.css('tr'):
                cleaned_cells = [utils.parse_inner_text_from_html(x)
                                 for x in row.css('td').getall()]
                # only return non-empty rows
                if cleaned_cells:
                    yield cleaned_cells

        # extract body
        body_rows = list(extract_rows_from_body(table))
        # combine header and body into one relation
        relation = [header_values] + body_rows
        # basic sanity check for body, will raise on error
        nb_rows, nb_columns = utils.validate_body_cell_layout(relation)
        return relation, nb_rows, nb_columns

    @classmethod
    def __get_headers_values(cls, table) -> List[str]:
        header_columns = table.css('tr th')
        header_values = list(
            map(utils.parse_inner_text_from_html, header_columns.getall()))
        return header_values

    @classmethod
    def parse_table(cls, table) -> CoreTableItem:
        relation, nb_rows, nb_columns = cls.normalize_table(table)
        return CoreTableItem(
            table=relation,
            title=cls.get_table_title(table),
            markup=table.get(),
            header_position="FIRST_ROW",  # TODO: hardcoded
            table_type="RELATION",  # TODO: hardcoded
            headers=cls.__get_headers_values(table),
            orientation="VERTICAL",  # TODO: hardcoded
            nb_columns=nb_columns,
            nb_rows=nb_rows,
        )


def get_parser_from_url(url: str) -> TableParser:
    o = urlparse(url)
    if "wikipedia.org" in o.netloc:
        return WikitableParser()
...
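The listing above is cut off, but a minimal driving sketch can show how the pieces fit together. This is not part of the original project: it assumes the names from the listing are in scope, that the helpers in core.parsing.utils behave as the parsers expect, and it uses parsel (the selector library behind Scrapy) as a stand-in for a crawler response. The URL and HTML string are placeholders.

# Minimal usage sketch, not part of the original project.
# Assumes get_parser_from_url and InvalidTableException from the listing above are in scope,
# and that core.parsing.utils provides the helpers the parsers call.
from parsel import Selector  # same .css()/.get()/.getall() API a Scrapy response exposes


def extract_tables(url: str, html: str):
    parser = get_parser_from_url(url)   # WikitableParser is returned for wikipedia.org URLs
    response = Selector(text=html)      # stand-in for a crawler response
    items = []
    for table in parser.get_tables(response):
        try:
            items.append(parser.parse_table(table))
        except InvalidTableException:
            # headerless or too-small tables are rejected by the parsers above
            continue
    return items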
Source: __init__.py
...22 """ combo box selection list """23 return self.get("//span[text()='{item_name}' and "24 "./ancestor::*//div[contains(@class,'v-filterselect-suggestmenu')]]"25 "".format(item_name=item_name))26 def _get_table_root_and_header_values(self, header_value, lower_case_headers=False):27 root = self.get_inner_html_as_xml(table_with_header_value(header_value, as_str=True))28 header_values = [lxml_text_content(e).strip() for e in root.xpath(table_header_tds(as_str=True))]29 if lower_case_headers:30 header_values = [v.lower() for v in header_values]31 return root, header_values32 def get_table_information(self, header_value, tables_of_inputs=False, lower_case_keys=False):33 table_return = []34 def fill_table_return(elements, f):35 _table_return = []36 has_values = False37 for i in range(len(elements) // header_values_len):38 current_index = i * header_values_len39 values = [f(e).strip() for e in elements[current_index:current_index + header_values_len]]40 if any(values):41 has_values = True42 _table_return.append(dict(zip(header_values, values)))43 return _table_return, has_values44 root, header_values = self._get_table_root_and_header_values(header_value, lower_case_keys)45 header_values_len = len(header_values)46 any_value = False47 if not tables_of_inputs:48 table_return, any_value = fill_table_return(root.xpath(table_rows_tds(as_str=True)),49 lambda e: lxml_text_content(e))50 if tables_of_inputs or not any_value:51 table_return, _ = fill_table_return(52 self.get_table_elements(header_value, only_rows=True, table_of_inputs=True),53 lambda e: e.get_attribute('value'))54 return table_return55 def upload_file(self, filepath, upload_button_label):56 self.get(locators.file_input_followed_by_button(upload_button_label, as_str=True)).enter_text(filepath)57 self.move_mouse_to_and_click(self.get(locators.button(upload_button_label, as_str=True)))58 def get_grid_information(self, grid_label_value):59 table = self.get_inner_html_as_xml(grid_locator_containing_label(grid_label_value, as_str=True))60 root = table.getroot()61 label_values = root.xpath("//div[contains(@class, 'v-label')]")62 ret = {}63 for label, value in [(label_values[i], label_values[i + 1]) for i in range(len(label_values) - 1)[::2]]:64 ret[label.text] = value.text65 return ret66 def get_table_elements(self, header_value, only_rows=False, table_of_inputs=False):67 table = self.get(table_with_header_value(header_value, as_str=True))68 elements = table.get_list(table_rows_input(as_str=True) if table_of_inputs else69 table_rows_tds(as_str=True))70 if only_rows:71 return elements72 else:73 headers = table.get_list(table_header_tds(as_str=True))74 return headers, elements75 def get_table_elements_as_dict(self, header_value, lower_case_keys=False):76 _, header_values = self._get_table_root_and_header_values(header_value, lower_case_keys)77 header_values_len = len(header_values)78 rows = self.get_table_elements(header_value, only_rows=True)79 ret = []80 if rows:81 for i in range(len(rows) // len(header_values)):82 current_index = i * header_values_len83 ret.append(84 dict(zip(header_values, [e for e in rows[current_index:current_index + header_values_len]])))85 return ret86 def collapse_open_tree_node(self, node_name):87 self.click_element_at(self.get(tree_menu_node(node_name, as_str=True)), -5, 5)88def tree_menu(node, child=None):89 def menu_deco(f):90 wraps(f)...
Source: Page.py
import json

import lxml.html
import requests

from neolib.http.HTMLForm import HTMLForm


class Page:
    """Represents an HTML web page

    This class is used to encapsulate an HTML web page into a Python object. Not
    all of the attributes an HTML web page can have are present in this class.
    The primary purpose of this class is for creating an encapsulation that
    can be passed around the library for uniformity. The secondary purpose is
    to tie in the requests and lxml libraries into one easy-to-use object.

    Attributes:
        | **url**: The originating URL for this web page
        | **request**: A :class:`Request` object representing the HTTP request
        | **response**: A :class:`Response` object representing the HTTP response
        | **headers**: A dictionary containing the response headers
        | **content**: The HTML content of the web page
        | **json**: If the returned content is JSON this will be loaded
        | **post_data**: Optional dictionary containing any POST data sent
        | **header_values**: Optional dictionary with any header values overridden
        | **usr**: Optional :class:`.User` object used to request this page
        | **document**: An instance of the root element of this HTML page
        | **forms**: A list of :class:`.HTMLForm` objects for forms on this page
    """

    url = ''
    request = None
    response = None
    headers = None
    content = ''
    json = {}
    post_data = {}
    header_values = {}
    usr = None
    document = None
    forms = []

    def __init__(self, url, usr=None, post_data=None,
                 header_values=None, proxy=None):
        """Initializes the page with the given request details

        Args:
            | **url**: The URL of the page to request
            | **usr**: Optional :class:`.User` object to make the request with
            | **post_data**: Optional dictionary of data to POST with (name -> value)
            | **header_values**: Optional dictionary of header values to override the
              request headers with.
            | **proxy**: Optional proxy to use with the request
        """
        # Set class attributes
        self.url = url
        self.post_data = post_data
        self.header_values = header_values

        # Determine if this request is using an existing user
        if usr:
            if post_data:
                r = usr.session.post(url, data=post_data,
                                     headers=header_values, proxies=proxy)
            else:
                r = usr.session.get(url, headers=header_values, proxies=proxy)
        else:
            if post_data:
                r = requests.post(url, data=post_data, headers=header_values,
                                  proxies=proxy)
            else:
                r = requests.get(url, headers=header_values, proxies=proxy)

        # Setup attributes based on response
        self.request = r.request
        self.response = r
        self.headers = r.headers

        # Check what this is
        if r.headers['Content-Type'] == 'application/ajax':
            self.content = r.text
            self.json = json.loads(r.text)
        elif 'text/html' in r.headers['Content-Type']:
            # Prep the html parser
            self.content = r.text
            self.document = lxml.html.document_fromstring(self.content)

            # Process forms
            self.forms = []
            for form in self.xpath('//form'):
                self.forms.append(HTMLForm(url, form))
        else:
            self.content = r.content

    def form(self, **kwargs):
        """Searches for forms this page holds using the given keyword args

        Args:
            **kwargs: The keyword arguments to search for

        Returns:
            A list of :class:`.HTMLForm` instances that match the original query

        Example:
            `form = pg.form(action='/login.phtml')`
        """
        matches = []

        # Loop through all forms and associated attributes looking for
        # supplied keyword arguments
        for form in self.forms:
            for key in kwargs.keys():
                try:
                    value = getattr(form, key)
                    if kwargs[key] == value:
                        matches.append(form)
                except Exception:
                    continue
        return matches

    def xpath(self, query):
        """Queries the current page using the given XPath query

        Args:
            **query**: The XPath query to use

        Returns:
            A list of elements resulting from the XPath query
        """
        return self.document.xpath(query)

    def __repr__(self):
...
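A short usage sketch for Page, assuming the neolib package is installed and that the target URL (a placeholder here) returns a text/html response so the document and forms are parsed:

# Hedged usage sketch; the URL is a placeholder and neolib must be importable.
pg = Page('http://www.example.com/index.phtml')
# search the parsed forms by attribute, as in the form() docstring example
login_forms = pg.form(action='/login.phtml')
# run an arbitrary XPath query against the parsed document
links = pg.xpath('//a/@href')
print(len(pg.forms), len(links))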
Source: ping.py
#!/usr/bin/python
# TODO: Get the address info to extract the IP address of the host
import argparse
import atexit
import math
from os import getpid
from socket import AF_INET, IP_MULTICAST_TTL, IPPROTO_ICMP, IPPROTO_IP, IP_TTL, SOCK_RAW, timeout
from socket import socket
import struct
import time

ICMP_TYPE = 8
ICMP_CODE = 0
IPV4_BYTES_LENGTH = 20
CHECK_SUM_UPPER_POS = 2
CHECK_SUM_LOWER_POS = 3
DEFAULT_TIMEOUT = 4.0
SEQ_BYTE_POS = 7

parser = argparse.ArgumentParser(description='ping - send ICMP ECHO_REQUEST to network hosts', prog='ping')
parser.add_argument('destination', help='The host address to ping; either a fully qualified domain name or an IP address.')
# TODO: ensure that count is an integer
parser.add_argument('-c', default=None, dest='count', metavar='count', help='')
parser.add_argument('-t', default=56, dest='ttl', metavar='ttl', help='The time-to-live value for outgoing packets.')
args = parser.parse_args()


# Given a number represented by an array of bytes from most significant byte to least
# significant, calculate a checksum.
def calc_checksum(bytes):
    total = 0
    # Add each 16 bit segment of the ICMP message. To form the 16 bit segment, each
    # even byte is shifted 8 bits to the left. The following byte is added (without
    # any bit shifting).
    for idx, val in enumerate(bytes):
        if idx == 0 or idx % 2 == 0:
            total += val << 8
        else:
            total += val
    # If the total exceeds two octets, add the overflowing bits back into the
    # 16 bit portion of the total. Repeat until there are no overflowing bits left.
    while (total >> 16) > 0:
        total = (total & 0xFFFF) + (total >> 16)
    # Return the 16 bit one's complement, as an array of bytes
    return bytearray(struct.pack('!H', total ^ 0xFFFF))


# Callback function used with 'atexit'.
def close_socket(socket_connection):
    socket_connection.close()


def main():
    # The process ID is used as the identifier. It's possible that the PID exceeds
    # the maximum value of the 16-bit identifier field (65535), in which case only
    # the lower two bytes are kept via the bitwise 'and'.
    pid_bytes = bytearray(struct.pack('!H', getpid() & 0xFFFF))
    # This list represents the initial values of the ICMP header. Each value in the list
    # represents a byte. Some fields, like the checksum, the identifier, and the sequence
    # number, span two bytes.
    header_values = [ICMP_TYPE, ICMP_CODE, 0, 0, pid_bytes[0], pid_bytes[1], 0, 0]
    s = socket(AF_INET, SOCK_RAW, IPPROTO_ICMP)
    # Set the time-to-live, specified either by the user or using the default value of 56.
    s.setsockopt(IPPROTO_IP, IP_TTL, args.ttl)
    s.settimeout(DEFAULT_TIMEOUT)
    s.connect((args.destination, 0))
    atexit.register(close_socket, socket_connection=s)
    while True:
        if args.count == 0:
            break
        # Reset the checksum so that it can be recalculated on the next iteration
        header_values[CHECK_SUM_UPPER_POS], header_values[CHECK_SUM_LOWER_POS] = 0, 0
        # Get the current time represented as a float
        timestamp_before = time.time()
        # The timestamp in the ICMP message is represented by the whole seconds followed
        # by the microseconds
        timestamp_bytes = bytearray(struct.pack(
            '!II',
            int(timestamp_before),
            int((timestamp_before - int(timestamp_before)) * 10 ** 6)
        ))
        # Set the payload of the packet to be an incremental range of bytes starting from
        # the length of the timestamp, so timestamp + payload totals 56 data bytes
        if header_values[SEQ_BYTE_POS] == 0:
            payload_bytes = bytearray(range(len(timestamp_bytes), 56))
            print('PING (%s): %d data bytes' % (args.destination, len(timestamp_bytes) + len(payload_bytes)))
        header_values[CHECK_SUM_UPPER_POS], header_values[CHECK_SUM_LOWER_POS] = calc_checksum(
            header_values +
            list(timestamp_bytes) +
            list(payload_bytes)
        )
        header_bytes = bytearray(struct.pack(*(['!' + 'B' * len(header_values)] + header_values)))
        try:
            s.send(header_bytes + timestamp_bytes + payload_bytes)
            data = s.recv(256)
        except timeout as err:
            print(err)
        timestamp_after = time.time()
        timestamp_delta = round((timestamp_after - timestamp_before) * 1000, 3)
        print('%d bytes from %s: icmp_seq=%d ttl=%d time=%.3f ms' %
              (len(data) - IPV4_BYTES_LENGTH, args.destination, header_values[SEQ_BYTE_POS], args.ttl, timestamp_delta))
        header_values[SEQ_BYTE_POS] += 1
        time.sleep(1)
        if args.count is not None:
            args.count -= 1


if __name__ == "__main__":
...
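The checksum logic is the only piece of the script that is easy to exercise in isolation. A small check of calc_checksum with an arbitrary echo-request header, using invented values rather than real ping output:

# Standalone check of the one's-complement checksum above; the input bytes are arbitrary.
echo_header = [8, 0, 0, 0, 0x12, 0x34, 0, 1]  # type, code, zeroed checksum, identifier, sequence
upper, lower = calc_checksum(echo_header)
print('checksum bytes: 0x%02x 0x%02x' % (upper, lower))  # 0xe5 0xca for this particular input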
LambdaTest's Playwright tutorial gives you a broader view of the Playwright automation framework, its unique features, and its use cases, with examples to deepen your understanding of Playwright testing. The tutorial provides end-to-end guidance, from installing the Playwright framework through best practices and advanced concepts.