Best Python code snippet using Airtest
main.py
Source:main.py
"""Timeseries collection server, read-only web UI, and alert detectors.

Three entry points are selected by the first command-line argument (see
``main``): an HTTP endpoint that stores posted samples in SQLite, an HTTP UI
that dumps the three tables as HTML, and a detector pass that evaluates the
rules in ``config.yaml`` and maintains the ``events_log`` / ``recent_alerts``
tables.
"""
import contextlib
import datetime
import html
import json
import logging
import os
import smtplib
import sqlite3
import sys
import time
from venv import create

import bottle
import dateparser
import yaml

logging.basicConfig(level="DEBUG")

SCRIPT_DIR = os.path.abspath(os.path.dirname(__file__))
CONFIG_DIR = os.environ.get("TIMESERIES_SERVER_CONFIG_DIR", SCRIPT_DIR + "/config")
DATA_DIR = os.environ.get("TIMESERIES_SERVER_DATA_DIR", SCRIPT_DIR + "/data")
# NOTE(review): this key is read here but never checked anywhere in this file.
COLLECTION_SERVER_SYMMETRICAL_KEY = os.environ.get(
    "COLLECTION_SERVER_SYMMETRICAL_KEY", ""
)
COLLECTION_SERVER_PORT = int(os.environ.get("COLLECTION_SERVER_PORT", 8080))
COLLECTION_SERVER_UI_PORT = int(os.environ.get("COLLECTION_SERVER_UI_PORT", 8081))
DETECTOR_EMAIL_TO = os.environ.get("DETECTOR_EMAIL_TO", "")
DETECTOR_EMAIL_FROM = os.environ.get("DETECTOR_EMAIL_FROM", "")
DETECTOR_EMAIL_SMTP = os.environ.get("DETECTOR_EMAIL_SMTP", "")
DETECTOR_EMAIL_USERNAME = os.environ.get("DETECTOR_EMAIL_USERNAME", "")
DETECTOR_EMAIL_PASSWORD = os.environ.get("DETECTOR_EMAIL_PASSWORD", "")

DATABASE_SCHEMA = [
    "CREATE TABLE IF NOT EXISTS timeseries_log (id INTEGER PRIMARY KEY, created_at, time, entity, key, value REAL)",  # value is the numerical thing tested
    "CREATE TABLE IF NOT EXISTS events_log (id INTEGER PRIMARY KEY, created_at, time, detector_name, value INTEGER, desc)",  # value is on or off
    "CREATE TABLE IF NOT EXISTS recent_alerts (id INTEGER PRIMARY KEY, created_at, updated_at, time, detector_name, value INTEGER, desc)",  # value is active or not
]
COMMANDS = [
    # "PRAGMA cache_size=256000;",  # 0.25GB in RAM
    "PRAGMA synchronous=OFF;",
    "PRAGMA count_changes=OFF;",
    "PRAGMA temp_store=MEMORY;",
]
_DB_PATH = DATA_DIR + "/db.sqlite3"

# Errors that mean "the posted payload is malformed" rather than "the server
# is broken"; they are answered with HTTP 400.
_PAYLOAD_ERRORS = (
    AttributeError,
    KeyError,
    TypeError,
    ValueError,
    OverflowError,
    OSError,
)

_PAGE_TEMPLATE = """\
<h1>%s</h1>
 <ul>
 <li><a href="/">back</a></li>
 </ul>
<br>
<table border="1">
<tr>%s</tr>
%s
</table>
"""


def _open_db(apply_pragmas=True):
    """Open a new connection to the SQLite file.

    The request handlers open one connection per request; the caller is
    responsible for closing it.

    Args:
        apply_pragmas: when true, run every statement in ``COMMANDS`` on the
            new connection before returning it.
    """
    conn = sqlite3.connect(_DB_PATH)
    if apply_pragmas:
        for cmd in COMMANDS:
            conn.execute(cmd)
    return conn


# Module-level connection: creates the tables on import and is the connection
# the detector pass reads from and writes to.
db = sqlite3.connect(_DB_PATH)
for statement in DATABASE_SCHEMA:
    db.execute(statement)
for cmd in COMMANDS:
    db.execute(cmd)


def _parse_timeseries_payload(json_data, default_time=None):
    """Turn the posted JSON text into rows for ``timeseries_log``.

    Args:
        json_data: JSON text encoding an array of objects with ``entity``,
            ``key``, ``value`` and an optional epoch-seconds ``time``.
        default_time: epoch seconds used for samples without ``time``;
            defaults to the current time.

    Returns:
        A list of ``(time_iso, entity, key, value)`` tuples.

    Raises:
        One of ``_PAYLOAD_ERRORS`` when the payload is missing or malformed.
    """
    samples = json.loads(json_data)
    if default_time is None:
        default_time = time.time()
    rows = []
    for sample in samples:
        stamp = sample.get("time")
        if stamp is None:
            stamp = default_time
        rows.append(
            (
                datetime.datetime.fromtimestamp(stamp).isoformat(),
                str(sample["entity"]),
                str(sample["key"]),
                float(sample["value"]),
            )
        )
    return rows


def run_collection_server():
    """Serve ``POST /timeseries`` on ``COLLECTION_SERVER_PORT`` (blocks)."""
    bottle.BaseRequest.MEMFILE_MAX = bottle.BaseRequest.MEMFILE_MAX * 100

    # accept data to one path accepting a json array of objects with
    # time (optional), entity, key, value
    @bottle.post("/timeseries")
    def timeseries():
        logging.debug("input data: %s", bottle.request.body.read())
        try:
            rows = _parse_timeseries_payload(bottle.request.forms.get("data"))
        except _PAYLOAD_ERRORS as exc:
            logging.warning("rejecting malformed /timeseries payload: %r", exc)
            bottle.abort(400, "invalid timeseries payload")
        try:
            logging.info("timeseries input of length %d", len(rows))
            created_at = datetime.datetime.now().isoformat()
            # closing() guarantees the per-request connection is released
            # even when the insert fails.
            with contextlib.closing(_open_db()) as conn:
                conn.executemany(
                    "insert into timeseries_log (created_at, time, entity, key, value) values (?,?,?,?,?)",
                    [(created_at,) + row for row in rows],
                )
                conn.commit()
            logging.info("committed successfully")
            return {"status": "ok"}
        except Exception as e:
            logging.exception("error in /timeseries with error %s", repr(e))
            raise

    bottle.run(host="0.0.0.0", port=COLLECTION_SERVER_PORT, server="paste")


def _table_page_html(title, columns, rows):
    """Render one table page; every header and cell is HTML-escaped."""
    header = "".join("<th>%s</th>" % html.escape(str(col)) for col in columns)
    body = "".join(
        "<tr>"
        + "".join("<td>%s</td>" % html.escape(str(cell)) for cell in row)
        + "</tr>"
        for row in rows
    )
    return _PAGE_TEMPLATE % (html.escape(title), header, body)


def _render_table(table, title):
    """Read every row of ``table`` and return it as an HTML page.

    ``table`` is interpolated into the SQL text, so it must only ever be one
    of the fixed table names passed by the handlers below.
    """
    with contextlib.closing(_open_db(apply_pragmas=False)) as conn:
        cursor = conn.execute("select * from %s" % table)
        # Column names come from the cursor, not from the first data row.
        columns = [col[0] for col in cursor.description]
        rows = cursor.fetchall()
    return _table_page_html(title, columns, rows)


def run_ui_server():
    """Serve the read-only HTML UI on ``COLLECTION_SERVER_UI_PORT`` (blocks)."""

    @bottle.get("/")
    def root():
        return """\
<h1>menu</h1>
 <ul>
 <li><a href="/timeseries_log">timeseries log</a></li>
 <li><a href="/events_log">events log</a></li>
 <li><a href="/recent_alerts">recent alerts</a></li>
 </ul>
"""

    @bottle.get("/timeseries_log")
    def timeseries_log():
        return _render_table("timeseries_log", "timeseries log")

    @bottle.get("/events_log")
    def events_log():
        return _render_table("events_log", "events log")

    @bottle.get("/recent_alerts")
    def recent_alerts():
        return _render_table("recent_alerts", "recent alerts")

    bottle.run(host="0.0.0.0", port=COLLECTION_SERVER_UI_PORT, server="paste")


def _send_notification_by_email(event):
    """Email one alert; failures are logged and never raised.

    Args:
        event: sequence whose first item is the alert's unique id and whose
            second item is its description.
    """
    detector_email_body = "Detector: %s\nDesc: %s \n" % (event[0], event[1])
    subject = "Alert: %s" % event[0]
    message = "\r\n".join(
        [
            "To: %s" % DETECTOR_EMAIL_TO,
            "From: %s" % DETECTOR_EMAIL_FROM,
            "Subject: %s" % subject,
            "",
            detector_email_body,
        ]
    )
    try:
        # The context manager closes the SMTP session on every exit path,
        # including a failed login.
        with smtplib.SMTP_SSL(DETECTOR_EMAIL_SMTP, 465) as server:
            server.ehlo()
            server.login(DETECTOR_EMAIL_USERNAME, DETECTOR_EMAIL_PASSWORD)
            server.sendmail(DETECTOR_EMAIL_FROM, [DETECTOR_EMAIL_TO], message)
        logging.debug("email sent")
    except Exception as e:
        logging.exception("error sending mail with error %s", repr(e))


def _to_datetime(value):
    """Convert a stored ``time`` value to ``datetime``.

    The collection server stores ISO-8601 strings; epoch seconds (as a number
    or a numeric string) and ``datetime`` objects are accepted as well.
    """
    if isinstance(value, datetime.datetime):
        return value
    if isinstance(value, (int, float)):
        return datetime.datetime.fromtimestamp(value)
    try:
        return datetime.datetime.fromisoformat(value)
    except ValueError:
        return datetime.datetime.fromtimestamp(float(value))


def _evaluate_threshold(
    inReal, optInTimePeriodString, threshold, fraction, dead_detector_seconds, below
):
    """Shared implementation of the two threshold detectors.

    Args:
        inReal: ``(time, value)`` rows, newest first.
        optInTimePeriodString: lookback start, parsed by ``dateparser.parse``.
        threshold: value each sample is compared against.
        fraction: share of in-window samples that must match to alert.
        dead_detector_seconds: maximum allowed age of the newest sample.
        below: when true a sample matches if ``value < threshold``, otherwise
            if ``value >= threshold``.

    Returns:
        ``(is_alert, description)``.
    """
    window_start = dateparser.parse(optInTimePeriodString)
    logging.debug("optInTimeBegin is %s", window_start)
    if window_start is None:
        # An unparseable lookback would otherwise fail on the comparison
        # below; evaluate every supplied sample instead and say so loudly.
        logging.error(
            "could not parse lookback period %r; using all samples",
            optInTimePeriodString,
        )
    now = datetime.datetime.now()

    matching = 0
    total_samples = 0
    for stamp, value in inReal:
        if window_start is not None and _to_datetime(stamp) < window_start:
            continue
        total_samples += 1
        if (value < threshold) if below else (value >= threshold):
            matching += 1

    # dead detector: the newest sample is older than the allowed silence.
    if inReal and inReal[0]:
        last_seen = _to_datetime(inReal[0][0])
        if last_seen + datetime.timedelta(seconds=dead_detector_seconds) <= now:
            return (
                True,
                "Dead Detector: Last timestamp was %s beyond the dead detector of %d seconds. Actual seconds %d."
                % (
                    last_seen,
                    dead_detector_seconds,
                    (now - last_seen).total_seconds(),
                ),
            )

    direction = "less" if below else "greater"
    reading = (
        "Threshold Detector: %d of the last %d samples against threshold fraction %f were %s than threshold of %f."
        % (matching, total_samples, fraction, direction, threshold)
    )
    if total_samples and (matching / total_samples) >= fraction:
        return (True, reading)
    return (False, "NO THRESHOLD MET: Detector would read: " + reading)


def _more_than_expected(
    inReal, optInTimePeriodString, threshold, fraction, dead_detector_seconds
):
    """Alert when enough in-window samples are ``>= threshold``."""
    return _evaluate_threshold(
        inReal,
        optInTimePeriodString,
        threshold,
        fraction,
        dead_detector_seconds,
        below=False,
    )


def _less_than_expected(
    inReal, optInTimePeriodString, threshold, fraction, dead_detector_seconds
):
    """Alert when enough in-window samples are ``< threshold``."""
    return _evaluate_threshold(
        inReal,
        optInTimePeriodString,
        threshold,
        fraction,
        dead_detector_seconds,
        below=True,
    )


# Maps the ``function`` value of a detector entry to its implementation.
_DETECTORS = {
    "more_than_expected": _more_than_expected,
    "less_than_expected": _less_than_expected,
}


def _recent_samples(entity, key):
    """Return the newest 100 ``(time, value)`` rows for ``entity``/``key``."""
    return db.execute(
        "select time, value from timeseries_log where entity=? and key=? order by id desc limit 100",
        (entity, key),
    ).fetchall()


def run_detectors():
    """Evaluate every configured detector once and reconcile the alert tables.

    Reads ``CONFIG_DIR/config.yaml``, a sequence of mappings with the keys
    ``function``, ``name``, ``entity``, ``key``, ``optInTimePeriodString``,
    ``threshold``, ``fraction`` and ``dead_detector_seconds``. Each detector
    that fires is written to ``events_log`` with value 1; detectors with no
    row in ``recent_alerts`` get one and trigger an email; rows in
    ``recent_alerts`` whose detector is no longer firing are logged to
    ``events_log`` with value 0 and removed.

    Raises:
        yaml.YAMLError: if the configuration cannot be parsed.
        KeyError: if a detector entry lacks a key or names an unknown function.
    """
    with open(CONFIG_DIR + "/config.yaml") as stream:
        try:
            parsed_yaml = yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            logging.exception(
                "error in run_detectors while parsing yaml config with error %s", exc
            )
            raise

    events = []
    for detector in parsed_yaml or []:
        is_alert, desc = _DETECTORS[detector["function"]](
            inReal=_recent_samples(detector["entity"], detector["key"]),
            optInTimePeriodString=detector["optInTimePeriodString"],
            threshold=detector["threshold"],
            fraction=detector["fraction"],
            dead_detector_seconds=detector["dead_detector_seconds"],
        )
        alarmUniqueId = "%s!!!%s!!!%s" % (
            detector["name"],
            detector["entity"],
            detector["key"],
        )
        logging.debug(
            "detector with uniqueId %s result: %s, %s", alarmUniqueId, is_alert, desc
        )
        # Only firing detectors are events; everything below treats `events`
        # as the set of currently active alerts.
        if is_alert:
            events.append((alarmUniqueId, desc))

    # add all events regardless of whether they have been added before
    created_at = datetime.datetime.now().isoformat()
    db.executemany(
        "insert into events_log (created_at, time, detector_name, value, desc) values (?,?,?,1,?)",
        [(created_at, created_at, name, desc) for name, desc in events],
    )
    db.commit()

    # insert the alerts that are not already open
    new_alerts = []
    for name, desc in events:
        existing = db.execute(
            "select id from recent_alerts where detector_name=? limit 1", (name,)
        ).fetchone()
        if existing is None:
            db.execute(
                "insert into recent_alerts (created_at, updated_at, time, detector_name, desc) values (?,?,?,?,?)",
                (created_at, created_at, created_at, name, desc),
            )
            new_alerts.append((name, desc))
    db.commit()
    # Notify only after the commit so a mail problem cannot lose the alert row.
    for event in new_alerts:
        _send_notification_by_email(event)

    # get those that are closed now: open alerts whose detector did not fire
    active_names = [name for name, _desc in events]
    if active_names:
        placeholders = ",".join("?" for _name in active_names)
        closed_alerts = db.execute(
            "select id, detector_name from recent_alerts where detector_name not in (%s)"
            % placeholders,
            active_names,
        ).fetchall()
    else:
        closed_alerts = db.execute(
            "select id, detector_name from recent_alerts"
        ).fetchall()

    db.executemany(
        "insert into events_log (created_at, time, detector_name, value) values (?,?,?,0)",
        [(created_at, created_at, name) for _row_id, name in closed_alerts],
    )
    db.executemany(
        "delete from recent_alerts where id=?",
        [(row_id,) for row_id, _name in closed_alerts],
    )
    db.commit()


def main(argv):
    """Dispatch on the first command-line argument.

    Args:
        argv: full argument vector, program name first.

    Returns:
        0 on normal exit (including interrupt), 1 after an unexpected error.
    """
    try:
        args = list(argv[1:])
        logging.debug("args: %s", args)
        command = args[0] if args else ""
        if command == "run_collection_server":
            run_collection_server()
        elif command == "run_ui_server":
            run_ui_server()
        elif command == "run_detectors":
            while True:
                run_detectors()
                time.sleep(15 * 60)
        else:
            # Unknown or missing command: show usage.
            help()
    except (KeyboardInterrupt, SystemExit) as e:
        logging.info("exiting normally with %s", repr(e))
    except Exception as e:
        logging.exception("exiting abnormally with error %s", repr(e))
        return 1
    return 0


def help():
    """Log the command-line usage text."""
    logging.info(
        """
timeseries target server, detector framework, and notification publishing
Options:
    run_collection_server   Start server providing the API target for collecting timeseries
    run_ui_server           Start human interface for checking graphs
    run_detectors           Run the detectors over the dataset
"""
    )


if __name__ == "__main__":
    # NOTE(review): the body of this guard is elided ("...") in the excerpt
    # this file was recovered from; presumably sys.exit(main(sys.argv)) — confirm.
    ...
coursera.py
Source:coursera.py
...35 with credentials_yaml.open(mode='r') as credentials_file:36 credentials = yaml.load(37 credentials_file, Loader=yaml.FullLoader)['coursera']38 return (credentials['username'], credentials['password'])39def alert_exists(driver):40 try:41 driver.switch_to.alert42 return True43 except NoAlertPresentException:44 return False45@retry(retry=retry_if_exception_type((46 TimeoutException,47 UnexpectedAlertPresentException,48 NoSuchElementException,49 StaleElementReferenceException)), stop=stop_after_attempt(5))50def solve_recaptcha(driver):51 """52 :param driver: A selenium `webdriver` object53 :return: True if solved, False if not54 """55 # Switch to the recaptcha iframe56 driver.switch_to.default_content()57 captcha_frame = driver.find_element_by_css_selector(58 'iframe[src^="https://www.google.com/recaptcha/api2/bframe"]')59 driver.switch_to.frame(captcha_frame)60 # Click the audio button61 if element_exists(driver, By.ID, 'recaptcha-audio-button'):62 audio_button = driver.find_element_by_id('recaptcha-audio-button')63 long_click(driver, audio_button)64 time.sleep(5)65 try:66 # Wait for the solve button and click it67 solve_button = WebDriverWait(driver, 10).until(68 EC.presence_of_element_located((By.ID, 'solver-button')))69 time.sleep(1)70 long_click(driver, solve_button)71 time.sleep(20)72 # Accept the alert that might pop up73 if alert_exists(driver):74 driver.switch_to.alert.accept()75 except TimeoutException:76 warnings.warn('Buster\'s solve button did not appear, retrying...')77 raise78 except UnexpectedAlertPresentException:79 warnings.warn(80 'Some alert popped up, accepting that and retrying...')81 driver.switch_to.alert.accept()82 raise83 if not element_exists(driver, By.ID, 'logout-btn'):84 raise NoSuchElementException85 return True86def long_click(driver, element):87 """Clicks an `element` for a random amount of time...
exabeam.py
Source:exabeam.py
...44 report_exception()45 # create alert for all notable sessions46 for session in notable_sessions.values():47 # skip sessions we have already alerted48 if self.alert_exists(session):49 continue50 # get observables from details51 observables = []52 observables.append({"type":F_USER, "value":session["user"]})53 observables.append({"type":F_EXABEAM_SESSION, "value":session["id"]})54 # create alert submission55 submission = Submission(56 description = f"Exabeam Session - {session['id']} - {session['risk']:.0f}",57 analysis_mode = ANALYSIS_MODE_CORRELATION,58 tool = 'exabeam',59 tool_instance = saq.CONFIG['exabeam']['base_uri'],60 type = ANALYSIS_TYPE_EXABEAM,61 event_time = datetime.datetime.now(),62 details = session,63 observables = observables,64 tags = [],65 queue = saq.CONFIG['service_exabeam_collector']['queue'],66 files = [])67 # submit alert68 self.queue_submission(submission)69 def alert_exists(self, session):70 with get_db_connection('ace') as db:71 c = db.cursor()72 c.execute("SELECT count(*) FROM alerts WHERE description like %s", f"Exabeam Session - {session['id']} - %")73 row = c.fetchone() 74 count = int(row[0])75 if count > 0:76 logging.debug(f"Alert for {session['id']} already exists")77 return True78 logging.debug(f"Alert for {session['id']} does not exist, creating...")...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!