Best Python code snippet using autotest_python
message.py
Source:message.py
"""Message members of a LinkedIn group whose title, location and organisation match.

The script logs in with Selenium, walks the group's member list, opens each
matching member's profile in a second tab to check location and organisation
type, and then sends the templated message from the member list page.
URLs of members that were messaged are appended to ``messaged_members.txt`` so
they are skipped on later runs.
"""
import re
from time import sleep

from selenium import webdriver
from selenium.common.exceptions import NoSuchElementException, TimeoutException
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait
from webdriver_manager.chrome import ChromeDriverManager

# Correctly fill in the variables below, and then run this script with python3 message.py
username = 'your_username'
password = 'your_password'
# The URL of the group you want to post to. You MUST be a member of this group. Make sure there's no slash at the end.
group_link = 'https://www.linkedin.com/groups/10356456'
# The regex you want to look for in the person's title.
# For instance, if I want to look for CEOs, I would add '.*ceo.*'. Case insensitive.
member_title_regex_list = ['.*founder .*', '.*partner .*']
# A list of locations you want to contact people in.
# If I wanted to contact people in India and Canada, I would set this to ['India', 'Canada']
locations = ['India']
# The types of organisations the people you want to contact work at. There is a fixed list of types set by LinkedIn.
org_types = ['Legal Services', 'Law Practice']
# Set the following to True if you want to message people whose org types are unclear. False otherwise.
message_if_org_type_unclear = True
# The message you want to send people matching these descriptions.
# Potential inputs are {member_full_name}, {member_first_name}, {member_position}, {current_org}
message = (
    "Hi {member_first_name}, \n\n"
    "My name is Utkarsh and I'm co-founder of a LegalTech startup called Firmation (www.firmation.in). "
    "We're both members of the Legal Transformation with Technology group, "
    "and I saw that you're a decision maker at {current_org}, "
    "so I wanted to reach out. \n\n"
    "Many of the lawyers we've spoken to have complained that associates do not log their time regularly, "
    "waste billable hours doing this at a later date, and sometimes even forget to log time for short "
    "phone calls and email opinions, which means money lost.\n\n"
    "To help address this issue, we're currently working on software to help lawyers automatically keep track "
    "of time they spend working. \n\n"
    "Does this sound like it could be useful to you? If so, I would love to set up a quick 20-minute chat "
    "to discuss how our software could help you. \n\n"
    "Thank you, and looking forward to hearing from you!\n\n"
    "Utkarsh"
)


def check_if_member_messaged(linkedin_profile_url):
    """Return True if *linkedin_profile_url* occurs in any line of messaged_members.txt.

    A missing file means nobody has been messaged yet (e.g. the first run),
    so it is treated as "not messaged" instead of raising.
    """
    try:
        with open('messaged_members.txt', 'r') as read_obj:
            # Substring match per line, same as the stored "<url>\n" records.
            return any(linkedin_profile_url in line for line in read_obj)
    except FileNotFoundError:
        return False


def write_member_url_to_file(linkedin_profile_url):
    """Append *linkedin_profile_url* as a new line to messaged_members.txt."""
    # Append mode creates the file if it does not exist yet.
    with open('messaged_members.txt', 'a') as file:
        file.write(linkedin_profile_url + '\n')


options = webdriver.ChromeOptions()
options.add_argument('--ignore-certificate-errors')
options.add_argument('--incognito')
# options.add_argument('--headless')
driver = webdriver.Chrome(ChromeDriverManager().install(), chrome_options=options)

# Log in.
driver.get('https://www.linkedin.com/login?fromSignIn=true&trk=guest_homepage-basic_nav-header-signin')
username_field = driver.find_element_by_id('username')
username_field.send_keys(username)
password_field = driver.find_element_by_id('password')
password_field.send_keys(password)
driver.find_element_by_xpath('//button[text()="Sign in"]').click()

driver.get(f'{group_link}/members')
members = driver.find_elements_by_class_name('groups-members-list__typeahead-result')
# One alternation of all (lower-cased) title patterns; titles are lower-cased before matching.
regex_list_str = '(?:% s)' % '|'.join(map(str.lower, member_title_regex_list))

# Index of the next member to process; it survives list reloads so members are not revisited.
ctr = 0
while True:
    members = driver.find_elements_by_class_name('groups-members-list__typeahead-result')
    while ctr < len(members):
        member = members[ctr]
        try:
            member_title = member.find_element_by_class_name('artdeco-entity-lockup__subtitle').text
            if re.match(regex_list_str, member_title.lower()):
                member_full_name = member.find_element_by_class_name('artdeco-entity-lockup__title').text
                member_first_name = member_full_name.split(' ')[0]
                member_position = member_title.split(' at ')[0]
                message_member = False
                href = member.find_element_by_class_name("ui-entity-action-row__link").get_attribute("href")
                # handle current tab
                first_tab = driver.window_handles[0]
                # open new tab with specific url
                driver.execute_script("window.open('" + href + "');")
                # handle new tab
                second_tab = driver.window_handles[1]
                # switch to second tab
                driver.switch_to.window(second_tab)
                try:
                    member_url = driver.current_url
                    if not check_if_member_messaged(member_url):
                        member_location = driver.find_element_by_class_name('pv-top-card--list-bullet').text
                        if any(location in member_location for location in locations):
                            try:
                                current_org_item = driver.find_element_by_class_name(
                                    'pv-top-card--experience-list-item')
                                current_org = current_org_item.text
                                current_org_item.click()
                                WebDriverWait(driver, 10).until(EC.presence_of_element_located(
                                    (By.CLASS_NAME, 'pv-entity__summary-info')))
                                driver.find_element_by_class_name('pv-entity__summary-info').click()
                                WebDriverWait(driver, 5).until(EC.presence_of_element_located(
                                    (By.CLASS_NAME, 'org-top-card-summary-info-list__info-item')))
                                organisation_type = driver.find_elements_by_class_name(
                                    'org-top-card-summary-info-list__info-item')[0].text
                                if organisation_type in org_types:
                                    message_member = True
                            except TimeoutException:
                                print('No experience or company does not exist')
                                if message_if_org_type_unclear:
                                    message_member = True
                    else:
                        print("Already messaged this member, skipping.")
                finally:
                    # Always close the profile tab and return to the member list,
                    # even when a lookup above raised; otherwise the driver stays
                    # focused on the stale tab for every following member.
                    driver.close()
                    # switch to first tab
                    driver.switch_to.window(first_tab)
                if message_member:
                    member.find_element_by_class_name('message-anywhere-button').click()
                    message_text_field = driver.find_element_by_class_name('msg-form__contenteditable')
                    message_to_send = message.format(member_full_name=member_full_name,
                                                     member_first_name=member_first_name,
                                                     member_position=member_position,
                                                     current_org=current_org)
                    print(f'Messaging {member_full_name} who is {member_position} at {current_org}: {message_to_send}')
                    message_text_field.send_keys(message_to_send)
                    driver.find_element_by_class_name('msg-form__send-button').click()
                    driver.find_element_by_xpath(
                        "//button[@data-control-name='overlay.close_conversation_window']").click()
                    write_member_url_to_file(member_url)
                    try:
                        driver.find_element_by_class_name("mlA").click()
                    except NoSuchElementException:
                        print("No confirm discard button")
        except Exception as e:
            # Best effort: report the problem and carry on with the next member.
            print(e)
        ctr += 1
    # Scroll to the bottom so the page loads further members, then poll
    # (up to 100 x 0.1 s) until the list grows.
    driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
    new_members_len = len(driver.find_elements_by_class_name('groups-members-list__typeahead-result'))
    load_iterations = 0
    while new_members_len == len(members) and load_iterations < 100:
        new_members_len = len(driver.find_elements_by_class_name('groups-members-list__typeahead-result'))
        load_iterations += 1
        sleep(0.1)
    if new_members_len == len(members):
        ...  # NOTE(review): the snippet is truncated here in the source
ab.py
Source:ab.py
"""
Module holds all stuff regarding usage of Apache Benchmark
Copyright 2016 BlazeMeter Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
    http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import logging
import re
from math import ceil
from distutils.version import LooseVersion

from bzt import TaurusConfigError
from bzt.engine import ScenarioExecutor, HavingInstallableTools, SelfDiagnosable
from bzt.modules.aggregator import ConsolidatingAggregator, ResultsReader
from bzt.modules.console import WidgetProvider, ExecutorWidget
from bzt.requests_model import HTTPRequest
from bzt.utils import iteritems, CALL_PROBLEMS, shutdown_process, RequiredTool, dehumanize_time, FileReader


class ApacheBenchmarkExecutor(ScenarioExecutor, WidgetProvider, HavingInstallableTools, SelfDiagnosable):
    """
    Apache Benchmark executor module
    """

    def __init__(self):
        super(ApacheBenchmarkExecutor, self).__init__()
        self.log = logging.getLogger('')
        self.process = None
        self._tsv_file = None
        self.tool = None
        self.scenario = None

    def prepare(self):
        """Resolve the scenario, check the tool and create the artifacts and results reader."""
        super(ApacheBenchmarkExecutor, self).prepare()
        self.scenario = self.get_scenario()
        self.install_required_tools()

        self._tsv_file = self.engine.create_artifact("ab", ".tsv")

        self.stdout = open(self.engine.create_artifact("ab", ".out"), 'w')
        self.stderr = open(self.engine.create_artifact("ab", ".err"), 'w')

        self.reader = TSVDataReader(self._tsv_file, self.log)
        if isinstance(self.engine.aggregator, ConsolidatingAggregator):
            self.engine.aggregator.add_underling(self.reader)

    def get_widget(self):
        """
        Add progress widget to console screen sidebar
        :return:
        """
        if not self.widget:
            label = "%s" % self
            self.widget = ExecutorWidget(self, "ab: " + label.split('/')[1])
        return self.widget

    def __first_http_request(self):
        """Return the first HTTPRequest of the scenario, or None if there is none."""
        for request in self.scenario.get_requests():
            if isinstance(request, HTTPRequest):
                return request
        return None

    def startup(self):
        """Build the ab command line from load and scenario settings and start the process.

        :raise TaurusConfigError: if the scenario has no requests, no HTTP
            request, or the chosen request is not a GET
        """
        args = [self.tool.tool_path]
        load = self.get_load()
        load_iterations = load.iterations or 1
        load_concurrency = load.concurrency or 1

        if load.hold:
            hold = int(ceil(dehumanize_time(load.hold)))
            args += ['-t', str(hold)]
        else:
            args += ['-n', str(load_iterations * load_concurrency)]  # ab waits for total number of iterations

        timeout = self.get_scenario().get("timeout", None)
        if timeout:
            args += ['-s', str(ceil(dehumanize_time(timeout)))]

        args += ['-c', str(load_concurrency)]
        args += ['-d']  # do not print 'Processed *00 requests' every 100 requests or so
        args += ['-r']  # do not crash on socket level errors

        if self.tool.version and LooseVersion(self.tool.version) >= LooseVersion("2.4.7"):
            args += ['-l']  # accept variable-len responses

        args += ['-g', str(self._tsv_file)]  # dump stats to TSV file

        # add global scenario headers
        for key, val in iteritems(self.scenario.get_headers()):
            args += ['-H', "%s: %s" % (key, val)]

        requests = self.scenario.get_requests()
        if not requests:
            raise TaurusConfigError("You must specify at least one request for ab")
        if len(requests) > 1:
            self.log.warning("ab doesn't support multiple requests. Only first one will be used.")
        request = self.__first_http_request()
        if request is None:
            raise TaurusConfigError("ab supports only HTTP requests, while scenario doesn't have any")

        # add request-specific headers
        for key, val in iteritems(request.headers):
            args += ['-H', "%s: %s" % (key, val)]

        if request.method != 'GET':
            raise TaurusConfigError("ab supports only GET requests, but '%s' is found" % request.method)

        if request.priority_option('keepalive', default=True):
            args += ['-k']

        args += [request.url]

        self.reader.setup(load_concurrency, request.label)

        self.process = self._execute(args)

    def check(self):
        """Return True once the ab process has exited, False while it is still running."""
        ret_code = self.process.poll()
        if ret_code is None:
            return False
        if ret_code != 0:
            self.log.warning("ab tool exited with non-zero code: %s", ret_code)
        return True

    def shutdown(self):
        """Stop the ab process."""
        shutdown_process(self.process, self.log)

    def install_required_tools(self):
        """Look up the ab tool and run its install step when the installed-check fails."""
        self.tool = self._get_tool(ApacheBenchmark, config=self.settings)
        if not self.tool.check_if_installed():
            self.tool.install()

    def get_error_diagnostics(self):
        """Return the non-empty contents of ab's stdout/stderr artifacts as labelled strings."""
        diagnostics = []
        if self.stdout is not None:
            with open(self.stdout.name) as fds:
                contents = fds.read().strip()
            if contents:
                diagnostics.append("ab STDOUT:\n" + contents)
        if self.stderr is not None:
            with open(self.stderr.name) as fds:
                contents = fds.read().strip()
            if contents:
                diagnostics.append("ab STDERR:\n" + contents)
        return diagnostics


class TSVDataReader(ResultsReader):
    """Results reader for the TSV file that ab writes via its ``-g`` option."""

    def __init__(self, filename, parent_logger):
        super(TSVDataReader, self).__init__()
        self.log = parent_logger.getChild(self.__class__.__name__)
        self.file = FileReader(filename=filename, parent_logger=self.log)
        self.skipped_header = False
        self.concurrency = None
        self.url_label = None

    def setup(self, concurrency, url_label):
        """Remember the concurrency and label that are reported with every sample."""
        self.concurrency = concurrency
        self.url_label = url_label

        return True

    def _read(self, last_pass=False):
        """Yield one sample tuple per TSV data line; the first line read is skipped as the header."""
        lines = self.file.get_lines(size=1024 * 1024, last_pass=last_pass)

        for line in lines:
            if not self.skipped_header:
                self.skipped_header = True
                continue
            log_vals = [val.strip() for val in line.split('\t')]

            _error = None
            _rstatus = None

            _url = self.url_label
            _concur = self.concurrency
            _tstamp = int(log_vals[1])  # timestamp - moment of request sending
            _con_time = float(log_vals[2]) / 1000.0  # connection time
            _etime = float(log_vals[4]) / 1000.0  # elapsed time
            _latency = float(log_vals[5]) / 1000.0  # latency (aka waittime)
            _bytes = None

            yield _tstamp, _url, _concur, _etime, _con_time, _latency, _rstatus, _error, '', _bytes


class ApacheBenchmark(RequiredTool):
    """RequiredTool wrapper for the ``ab`` binary (path taken from settings, default ``ab``)."""

    def __init__(self, config=None, **kwargs):
        settings = config or {}
        tool_path = settings.get('path', 'ab')
        super(ApacheBenchmark, self).__init__(tool_path=tool_path, installable=False, **kwargs)

    def _get_version(self, output):
        """Return the token following ``Version`` in *output*, or None (with a warning) if absent."""
        # Raw string: "\s" / "\S" are regex classes, not string escape sequences.
        version = re.findall(r"Version\s(\S+)\s", output)
        if not version:
            self.log.warning("%s tool version parsing error: %s", self.tool_name, output)
            return None
        return version[0]

    def check_if_installed(self):
        """Run ``ab -V``, record the parsed version and log the tool output."""
        self.log.debug('Trying %s: %s', self.tool_name, self.tool_path)
        try:
            out, err = self.call([self.tool_path, '-V'])
        except CALL_PROBLEMS as exc:
            self.log.warning("%s check failed: %s", self.tool_name, exc)
            return False

        self.version = self._get_version(out)
        self.log.debug("%s check stdout: %s", self.tool_name, out)
        if err:
            self.log.warning("%s check stderr: %s", self.tool_name, err)
        # ... NOTE(review): the snippet is truncated here in the source; the
        # success-path return value is not visible.
incremental_reobserve.py
Source:incremental_reobserve.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Incrementally re-observe a generated environment and plot messages passed per update.

Each update inserts a fresh batch of generated measurements into a
RelativeSubmap and records how many messages the update needed. The counts are
cached in ``results/incremental/reobserve/iterations.npz`` next to this script
and plotted to a PDF in the same directory.
"""
import itertools
import logging
import os
import random
import sys
import time
from pathlib import Path
from typing import List

import dill
import matplotlib.pyplot as plt
import mayavi.mlab as mlab
import numpy as np
import seaborn as sb
import sympy as sym
from matplotlib import rc, rcParams
from mpl_toolkits.mplot3d import Axes3D
from noise import pnoise1, pnoise2
from opensimplex import OpenSimplex
from scipy import stats
from scipy.linalg import block_diag
from scipy.special import gamma as gammafn
from scipy.stats import gamma, multivariate_normal
from tqdm import tqdm
from transformations import *

from stmmap import MessageCounter, RelativeSubmap
from stmmap.utils.plot_utils import PlotConfig
from utils.generate_environments import gen_env_fn_3d, gen_meas_3d

# -------- Plot configurations ---------------------------------------------------------
plt_cfg = PlotConfig()

# -------- Logging configuration -------------------------------------------------------
loggers: List[logging.Logger] = []
loggers += [logging.getLogger(__name__)]
loggers += [logging.getLogger("stmmap")]

level = logging.DEBUG  # This could be controlled with --log=DEBUG (I think)

# create formatter and add it to the handlers
FORMAT = "%(asctime)s - %(name)s::%(funcName)s - %(levelname)s - %(message)s"
formatter = logging.Formatter(FORMAT)
ch = logging.StreamHandler()
ch.setLevel(level)
ch.setFormatter(formatter)

for logger in loggers:
    logger.setLevel(level)
    logger.addHandler(ch)

root_logger = loggers[0]
root_logger.debug("Logging initialised")

# -------- Constants and Flags ----------------------------------------------------------
seed = int(np.random.rand() * (2.0 ** 30))
# seed =
# seed = 66490824
# seed = 307072117
# seed = 335220882
# seed = 48956876
seed = 861412477  # fixed seed; overrides the random draw above
np.random.seed(seed)
random.seed(seed)
print("Seed", seed)

# Measurement constants
# N_z_gen = 50000
# N_updates = 30
# grid_depth = 5
N_z_gen = 10 * 4 ** 5
N_updates = 50
grid_depth = 5

env_type = "perlin"
func = gen_env_fn_3d(env_type, seed=seed)

# Results live in <script dir>/results/incremental/reobserve; create missing parents in one go.
directory = Path(__file__).parents[0] / "results" / "incremental" / "reobserve"
directory.mkdir(parents=True, exist_ok=True)

# True: reuse the cached iterations.npz; False: regenerate measurements and recompute.
load_iterations = True
if not load_iterations:
    root_logger.debug("Generating new measurements and updating")
    grid = RelativeSubmap(max_depth=grid_depth, tolerance=1e-5)

    normalised_iterations = []
    iterations = []

    # NOTE This is a pseudo update. There is something weird with just the first update. It add more messages than needed I think
    C = 10000 * np.eye(3).reshape(1, 3, 3)
    grid.insert_measurements(np.zeros((1, 3, 1)), C)
    update_iter = grid.update()

    for i in range(N_updates):
        print("Update number:", i)
        # Generate Measurements
        print(i, "------- Generating Measurements -------")
        m_z_stack, C_z_stack = gen_meas_3d(func, N_z_gen, scale=1)
        # Keep measurements with 0 < x < 1, 0 < y < 1 and x + y <= 1.
        b = (
            (m_z_stack[:, 0, 0] + m_z_stack[:, 1, 0] <= 1)
            & (m_z_stack[:, 0, 0] > 0)
            & (m_z_stack[:, 0, 0] < 1)
            & (m_z_stack[:, 1, 0] > 0)
            & (m_z_stack[:, 1, 0] < 1)
        )
        m_z_stack = m_z_stack[b, :, :]
        C_z_stack = C_z_stack[b, :, :]
        N_z = m_z_stack.shape[0]
        print(i, "N: total =", N_z_gen, " remainder = ", N_z)

        # Calculate approximation
        print(i, "------- Fusing Measurements -------")
        grid.insert_measurements(m_z_stack, C_z_stack)

        t0 = time.time()
        update_iter: MessageCounter = grid.update()
        normalised_iterations.append(len(update_iter) / N_z)
        iterations.append(len(update_iter))
        t1 = time.time()
        tot = t1 - t0
        print(i, "Approximation runtime:", tot, "s")

    # Passing the path lets NumPy open and close the file itself.
    np.savez(directory / "iterations.npz", iterations, normalised_iterations)
else:
    root_logger.debug("Loading previously calculated results")
    # Load iterations if you want to reuse
    # The context manager closes the archive; it is opened once for both arrays.
    with np.load(directory / "iterations.npz") as cached:
        iterations = cached["arr_0"]
        normalised_iterations = cached["arr_1"]

ratio = 7 / 16
fig_scale = 0.98
fs = (fig_scale * plt_cfg.tex_textwidth, fig_scale * plt_cfg.tex_textwidth * ratio)
fig = plt.figure(figsize=fs, constrained_layout=True)
ax = plt.gca()
# ax_right = ax.twinx()

p1, = ax.plot(iterations, marker=".", color="C0", markersize=4)
# p1, = ax.plot(np.array(iterations) / 4**grid_depth, marker=".", color="C0")
# p2, = ax_right.plot(normalised_iterations, marker=".", color="C1")

ax.set_xlabel("Time step")
ax.set_ylabel("Number of messages passed")
# ax.set_ylabel("Average number of messages passed per surfel")
# ax_right.set_ylabel("Normalised number of messages passed")
ax.ticklabel_format(style="sci", scilimits=(0, 0), axis="y")

# tkw = dict(size=4, width=1.5)
# ax.tick_params(axis="y", colors=p1.get_color(), **tkw)
# ax_right.tick_params(axis="y", colors=p2.get_color(), **tkw)

# base = 10
# ax.set_ylim(0, base * np.ceil(np.max(iterations) * 1.05 / base))
# ax_right.set_ylim(0, base * np.ceil(np.max(normalised_iterations) * 1.05 / base))

# N_surfels = 4**grid_depth
# ln = ax.axhline(N_surfels, color="r", linestyle="-.", label="Number of surfels")
# bgcol = ax.get_facecolor()
# ax.text(
#     0.2,
#     N_surfels,
#     "Number of surfels",
#     ha="center",
#     va="center",
#     rotation="horizontal",
#     backgroundcolor=bgcol,
#     transform=ln.get_transform(),
# )
# ax.legend()

# plt.xticks(fontsize=tex_fontsize - 1)
# plt.yticks(fontsize=tex_fontsize - 1)

filename = directory / f"{env_type}_{seed}.pdf"
plt.savefig(filename)
# ... NOTE(review): the source marks the snippet as truncated at this point.
Learn to execute automation testing from scratch with LambdaTest Learning Hub. It covers everything from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!