Best Python code snippet using lettuce_webdriver_python
check.py
Source:check.py
# NOTE(review): this excerpt starts at line 12 of check.py and is also cut off
# after the last check, so the imports (json, os, subprocess, time,
# urllib.request and, presumably, xml.etree.ElementTree as ET) and the head of
# alert() are not visible.  The signature of alert() below is reconstructed
# from its call site in check_alert() -- confirm against the full file.
def alert(message, ssh_userhost=None):
    """
    Print the alert markup and stop the script

    When ssh_userhost is not None, a <click> action that opens an SSH
    session to it is appended to the message.  Always exits with status 0.
    """
    if ssh_userhost is not None:
        message = '{}<click>autoterm -e ssh {}</click>'.format(message, ssh_userhost)
    print('<txt><span foreground="#FF7777">{}</span></txt>'.format(message))
    # exit() is a helper injected by the site module; raising SystemExit
    # directly ends the script the same way without depending on it.
    raise SystemExit(0)


def check_alert(check_func, ssh_userhost=None):
    """
    Run a check and alert if it failed

    check_func is called without arguments and must return an
    (is_ok, errors) tuple.  alert() is only invoked when is_ok is falsy,
    and alert() terminates the script, so later checks do not run.
    """
    is_ok, errors = check_func()
    if not is_ok:
        alert(errors, ssh_userhost)


def ping_check(host):
    """
    Check if host is up

    Returns (True, None) when a single ping succeeds, otherwise
    (False, message).
    """
    ping_cmd = 'ping -c 1 {}'.format(host)
    exitcode, _ = subprocess.getstatusoutput(ping_cmd)
    if exitcode != 0:
        return (False, 'Failed to ping {}'.format(host))
    return (True, None)


def _collect_stat_errors(host, status):
    """
    Turn a parsed stats dict into a list of threshold violations

    Reads the 'df', 'la', 'avail_mem' and 'free_swap' keys of status and
    raises KeyError if one of them is missing.
    """
    errors = []
    for partition, usage in status['df'].items():
        avail = usage[0]
        total = usage[1]
        # NOTE(review): the original locals were named kbytes_*, but the
        # 5 * 1024**3 threshold and the GiB conversion in the message suggest
        # byte counts -- confirm the unit print_stats.py actually emits.
        if avail < 1024 * 1024 * 1024 * 5 or avail < total * 0.1:
            errors.append('{} {} has just {:.2f} GiB bytes left'.format(host, partition, avail / 1024 / 1024 / 1024))
    if status['la'][2] > 0.5:
        errors.append('{} la is {:.2f}'.format(host, status['la'][2]))
    if status['avail_mem'] < 0.25:
        errors.append('{} free memory is {:.2f}%'.format(host, status['avail_mem'] * 100))
    if status['free_swap'] < 0.5:
        errors.append('{} free swap is {:.2f}%'.format(host, status['free_swap'] * 100))
    return errors


def check_server(ssh_userhost):
    """
    Check if server is up and running using OpenSSH client

    Pipes print_stats.py (located next to this file) into python3 on the
    remote side and evaluates the JSON it prints.  Returns (True, '') when
    every threshold is met, otherwise (False, message).
    """
    # [-1] instead of [1] so a bare "host" without "user@" does not raise.
    host = ssh_userhost.split('@')[-1]
    script_dir = os.path.dirname(os.path.realpath(__file__))
    ssh_cmd = 'ssh {} python3 - < {}/print_stats.py'.format(ssh_userhost, script_dir)
    exitcode, output = subprocess.getstatusoutput(ssh_cmd)
    if exitcode != 0:
        return (False, '{}: Failed to fetch stats'.format(host))
    try:
        # getstatusoutput() merges stderr into the output, so any ssh
        # warning makes this invalid JSON; report it instead of crashing.
        status = json.loads(output)
    except ValueError as e:
        return (False, '{}: Failed to parse stats ({})'.format(host, e))
    if not isinstance(status, dict):
        return (False, '{}: Failed to parse stats ({} returned)'.format(host, type(status)))
    errors = _collect_stat_errors(host, status)
    return (len(errors) == 0, ", ".join(errors))


def urlopen(url):
    """
    Open url with a fixed 'UptimeRobot/1.0' User-Agent header
    """
    req = urllib.request.Request(
        url,
        data=None,
        headers={
            'User-Agent': 'UptimeRobot/1.0'
        }
    )
    return urllib.request.urlopen(req)


def check_http_ok(url):
    """
    Check if HTTP request returns 200 with a body of exactly b'OK'
    """
    try:
        with urlopen(url) as response:
            if response.code != 200:
                return (False, '{}: HTTP request failed with code {}'.format(url, response.code))
            contents = response.read()
            if contents != b'OK':
                return (False, '{}: {}'.format(url, contents))
            return (True, None)
    except Exception as e:
        # Any network or HTTP failure is a failed check, not a crash.
        return (False, 'Failed to check {} ({})'.format(url, e))


def check_http_contains(url, text):
    """
    Check if HTTP request returns 200 and the UTF-8 body contains text
    """
    try:
        with urlopen(url) as response:
            if response.code != 200:
                return (False, '{}: HTTP request failed with code {}'.format(url, response.code))
            contents = response.read()
            if contents.decode('utf-8').find(text) == -1:
                return (False, '{}: "{}" not found'.format(url, text))
            return (True, None)
    except Exception as e:
        # Any network, HTTP or decoding failure is a failed check.
        return (False, 'Failed to check {} ({})'.format(url, e))


def check_dynadot_expiring_domains(api_key, ditch_domains):
    """
    Check if Dynadot has expiring domains

    Domains listed in ditch_domains are ignored.  Every other domain that
    has expired or expires in fewer than 183 days is reported.
    """
    url = 'https://api.dynadot.com/api3.xml?key={}&command=list_domain'.format(api_key)
    try:
        with urlopen(url) as response:
            if response.code != 200:
                return (False, 'Dynadot HTTP request failed with code {}'.format(response.code))
            # create element tree object
            tree = ET.parse(response)
            # get root element
            root = tree.getroot()
            errors = []
            domain_items = root.findall('ListDomainInfoContent/DomainInfoList/DomainInfo/Domain')
            for domain_item in domain_items:
                name = domain_item.find('Name').text
                if name in ditch_domains:
                    continue
                # Expiration is divided by 1000 before being compared with
                # time.time(), i.e. it is treated as milliseconds.
                expiry_days = int((int(domain_item.find('Expiration').text) / 1000 - int(time.time())) / (60 * 60 * 24))
                if expiry_days < 0:
                    errors.append('Dynadot domain {} expired {} days ago'.format(name, -expiry_days))
                elif expiry_days < 183:
                    errors.append('Dynadot domain {} is expiring in {} days'.format(name, expiry_days))
            return (len(errors) == 0, ", ".join(errors))
    except Exception as e:
        # The URL embeds the API key, so it must not end up in the alert text.
        return (False, 'Failed to check Dynadot domains ({})'.format(e))


def main():
    """
    Load conf.json from the script directory and run every configured check

    The first failing check prints an alert and terminates the script.
    """
    conf_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'conf.json')
    with open(conf_path) as conf_file:
        conf = json.load(conf_file)
    check_alert(lambda: ping_check('1.1.1.1'))
    if 'dynadot' in conf:
        dynadot_conf = conf['dynadot']
        if 'apiKey' in dynadot_conf:
            ditch = dynadot_conf.get('ditchDomains', [])
            check_alert(lambda: check_dynadot_expiring_domains(dynadot_conf['apiKey'], ditch))
    # The lambdas below are invoked immediately inside check_alert(), so
    # capturing the loop variables by reference is safe here.
    for server in conf['sshServers']:
        check_alert(lambda: check_server(server), server)
    for url, ssh_userhost in conf['httpExpectOk'].items():
        check_alert(lambda: check_http_ok(url), ssh_userhost)
    for url, text in conf['httpFind'].items():
        check_alert(lambda: check_http_contains(url, text))


if __name__ == '__main__':
    main()
nodes_diagnostic.py
Source:nodes_diagnostic.py
class NodesDiagnostic:
    """Build one-line alert messages from per-node resource counters.

    Every ``alert_*`` method takes a ``node`` mapping holding the raw
    counters and a ``conditions`` mapping holding the warn/critical
    percentages, and returns a message terminated by ``\\r\\n``.

    NOTE(review): the key names (fd_used, sockets_used, proc_used, ...)
    look like a RabbitMQ management API node object -- confirm with the
    caller.  The excerpt is cut off inside alert_erlang_process(), so the
    class may define further members that are not visible here.
    """

    def __init__(self, logger):
        """Store the logger; none of the visible methods use it."""
        self.logger = logger

    def check_alert(self, actual_value, total_value, warn_value,
                    critical_value, message):
        """Append the usage percentage and its severity to ``message``.

        The percentage is ``actual_value * 100 / total_value`` rounded to
        two decimals.  It is "Critical" above ``critical_value``, "Warning"
        above ``warn_value`` and "OK" otherwise.

        Raises ZeroDivisionError when ``total_value`` is 0.
        """
        percent_value = round((actual_value * 100) / total_value, 2)
        # Test the most severe level first.  The previous form required
        # "warn < pct < critical" for a warning, so a value exactly equal to
        # critical_value matched neither branch and was reported as OK.
        if percent_value > critical_value:
            status = 'Critical'
        elif percent_value > warn_value:
            status = 'Warning'
        else:
            status = 'OK'
        return message + ' : {}% - {}\r\n'.format(percent_value, status)

    def alert_file_description(self, node, conditions):
        """Report file-descriptor usage (``fd_used`` of ``fd_total``)."""
        fd_used = node['fd_used']
        fd_total = node['fd_total']
        fd_result = 'File Descriptors Alert'
        return self.check_alert(fd_used, fd_total,
                                conditions['file_descriptors_used_percent_warn'],
                                conditions['file_descriptors_used_percent_critical'],
                                fd_result)

    def alert_files_description_as_sockets(self, node, conditions):
        """Report socket usage (``sockets_used`` of ``sockets_total``)."""
        sd_used = node['sockets_used']
        sd_total = node['sockets_total']
        sd_result = 'File Descriptors as Sockets Alert'
        return self.check_alert(sd_used, sd_total,
                                conditions['file_descriptors_used_as_sockets_percent_warn'],
                                conditions['file_descriptors_used_as_sockets_percent_critical'],
                                sd_result)

    def alert_disk_free(self, node, conditions):
        """Compare ``disk_free`` with ``disk_free_limit``.

        Below half of the limit is "Critical", below the limit is
        "Warning", anything else is "OK".  ``conditions`` is accepted for
        signature parity with the other alerts but is not read.
        """
        disk_free = node['disk_free']
        disk_free_limit = node['disk_free_limit']
        message = 'Disk Free Limit is {} actual value is {}: '.format(disk_free_limit, disk_free)
        disk_free_limit_critical = disk_free_limit / 2
        # Most severe level first, so a value sitting exactly on the critical
        # boundary is no longer reported as OK.
        if disk_free < disk_free_limit_critical:
            message += ' Critical\r\n'
        elif disk_free < disk_free_limit:
            message += ' Warning\r\n'
        else:
            message += ' OK\r\n'
        return message

    def alert_mem_free(self, node, conditions):
        """Report memory usage (``mem_used`` of ``mem_limit``)."""
        mem_used = node['mem_used']
        mem_limit = node['mem_limit']
        mem_result = 'Mem Free Alert'
        return self.check_alert(mem_used, mem_limit,
                                conditions['memory_used_percent_warn'],
                                conditions['memory_used_percent_critical'],
                                mem_result)

    def alert_erlang_process(self, node, conditions):
        """Report Erlang process usage (``proc_used`` of ``proc_total``)."""
        proc_used = node['proc_used']
        proc_total = node['proc_total']
        proc_result = 'Erlang processes used Alert '
        # NOTE(review): the excerpt ends after the critical threshold
        # argument; passing proc_result as the message follows the sibling
        # methods and must be confirmed against the full file.
        return self.check_alert(proc_used, proc_total,
                                conditions['erlang_process_percent_warn'],
                                conditions['erlang_process_percent_critical'],
                                proc_result)
test.py
Source:test.py
import json
from time import time

import cv2
import numpy as np
import requests
from tensorflow.keras.models import load_model

# NOTE(security): live-looking FCM credentials are committed in source.  They
# are left untouched here so behaviour does not change, but they should be
# rotated and loaded from the environment or a secret store instead.
serverToken = 'AAAAO5T7NUo:APA91bFj3mgtfeVJUp8OM7AqVOXhvPMTjbWJpncUnD2f3jXEkxDXh3F5eBqRvxTMFyUopxQOa-kcALIiNNrjdXN4RqZzuxTF0qXhgw2OnJcTqVWPtT_qVcV_1Cbj02rWHff0w3Y90O-5'
# Not referenced below: the notification is addressed to a topic.
deviceToken = 'dMYdDuEwMFtJ58w_L42fv-:APA91bESoIFbnmG-6bkLN6a0VFqHCMhQ8YPndtAz1snjTpoGnTKjFkrPBr9K6pxck3uJDkZzkHjVXgOTZFITZ9alixSPelkIoiCfTeQkvoa4uXDnl88ssWvbko92Q-MvIzGZVSffaA9z'

headers = {
    'Content-Type': 'application/json',
    'Authorization': 'key=' + serverToken,
}
body = {
    'notification': {'title': 'Alert Hight Risk',
                     'body': 'A Person Entered the Store without Mask.',
                     },
    "to": "/topics/Alert",
    'priority': 'high',
    # 'data': dataPayLoad,
}


def send_alert():
    """POST the notification in ``body`` to FCM and print the status code and JSON reply."""
    response = requests.post("https://fcm.googleapis.com/fcm/send", headers=headers, data=json.dumps(body))
    print(response.status_code)
    print(response.json())


model = load_model("./model2-019.model")
# Class index returned by the model -> caption / rectangle colour.
results = {0: 'without mask', 1: 'mask'}
GR_dict = {0: (0, 0, 255), 1: (0, 255, 0)}
# Faces are detected on a frame shrunk by this factor; the detected boxes are
# scaled back up by the same factor before cropping.
rect_size = 4
cap = cv2.VideoCapture(1)
# NOTE(review): absolute, user-specific path -- not portable.
haarcascade = cv2.CascadeClassifier('/Users/vigneshkkar/opt/anaconda3/lib/python3.8/site-packages/cv2/data/haarcascade_frontalface_alt.xml')

check_alert = False   # an unmasked face is being timed
alert_sent = False    # an alert was already sent for the current episode
time_diff = time()    # start time of the current unmasked episode

while True:
    (rval, im) = cap.read()
    if not rval:
        # The grab failed (camera missing or stream ended): ``im`` is None
        # and would crash cv2.flip(), so stop cleanly instead.
        break
    im = cv2.flip(im, 1, 1)

    rerect_size = cv2.resize(im, (im.shape[1] // rect_size, im.shape[0] // rect_size))
    faces = haarcascade.detectMultiScale(rerect_size)
    for f in faces:
        (x, y, w, h) = [v * rect_size for v in f]

        face_img = im[y:y+h, x:x+w]
        rerect_sized = cv2.resize(face_img, (150, 150))
        normalized = rerect_sized / 255.0
        reshaped = np.reshape(normalized, (1, 150, 150, 3))
        reshaped = np.vstack([reshaped])
        result = model.predict(reshaped)

        label = np.argmax(result, axis=1)[0]

        cv2.rectangle(im, (x, y), (x+w, y+h), GR_dict[label], 2)
        cv2.rectangle(im, (x, y-40), (x+w, y), GR_dict[label], -1)
        cv2.putText(im, results[label], (x, y-10), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 255), 2)

        # NOTE(review): indentation was lost in the excerpt.  The state
        # machine is kept inside the per-face loop because it reads ``label``,
        # which only exists here -- confirm against the original file.
        if label == 1:
            # A masked face resets the episode.
            check_alert = False
            alert_sent = False
        if label == 0 and not check_alert:
            # First unmasked sighting: start the timer.
            check_alert = True
            time_diff = time()
        if int(time() - time_diff) > 5 and check_alert and not alert_sent:
            # Unmasked for more than 5 seconds and nothing sent yet.
            send_alert()
            check_alert = False
            alert_sent = True

    cv2.imshow('Face Mask Detection', im)
    key = cv2.waitKey(10)

    if key == 27:  # Esc
        break

cap.release()
# Close the preview window created by cv2.imshow().
cv2.destroyAllWindows()
Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!