Best Python code snippet using locust
assess_quality.py
Source:assess_quality.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
"""Periodically re-test the proxies stored in the database and update their scores.

Each round loads every proxy address (``content`` column) from
``cfg.TABLE_NAME``, issues a timed request through it, and folds the outcome
into the proxy's running statistics. Proxies that keep failing are deleted.
"""
import requests
import time
import datetime
import logging
import pymysql as mdb
import config as cfg

log_file = 'assess_logger.log'
logging.basicConfig(filename=log_file, level=logging.WARNING)
TEST_ROUND_COUNT = 0


def _now():
    """Return the current local time formatted for log lines."""
    return datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")


def _updated_stats(test_times, failure_times, avg_response_time, sample_time):
    """Fold one new sample into the running statistics of a proxy.

    Args:
        test_times: number of tests including the current one.
        failure_times: number of failed tests including the current one.
        avg_response_time: average response time stored before this test.
        sample_time: response time (or timeout penalty) of the current test.

    Returns:
        Tuple ``(success_rate, avg_response_time, score)``.
    """
    success_rate = 1 - float(failure_times) / test_times
    # float() so that int/Decimal column values mix safely with float timings.
    new_avg = (float(avg_response_time) * (test_times - 1) + sample_time) / test_times
    score = (success_rate + float(test_times) / 500) / new_avg
    return success_rate, new_avg, score


def modify_score(ip, success, response_time):
    """Update (or delete) the database record of one proxy after a test.

    Args:
        ip: proxy address, matched against the ``content`` column.
        success: 1 if the test request succeeded, 0 if it failed. Any other
            value leaves the record untouched.
        response_time: measured response time of a successful test; ignored
            on failure, where ``cfg.TIME_OUT_PENALTY`` is used instead.

    Database errors are logged, not raised. The connection is always closed.
    """
    conn = mdb.connect(cfg.host, cfg.user, cfg.passwd, cfg.DB_NAME)
    cursor = conn.cursor()
    # The table name comes from trusted config and cannot be a bound
    # parameter; every value is passed as a bound parameter instead.
    table = cfg.TABLE_NAME
    action = 'delete' if success == 0 else 'modify'
    try:
        if success == 0:
            logging.warning(_now() + ": " + ip + " out of time")
        elif success != 1:
            return

        cursor.execute(
            'SELECT * FROM {0} WHERE content = %s'.format(table), (ip,))
        rows = cursor.fetchall()
        if not rows:
            return
        # Only the first matching row is used; the UPDATE/DELETE below
        # applies to every row with this content anyway.
        # NOTE(review): column order (1=test_times, 2=failure_times,
        # 3=success_rate, 4=avg_response_time) is taken from the original
        # indexing -- confirm against the table schema.
        row = rows[0]
        test_times = row[1] + 1
        failure_times = row[2]

        if success == 0:
            # More than 4 failures and a success rate below the threshold:
            # the proxy is not worth keeping.
            if failure_times > 4 and row[3] < cfg.SUCCESS_RATE:
                cursor.execute(
                    'DELETE FROM {0} WHERE content = %s'.format(table), (ip,))
                conn.commit()
                logging.warning(_now() + ": " + ip + " was deleted.")
                return
            # Not too bad yet: count the failure and charge the penalty time.
            failure_times += 1
            sample_time = cfg.TIME_OUT_PENALTY
        else:
            sample_time = response_time

        success_rate, avg_response_time, score = _updated_stats(
            test_times, failure_times, row[4], sample_time)
        # Values are rounded to two decimals, as the original '%.2f' did.
        n = cursor.execute(
            'UPDATE {0} SET test_times = %s, failure_times = %s, '
            'success_rate = %s, avg_response_time = %s, score = %s '
            'WHERE content = %s'.format(table),
            (test_times, failure_times, round(success_rate, 2),
             round(avg_response_time, 2), round(score, 2), ip))
        conn.commit()
        if n:
            # The log level is WARNING, so informational lines use warning().
            logging.warning(_now() + ": " + ip + ' has been modify successfully!')
    except Exception as e:
        logging.error(_now() + ": " + 'Error when try to ' + action + ' '
                      + ip + ': ' + str(e))
    finally:
        cursor.close()
        conn.close()


def ip_test(proxies, timeout):
    """Request a fixed URL through each proxy and record the outcome.

    Args:
        proxies: iterable of proxy addresses in ``host:port`` form.
        timeout: timeout in seconds passed to ``requests.get``.
    """
    url = 'https://www.baidu.com'
    for p in proxies:
        # The test URL is https, so the proxy must be registered for the
        # https scheme too; otherwise requests bypasses it entirely.
        proxy = {'http': 'http://' + p, 'https': 'http://' + p}
        try:
            start = time.time()
            requests.get(url, proxies=proxy, timeout=timeout)
            end = time.time()
        except (requests.RequestException, OSError):
            modify_score(p, 0, 0)
            continue
        resp_time = end - start
        modify_score(p, 1, resp_time)
        print('Database test succeed: ' + p + '\t' + str(resp_time))


def assess():
    """Run one assessment round over every proxy stored in the database."""
    global TEST_ROUND_COUNT
    TEST_ROUND_COUNT += 1
    logging.warning(_now() + ": " + ">>>>\t" + str(TEST_ROUND_COUNT) + " round!\t<<<<")

    conn = mdb.connect(cfg.host, cfg.user, cfg.passwd, cfg.DB_NAME)
    cursor = conn.cursor()
    try:
        cursor.execute('SELECT content FROM %s' % cfg.TABLE_NAME)
        ip_list = [row[0] for row in cursor.fetchall()]
        if ip_list:
            ip_test(ip_list, cfg.timeout)
    except Exception as e:
        logging.warning(_now() + ": " + str(e))
    finally:
        cursor.close()
        conn.close()


def main():
    """Run assessment rounds forever, sleeping between rounds."""
    while True:
        assess()
        time.sleep(cfg.CHECK_TIME_INTERVAL)


if __name__ == '__main__':
    # NOTE(review): the guard body is truncated ("...") in the source
    # listing; calling main() is assumed -- confirm against the full file.
    main()
eval_result.py
Source:eval_result.py
"""Evaluate a load-test requests CSV against response-time thresholds.

Thresholds are read from the environment variables ``min_response_time``,
``max_response_time`` and ``avg_response_time``. The script writes two
summary CSV files and exits with an error message on the first failed check.
"""
import os
import re
import sys

MIN_RESPONSE_TIME = int(os.environ['min_response_time'])
MAX_RESPONSE_TIME = int(os.environ['max_response_time'])
AVG_RESPONSE_TIME = int(os.environ['avg_response_time'])

# A per-endpoint row: quoted method, quoted path starting with "/", then six
# comma-separated numeric fields (captured as group 2, leading comma included).
ROW_PATTERN = re.compile(r'"(GET|POST)","/.*"((,\d*){6})')

print("\n---- Test Parameters -----\n")
print("MIN_RESPONSE_TIME: %d" % MIN_RESPONSE_TIME)
print("MAX_RESPONSE_TIME: %d" % MAX_RESPONSE_TIME)
print("AVG_RESPONSE_TIME: %d" % AVG_RESPONSE_TIME)
print("\n\n---- Test Result ----\n")

with open("load_test_result_requests.csv", "r") as f:
    lines = f.readlines()

stats = []
for line in lines:
    match = ROW_PATTERN.search(line)
    if match:  # header and totals rows do not match and are skipped
        stats.append(match.group(2)[1:].split(","))

if not stats:
    # Without any request rows the column unpacking below would fail with an
    # opaque ValueError; report the real problem instead.
    sys.exit("There is no reqest, something goes wrong!")

# Transpose the rows into one list per column.
(num_requests, num_failures, med_response_time, avg_response_time,
 min_response_time, max_response_time) = (list(x) for x in zip(*stats))

# Kept separate from the totals-row string below so the zero check really
# compares integers.
summed_num_requests = sum(map(int, num_requests))

# NOTE(review): the last line is presumably the totals row -- confirm
# against the CSV producer.
(total_num_requests, total_num_failures, total_med_response_time,
 total_avg_response_time, total_min_response_time,
 total_max_response_time) = lines[-1].split(",")[2:8]

with open("total_result1.csv", "w") as fw:
    fw.write("Median,AVG,MIN,MAX\n")
    fw.write("%s,%s,%s,%s\n" % (total_med_response_time, total_avg_response_time,
                                total_min_response_time, total_max_response_time))

with open("total_result2.csv", "w") as fw:
    fw.write("Total,Fails\n")
    fw.write("%s,%s\n" % (total_num_requests, total_num_failures))

if summed_num_requests == 0:
    sys.exit("There is no reqest, something goes wrong!")
print("num_requests: PASSED")

if not all(int(i) == 0 for i in num_failures):
    sys.exit("There are some requests failed")
print("num_failures: PASSED")

# The three threshold checks share one shape, so they are table-driven.
# NOTE(review): the source listing is truncated ("...") inside the last
# check's else-branch; it is assumed to print "avg_response_time: PASSED"
# like the others, and anything after it is not visible here.
for label, limit, values in (
        ("min_response_time", MIN_RESPONSE_TIME, min_response_time),
        ("max_response_time", MAX_RESPONSE_TIME, max_response_time),
        ("avg_response_time", AVG_RESPONSE_TIME, avg_response_time),
):
    if not all(int(i) <= limit for i in values):
        sys.exit("There are some %s bigger than %d ms\n %s: %s"
                 % (label, limit, label.upper(), values))
    print("%s: PASSED" % label)
speed_test.py
Source:speed_test.py
1import subprocess2import os3def get_avg_ping(host):4 _num_pings = 55 total_response_time = 06 host = host.replace("https", "").replace("http", "").replace("://", "")7 p2 = {"nt": "-n", "posix": "-c"}8 ping = subprocess.Popen(["ping", p2[os.name], str(_num_pings), host], stdout=subprocess.PIPE)9 for line in ping.stdout.readlines():10 line_as_string = line.decode("utf-8")11 if "time=" in line_as_string and "ms" in line_as_string:12 response_time = line_as_string.split("time=")[1].split("ms")[0]13 total_response_time += float(response_time)14 elif "Time out" in line_as_string:15 total_response_time += 100016 elif "could not find host" in line_as_string:17 total_response_time += 9999 * _num_pings18 ping.stdout.close()19 if total_response_time == 0:20 return 9999921 avg_response_time = float(total_response_time / _num_pings)...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!