Best Python code snippet using playwright-python
mb_api.py
Source:mb_api.py
"""Functions that the shim exposes to the Slick controller over rpyc."""
import os
import sys
import getopt
import rpyc
import time
from collections import defaultdict
from shim_table import ShimTable

# The element packages live outside the default module path; make them
# importable before the element imports below.
parentdir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
elements_path = parentdir + "/pox/ext/slick/elements"
sys.path.insert(0, elements_path)

# These imports are needed even though the names look unused:
# ClientService.exposed_install_function resolves an element class through
# sys.modules["<Name>.<Name>"], which only works once the module is loaded.
from Logger.Logger import Logger
from Logger1.Logger1 import Logger1
from TriggerAll.TriggerAll import TriggerAll
from DnsDpi.DnsDpi import DnsDpi
from P0f.P0f import P0f
from Drop.Drop import Drop
from Noop.Noop import Noop
from BloomFilter.BloomFilter import BloomFilter
from DelayBuffer.DelayBuffer import DelayBuffer
from Constant.Constant import Constant
from Compress.Compress import Compress
# Dummy
from Encrypt.Encrypt import Encrypt
from StatefulFirewall.StatefulFirewall import StatefulFirewall
from IDS.IDS import IDS
from LoadBalancer.LoadBalancer import LoadBalancer
from Less.Less import Less
from More.More import More
from Equal.Equal import Equal

# Port the rpyc server binds to when no port is given.
DEFAULT_PORT = 18861


class ClientService(rpyc.Service):
    """rpyc service through which the controller installs and drives elements."""

    def __init__(self, shim):
        # Element descriptor (fd) -> element instance.
        self.fd_to_object_map = {}
        self.shim_table = ShimTable()
        # Need this handle for application to call trigger.
        self.shim = shim

    def on_connect(self):
        """Code that runs when a connection is created
        (to init the service, if needed).
        """
        print("Connection from controller...")

    def on_disconnect(self):
        """Code that runs when the connection has already closed
        (to finalize the service, if needed).
        """
        print("Disconnection from controller...")

    def exposed_install_function(self, msg):
        """Instantiate and initialise one element per descriptor in ``msg``.

        Args:
            msg: Key:Value pairs from the Slick Controller.  Reads "fd",
                "function_name" and "params" (indexed in parallel) and
                "flow".
        Returns:
            True if every element was registered, False as soon as one
            registration fails (after cleaning up).
        """
        fds = msg["fd"]
        flow = msg["flow"]
        function_names = msg["function_name"]
        params_dicts = msg["params"]
        for i, fd in enumerate(fds):
            function_name = function_names[i]
            params_dict = params_dicts[i]
            # Equivalent of: from Logger.Logger import Logger
            package_name = function_name + '.' + function_name
            elem_class = sys.modules[package_name].__dict__[function_name]
            elem_instance = elem_class(self.shim, fd)
            elem_instance.init(params_dict)
            try:
                if isinstance(flow['nw_src'], unicode):  # BAD HACK
                    flow['nw_src'] = flow['nw_src'].encode('ascii', 'replace')
                # Update flow to fd mapping.
                self.shim_table.add_flow(0, flow, fd)
                # This is just for reference.
                self.fd_to_object_map[fd] = elem_instance
                reverse_flow = self.shim.get_reverse_flow(flow)
                self.shim_table.add_flow(0, reverse_flow, fd)
            except Exception:
                print("WARNING: Unable to create handle for the function %s "
                      "with function descriptor %s" % (function_name, fd))
                self._cleanup_failed_install(fds, flow)
                return False
        return True

    def _cleanup_failed_install(self, eds, flow):
        """Forget every element instance registered for the descriptors ``eds``.

        Args:
            eds: Element descriptors of the install request that failed.
            flow: Flow of the failed request (currently unused, see below).
        """
        for ed in eds:
            # Descriptors after the failing one were never registered, so a
            # plain ``del`` would raise KeyError from the error path.
            self.fd_to_object_map.pop(ed, None)
        # NOTE(review): this used to end with ``del self.flow_to_fd_map[flow]``,
        # but no ``flow_to_fd_map`` attribute is ever created, so that line
        # could only raise AttributeError.  Flow entries already added to
        # ``self.shim_table`` are left in place because no removal call of
        # ShimTable is visible from this file -- confirm and add one.

    def exposed_configure_function(self, msg):
        """Calls element's configure function based on element descriptor.

        Args:
            msg: Key:Value pairs from the Slick Controller.
        Returns:
            True if the descriptor is known, False otherwise.
        """
        fd = int(msg["fd"])
        params = msg["params"]
        # Call the respective configure function.
        if fd in self.fd_to_object_map:
            self.fd_to_object_map[fd].configure(params)
            return True
        return False

    def exposed_stop_function(self, msg):
        """Shut down the element behind ``msg["fd"]`` and forget it.

        Returns:
            True if the element reported a successful shutdown; False if the
            descriptor is unknown or the shutdown failed.
        """
        fd = int(msg["fd"])
        element = self.fd_to_object_map.get(fd)
        if element is None:
            # Same answer as exposed_configure_function for an unknown fd.
            return False
        if element.shutdown():
            # We need process killing wrappers or garbage collectors.
            # A process sets up particular resources.
            del self.fd_to_object_map[fd]
            return True
        return False

    def raise_trigger(self, trigger_message):
        """Tag ``trigger_message`` as a trigger and send it to the controller."""
        # Adding flag for the controller to identify the trigger message.
        trigger_message["type"] = "trigger"
        self.shim.client.send_data_basic(trigger_message)

    def fwd_pkt(self, packets):
        """Send every buffer in ``packets`` (an array of bufs) on the forward socket."""
        for packet in packets:
            self.shim.forward_data_sock.send(packet)

    def fwd_pkt_old(self, packet):
        """Send a single buffer on the forward socket."""
        self.shim.forward_data_sock.send(packet)

    def get_function_handles_from_flow(self, flow):
        """Return the element objects installed for ``flow``.

        Returns:
            A list of element instances (empty when the flow has no
            descriptors), or None when any descriptor of the flow has no
            registered instance.
        """
        element_handles = []
        fds = self.shim_table.lookup_flow(flow)
        if fds:  # Falsy when the flow is not installed yet.
            for fd in fds:
                if fd is None:
                    continue
                if fd not in self.fd_to_object_map:
                    # If not a single handle is found for an element
                    # descriptor invalidate the service chain.
                    return None
                element_handles.append(self.fd_to_object_map[fd])
        return element_handles


class ShimResources(object):
    """Place it as an element in front of the chain."""

    def __init__(self, shim):
        self.shim = shim
        self.cpu_percentage = 0
        self.mem_percentage = 0
        # time.time() of the last trigger that was raised.
        self.trigger_time = 0
        self.max_flows = 1
        self.trigger_interval = 500  # seconds

    def calc_triggers(self, flow):
        """Raise a "max_flows" trigger when the active flow count reaches the limit.

        At most one trigger is raised per ``trigger_interval`` seconds.
        ``flow`` is accepted but not used.
        """
        active_flows = self.shim.get_active_flows()
        if active_flows < self.max_flows:
            return
        cur_time = time.time()
        if (cur_time - self.trigger_time) > self.trigger_interval:
            trigger_msg = {"ed": 0, "mac": self.shim.mac, "max_flows": True}
            self.shim.client_service.raise_trigger(trigger_msg)
            self.trigger_time = time.time()


def usage():
    pass


def start_rpyc(port):
    """Start a blocking rpyc ThreadedServer for ClientService on ``port``.

    Args:
        port: Port number (int or numeric string).  A falsy value selects
            DEFAULT_PORT.
    """
    from rpyc.utils.server import ThreadedServer
    listen_port = int(port) if port else DEFAULT_PORT
    # NOTE(review): ClientService.__init__ expects a ``shim``; what rpyc passes
    # when it instantiates the service class depends on the rpyc version --
    # confirm against the deployed rpyc.
    t = ThreadedServer(ClientService, port=listen_port)
    t.start()


def main(argv):
    """Parse the command line and start the rpyc server.

    ``argv`` is accepted for the caller's convenience; options are read from
    ``sys.argv[1:]``.
    """
    # Parse options and arguments
    port = None
    mode = None
    try:
        # "port=" (with '='): the long option takes a value, like "-p".
        opts, args = getopt.getopt(sys.argv[1:], "hsp:m:",
                                   ["help", "start", "port=", "mode="])
    except getopt.GetoptError as err:
        print("Option error!")
        print(str(err))
        sys.exit(2)
    for opt, arg in opts:
        if opt in ("-h", "--help"):
            usage()
            sys.exit()
        elif opt in ("-s", "--start"):
            # Accepted for compatibility.  The server is started exactly once
            # below, after every option (notably the port) has been read.
            pass
        elif opt in ("-p", "--port"):
            port = arg
            print("Connecting on the port: %s" % port)
        elif opt in ("-m", "--mode"):
            mode = str(arg)
            print("starting shim in the mode:  %s" % mode)
    if not port:
        print("Setting default values.")
        port = DEFAULT_PORT
    start_rpyc(port)


if __name__ == "__main__":
    # NOTE(review): the body of this guard is elided ("...") in the listing
    # this file was recovered from; restore the original call here.
    pass
plot.py
Source:plot.py
import numpy as np
import matplotlib.pyplot as plt
import matplotlib
from mpl_toolkits.mplot3d import Axes3D
import copy as cp


def equal_3d(ax=None):
    """Give a 3D axes the same data range along x, y and z.

    The limits of every axis are re-centred on their current midpoint and
    widened to the largest of the three current ranges.

    Args:
        ax: 3D axes to adjust.  Defaults to the current axes, looked up at
            call time (not at import time).
    Returns:
        The adjusted axes.
    """
    if ax is None:
        ax = plt.gca()

    x_lims = np.array(ax.get_xlim())
    y_lims = np.array(ax.get_ylim())
    z_lims = np.array(ax.get_zlim())

    x_range = np.diff(x_lims)
    y_range = np.diff(y_lims)
    z_range = np.diff(z_lims)
    max_range = np.max([x_range, y_range, z_range]) / 2

    ax.set_xlim(np.mean(x_lims) - max_range, np.mean(x_lims) + max_range)
    ax.set_ylim(np.mean(y_lims) - max_range, np.mean(y_lims) + max_range)
    ax.set_zlim(np.mean(z_lims) - max_range, np.mean(z_lims) + max_range)
    # ax.set_aspect(1)

    return ax


def plot_transformation_mats(x, y, z, T, figno=None, ax=None, scaling='auto', show_legend=False):
    """Draw the three row vectors of each matrix in ``T`` as short line segments.

    Args:
        x, y, z: Coordinates of the point at which matrix ``T[i]`` is drawn.
        T: Iterable of matrices indexed as ``Ti[component, axis]``; rows 0-2
            are drawn in red, blue and green.
        figno: Figure number used when a new figure has to be created.
        ax: 3D axes to draw on; a new figure/axes is created when None.
        scaling: Segment length factor, or 'auto' for 0.005 times the
            diagonal of the coordinate bounding box.
        show_legend: Add an 'x'/'y'/'z' legend for the three components.
    Returns:
        ``(ax, h)`` where ``h`` holds the last line drawn per component.
    """
    if ax is None:
        fig = plt.figure(figno)
        ax = fig.add_subplot(111, projection='3d')

    # ax.scatter(x,y,z,'.k')

    if scaling == 'auto':
        xr = max(x) - min(x)
        yr = max(y) - min(y)
        zr = max(z) - min(z)
        r = np.sqrt(xr**2 + yr**2 + zr**2)
        scaling = 0.005 * r

    compcolors = ['tab:red', 'tab:blue', 'tab:green']
    h = [None] * 3

    for ix, Ti in enumerate(T):
        xi = x[ix]
        yi = y[ix]
        zi = z[ix]

        for comp in range(3):
            xunit = [xi, xi + Ti[comp, 0] * scaling]
            yunit = [yi, yi + Ti[comp, 1] * scaling]
            zunit = [zi, zi + Ti[comp, 2] * scaling]
            # Draw on the axes we were given, not on pyplot's current axes.
            h[comp] = ax.plot(xunit, yunit, zunit, color=compcolors[comp])[0]

    # No handles exist when T is empty; skip the legend in that case.
    if show_legend and h[0] is not None:
        ax.legend(h, ['x', 'y', 'z'])

    ax.set_xlabel('x')
    ax.set_ylabel('y')
    ax.set_zlabel('z')
    equal_3d(ax)
    return ax, h


def plot_elements_from_matrices(element_matrix, node_matrix, chosen_nodes_ix=[], disp=None, node_labels=False, element_labels=False, nodes=True, elements=True, ax=None, fig=None, element_settings={}, node_settings={}, node_label_settings={}, chosen_node_settings={}, disp_settings={}, element_label_settings={}, three_d=True, transformation_mats=None, tmat_scaling='auto'):
    """Plot two-node elements and their nodes from matrix definitions.

    Args:
        element_matrix: Array with one row per element; column 0 is the
            element label, columns 1 and 2 are the labels of its two nodes.
        node_matrix: Array with one row per node; column 0 is the node label,
            columns 1-3 are its x, y, z coordinates.
        chosen_nodes_ix: Row indices into ``node_matrix`` to highlight.
        disp: Flat displacement vector with 6 entries per node, ordered like
            ``node_matrix``; the first three of each node are added to its
            coordinates to draw the displaced element.
        node_labels, element_labels: Write the labels next to nodes/elements.
        nodes, elements: Toggle drawing of nodes/elements.
        ax, fig: Existing axes/figure to draw on; created when missing.
        *_settings: Keyword overrides merged over the built-in plot styles.
            (The dict/list defaults are only read, never mutated.)
        three_d: Plot in 3D (True) or in the x-y plane (False).
        transformation_mats: Optional matrices drawn at the element midpoints
            with ``plot_transformation_mats``.
        tmat_scaling: ``scaling`` forwarded to ``plot_transformation_mats``.
    Returns:
        ``(ax, element_handles)``; ``element_handles[i]`` is the value
        returned by ``ax.plot`` for element ``i`` (None if not drawn).
    """
    e_dict = {'color': 'DarkGreen', 'alpha': 1}
    e_dict.update(**element_settings)

    n_dict = {'color': 'Black', 'linestyle': '', 'marker': '.', 'markersize': 4, 'alpha': 0.8}
    n_dict.update(**node_settings)

    n_chosen_dict = {'color': 'GreenYellow', 'linestyle': '', 'marker': 'o', 'markersize': 8, 'alpha': 1, 'markeredgecolor': 'dimgray'}
    n_chosen_dict.update(**chosen_node_settings)

    disp_dict = {'color': 'IndianRed', 'alpha': 1}
    disp_dict.update(**disp_settings)

    l_nodes_dict = {'color': 'Black', 'fontsize': 8, 'fontweight': 'normal'}
    l_nodes_dict.update(**node_label_settings)

    l_elements_dict = {'color': 'LimeGreen', 'fontsize': 8, 'fontweight': 'bold', 'style': 'italic'}
    l_elements_dict.update(**element_label_settings)

    if ax is None and fig is None:
        fig = plt.figure()

    if ax is None:
        if three_d:
            # Figure.gca() no longer accepts keyword arguments, so reuse the
            # figure's current axes when it is already 3D and create a 3D
            # subplot otherwise.
            current = fig.gca() if fig.axes else None
            if hasattr(current, 'get_zlim'):
                ax = current
            else:
                ax = fig.add_subplot(111, projection='3d')
        else:
            ax = fig.gca()
    # NOTE: a caller-supplied ``ax`` is used as-is even when three_d is True;
    # switching it to a 3D projection is not implemented (the original code
    # carried a "functionality missing here" remark at this point).

    n_elements = len(element_matrix[:, 0])
    element_handles = [None] * n_elements

    if elements:
        if transformation_mats is not None:
            # Element midpoints, needed to place the transformation matrices.
            xm = np.zeros([n_elements, 3])

        for element_ix in range(n_elements):
            node1 = element_matrix[element_ix, 1]
            node2 = element_matrix[element_ix, 2]
            nodeix1 = np.where(node_matrix[:, 0] == node1)[0]
            nodeix2 = np.where(node_matrix[:, 0] == node2)[0]
            x01 = node_matrix[nodeix1, 1:4]
            x02 = node_matrix[nodeix2, 1:4]
            x0 = np.vstack([x01, x02])

            if transformation_mats is not None:
                xm[element_ix, :] = np.mean(x0, axis=0)

            if three_d:
                element_handles[element_ix] = ax.plot(xs=x0[:, 0], ys=x0[:, 1], zs=x0[:, 2], **e_dict)
            else:
                element_handles[element_ix] = ax.plot(x0[:, 0], x0[:, 1], **e_dict)

            if element_labels:
                xmean = np.mean(x0, axis=0)
                if three_d:
                    ax.text(xmean[0], xmean[1], xmean[2], '%i' % element_matrix[element_ix, 0], **l_elements_dict)
                else:
                    ax.text(xmean[0], xmean[1], s='%i' % element_matrix[element_ix, 0], **l_elements_dict)

            if disp is not None:
                disp_node1 = disp[nodeix1[0]*6:(nodeix1[0]*6+6)]
                disp_node2 = disp[nodeix2[0]*6:(nodeix2[0]*6+6)]
                x1 = x01 + disp_node1[0:3]
                x2 = x02 + disp_node2[0:3]
                x = np.vstack([x1, x2])

                if three_d:
                    ax.plot(xs=x[:, 0], ys=x[:, 1], zs=x[:, 2], **disp_dict)
                else:
                    ax.plot(x[:, 0], x[:, 1], **disp_dict)

        if transformation_mats is not None:
            plot_transformation_mats(xm[:, 0], xm[:, 1], xm[:, 2], transformation_mats, ax=ax, scaling=tmat_scaling, show_legend=True)

    if nodes:
        if three_d:
            ax.plot(xs=node_matrix[:, 1], ys=node_matrix[:, 2], zs=node_matrix[:, 3], **n_dict)
        else:
            ax.plot(node_matrix[:, 1], node_matrix[:, 2], **n_dict)

        # len() works for lists and NumPy index arrays alike; comparing an
        # array with [] does not give a usable truth value.
        if chosen_nodes_ix is not None and len(chosen_nodes_ix) > 0:
            if three_d:
                ax.plot(xs=node_matrix[chosen_nodes_ix, 1], ys=node_matrix[chosen_nodes_ix, 2], zs=node_matrix[chosen_nodes_ix, 3], **n_chosen_dict)
            else:
                ax.plot(node_matrix[chosen_nodes_ix, 1], node_matrix[chosen_nodes_ix, 2], **n_chosen_dict)

    if node_labels:
        for node_ix in range(np.shape(node_matrix)[0]):
            if three_d:
                ax.text(node_matrix[node_ix, 1], node_matrix[node_ix, 2], node_matrix[node_ix, 3], '%i' % node_matrix[node_ix, 0], **l_nodes_dict)
            else:
                ax.text(node_matrix[node_ix, 1], node_matrix[node_ix, 2], '%i' % node_matrix[node_ix, 0], **l_nodes_dict)

    if three_d:
        equal_3d(ax)
    else:
        ax.set_aspect('equal', adjustable='box')

    # The string 'off' is truthy and would switch the grid ON.
    ax.grid(False)
    return ax, element_handles


def figsave(name, w=16, h=10, fig=None, maketight=True, fileformat='png'):
    """Resize ``fig`` to ``w`` x ``h`` centimetres before saving it.

    Args:
        name, fileformat: Not used in the visible part of the function.
        w, h: Figure width and height in centimetres (converted to inches).
        fig: Figure to operate on; defaults to the current figure.
        maketight: Apply ``tight_layout`` after resizing.
    """
    if fig is None:
        fig = plt.gcf()
    fig.set_figwidth(w / 2.54)
    fig.set_figheight(h / 2.54)
    if maketight:
        fig.tight_layout()
    # NOTE(review): the listing this file was recovered from is cut off here
    # ("..."); the statements that follow (presumably the ones using ``name``
    # and ``fileformat``) are not visible and must be restored from the
    # original file.
main.py
Source:main.py
import os
import re
import sys
import time
import yaml
import asyncio
import smtplib
import requests
import schedule
from pathlib import Path
from datetime import datetime
from bs4 import BeautifulSoup
from email.message import EmailMessage
from pyppeteer import launch
from LinkObject import LinkObject


def read_config():
    """Load ``config/config.yaml`` and return its parsed content.

    Exits the process with status 1 when the file does not exist.
    """
    config_path = os.path.join('config', 'config.yaml')
    if not os.path.exists(config_path):
        print('config file not found! please provide config.yaml')
        sys.exit(1)
    with open(config_path) as file:
        config = yaml.safe_load(file.read())
    return config


def send_html_mail(msg, username, password, to_username):
    """Send ``msg`` as an HTML e-mail through smtp.office365.com.

    Args:
        msg: HTML body of the message.
        username: Sender account, also used for the SMTP login.
        password: Password of the sender account.
        to_username: Value of the ``To`` header.
    """
    current_date = datetime.now().date()
    port = 587

    email = EmailMessage()
    email['Subject'] = f'{current_date} houses for rental'
    email['From'] = username
    email['To'] = to_username
    email.set_content(msg, subtype='html')

    with smtplib.SMTP('smtp.office365.com', port) as server:
        server.starttls()
        server.ehlo()
        server.login(username, password)
        server.send_message(email)


async def scroll_to_bottom(page):
    """Keep scrolling ``page`` down until its scroll position stops changing.

    A JavaScript interval scrolls to the bottom every 200 ms; the position is
    polled once per second and the interval is cleared as soon as two
    consecutive polls return the same value.
    """
    await page.evaluate('''
        var intervalID = setInterval(function() {
            var scrollingElement = (document.scrollingElement || document.body);
            scrollingElement.scrollTop = scrollingElement.scrollHeight;
        }, 200);
    ''')

    prev_height = None
    while True:
        curr_height = await page.evaluate('(window.innerHeight + window.scrollY)')
        # ``is not None`` rather than truthiness: 0 is a valid position.
        if prev_height is not None and prev_height == curr_height:
            await page.evaluate('clearInterval(intervalID)')
            break
        prev_height = curr_height
        # asyncio.sleep, not time.sleep: a blocking sleep would stall the
        # event loop that carries the browser connection.
        await asyncio.sleep(1)


async def fetch_contents(page):
    """Parse the rental listings shown on ``page`` into LinkObject instances.

    Returns:
        A list of LinkObject, one per ``vue-list-rent-item`` section, with
        ``link``, ``title``, ``price``, ``price_int`` and ``photos`` set.
    """
    list_selector = '#rent-list-app > div > div.list-container-content > div > section.vue-list-rent-content'
    await page.waitForSelector(list_selector)
    element_handles = await page.JJ(list_selector)

    result = []
    for handle in element_handles:
        inner_html = await (await handle.getProperty('innerHTML')).jsonValue()
        soup = BeautifulSoup(inner_html, 'html.parser')
        individual_items = soup.find_all('section', 'vue-list-rent-item')
        for item in individual_items:
            link_obj = LinkObject()

            a_link = item.find('a')
            link_obj.link = a_link['href']

            title_tag = item.find('div', 'item-title')
            link_obj.title = title_tag.text

            price_tag = item.find('div', 'item-price-text')
            link_obj.price = price_tag.text
            # Strip non-word characters, then drop the last two characters
            # before converting the remainder to an integer.
            link_obj.price_int = int(re.sub(r'\W', '', price_tag.text)[:-2])

            photos = item.find('ul', 'carousel-list')
            # A listing without a photo carousel simply has no photos.
            imgs = photos.find_all(lambda tag: tag.has_attr('data-original')) if photos else []
            link_obj.photos = [img['data-original'] for img in imgs]

            result.append(link_obj)
    return result


async def scrap_591_and_send_html_mail():
    """Scrape rent.591.com.tw for listings in the configured range and mail them.

    Reads ``range-start``, ``range-end``, ``element-count`` and the ``email``
    section from the configuration.
    """
    base_url = 'https://rent.591.com.tw'
    filter_base = '#rent-list-app > div > div.vue-filter-container > section:nth-child(3) > ul > li.filter-item-input > div > '
    range_start_selector = filter_base + 'input:nth-child(1)'
    range_end_selector = filter_base + 'input:nth-child(3)'
    apply_selector = filter_base + 'button'
    next_page_selector = '#rent-list-app > div > div.list-container-content > div > section.vue-public-list-page > div > a.pageNext'

    config = read_config()

    browser = await launch(headless=True, handleSIGINT=False, handleSIGTERM=False, handleSIGHUP=False)
    try:
        page = await browser.newPage()
        await page.goto(base_url)

        # rent range start / end; every wait is awaited so it really happens.
        await page.waitForSelector(range_start_selector)
        await page.type(range_start_selector, str(config['range-start']))
        await page.waitForSelector(range_end_selector)
        await page.type(range_end_selector, str(config['range-end']))
        await page.waitForSelector(apply_selector)
        await page.click(apply_selector)

        # scroll to bottom to trigger javascript load
        total_results = []
        element_count = config['element-count']
        while len(total_results) < element_count:
            await page.waitFor(3000)
            await scroll_to_bottom(page)

            # content list
            fetched = await fetch_contents(page)
            if not fetched:
                # Nothing listed on this page: stop instead of paging forever.
                break
            total_results.extend(
                item for item in fetched if item.price_int <= config['range-end']
            )

            # next page
            await page.waitForSelector(next_page_selector)
            await page.click(next_page_selector)
    finally:
        # Always release the browser process, also when scraping fails.
        await browser.close()

    html_items = [item.to_html() for item in total_results]
    send_html_mail(
        '\n'.join(html_items),
        config['email']['from']['username'],
        config['email']['from']['password'],
        ', '.join(config['email']['to']['username']),
    )


def wrapper_func():
    """Run one scrape-and-mail cycle to completion and log its start and end."""
    print('running function...')
    print(f'{datetime.now()}')
    # asyncio.set_event_loop(asyncio.SelectorEventLoop())
    asyncio.get_event_loop().run_until_complete(scrap_591_and_send_html_mail())
    print(f'function complete: {datetime.now()}')


def main():
    """Schedule the scrape for 05:00 every day and run the scheduler forever."""
    print('start 591 scrap bot...')
    schedule.every().day.at('05:00').do(wrapper_func)
    while True:
        schedule.run_pending()
        time.sleep(1)
    # asyncio.get_event_loop().run_until_complete(scrap_591_and_send_html_mail())


if __name__ == '__main__':
    # NOTE(review): the body of this guard is elided in the listing this file
    # was recovered from; the placeholder below keeps the elision visible.
    ...
folder.py
Source:folder.py
...47 for folder in self.__subfolders:48 folder.parent = self49 def __get_files(self):50 els = self.__page.locator(51 '.ef-directory .ef-item-row').element_handles()52 for el in els:53 size = el.query_selector('.ef-size-col').inner_text()54 if size != '--':55 # is file56 self.__files.append(File(self.__page, self, el))57 def __goto(self):58 with self.__page.expect_navigation():59 self.__page.locator(f'[data-id="{self.__id}"] > a').click()60 def walk(self):61 with self.__page.expect_navigation(wait_until='networkidle'):62 self.__goto()63 time.sleep(0.5)64 self.__get_files()65 for file in self.__files:...
LambdaTest’s Playwright tutorial gives you a broad overview of the Playwright automation framework, its unique features, and its use cases, with examples to deepen your understanding of Playwright testing. The tutorial provides A-to-Z guidance, from installing the Playwright framework to best practices and advanced concepts.
Get 100 minutes of automation testing FREE!!