Best Python code snippet using stestr_python
controller_outdated.py
Source:controller_outdated.py
1#import threading2import time3import numpy as np 4from Queue import Queue5from Queue import Empty6from Queue import Full7from audio_receiver import Audio_Receiver8from id_receiver import Id_Receiver9from speaker import Speaker10from visualizer import Visualizer11 12class Controller:13 14 def __init__(self, visualization=True):15 self.id_to_main_queue = Queue(maxsize=50) 16 self.audio_to_main_queue = Queue(maxsize=1000)17 self.id_recv = Id_Receiver(self.id_to_main_queue)18 self.audio_recv = Audio_Receiver(self.audio_to_main_queue)19 20 self.visualization = visualization21 if self.visualization:22 self.main_to_vis_queue = Queue(maxsize=50)23 self.visualizer = Visualizer(self.main_to_vis_queue)24 25 self.speakers = []26 self.num_speakers = 027 28 def run(self):29 self.id_recv.start()30 self.audio_recv.start()31 32 if self.visualization:33 self.visualizer.start()34 35 recording_id_odas = [0, 0, 0, 0]36 recording_id_us = [0, 0, 0, 0]37 last_recording_id_us = [0, 0, 0, 0]38 39 40 while self.id_recv.is_alive() and self.audio_recv.is_alive():41 42 #wait for new audio data43 latest_audio = self.audio_to_main_queue.get(block=True)44 45 audio_recording_buffer = [np.empty(0, dtype=np.int16), np.empty(0, dtype=np.int16), np.empty(0, dtype=np.int16), np.empty(0, dtype=np.int16)]46 47 #get the latest id update (empties queue and save the last one)48 while 1:49 try:50 latest_id = self.id_to_main_queue.get(block=False) 51 except Empty:52 break53 54 if self.visualization:55 current_speakers = []56 57 58 #this part filters the odas source tracking and assigs our filtered speaker id to each odas source59 for i in range(len(latest_id['src'])): #len=460 #{ "id": 53, "tag": "dynamic", "x": -0.828, "y": -0.196, "z": 0.525, "activity": 0.926 } 61 recording_id_odas[i] = latest_id['src'][i]['id']62 recording_id_us[i] = 0 #clear our ids63 if recording_id_odas[i] > 0:64 thispos = [latest_id['src'][i]['x'], latest_id['src'][i]['y'], latest_id['src'][i]['z']]65 #check if we know this speaker id, and if yes update our speaker66 found_matching_speaker = False67 for speaker in self.speakers:68 if speaker.current_odas_id == recording_id_odas[i]:69 recording_id_us[i] = speaker.id70 speaker.pos = thispos71 found_matching_speaker = True72 break73 74 #TODO: if to speakers drifted close together and became inactive, how to decide, which one it is now75 if not found_matching_speaker:76 #check if it mathces a certain angle threshold to one of our speakers77 closest_dist = 1078 for speaker in self.speakers:79 dist = speaker.get_dist_to(thispos)80 if dist < closest_dist:81 closest_dist = dist82 closest_id = speaker.id83 84 if dist < 0.54: # 0.54 is roughly 10 degrees85 speaker.current_odas_id = recording_id_odas[i]86 recording_id_us[i] = speaker.id87 speaker.pos = thispos88 found_matching_speaker = True89 print("is matching %d" % (speaker.id))90 break91 92 if not found_matching_speaker:93 #this speaker does not exist, create a new one94 self.num_speakers += 195 print("Found new speaker: %d, pos: %f, %f, %f" % (self.num_speakers, thispos[0], thispos[1], thispos[2]))96 if closest_dist > 1.3: #1.3 is roughtly 30 dgree97 self.speakers.append(Speaker(self.num_speakers, thispos, recording_id_odas[i]))98 recording_id_us[i] = self.num_speakers99 else:100 self.speakers.append(Speaker(self.num_speakers, thispos, recording_id_odas[i], closest_dist, closest_id))101 recording_id_us[i] = self.num_speakers102 103 if self.visualization:104 #at this point we have assigned our speaker id, so save this information for the visualization thread105 current_speakers.append([recording_id_us[i], latest_id['src'][i]['id'], latest_id['src'][i]['x'], latest_id['src'][i]['y'], latest_id['src'][i]['z'], latest_id['src'][i]['activity'] ])106 107 108 #put data in the que for the visualizer109 if self.visualization:110 try:111 112 self.main_to_vis_queue.put({'current_speakers': current_speakers, 'known_speakers': self.speakers, 'num_known_speakers': self.num_speakers}, block=False)113 except Full:114 #print("couldn't put data into visualization queue, its full")115 pass116 117 118 119 120 #record audio of currently active speakers121 for i in range(len(recording_id_us)):122 if recording_id_us[i] > 0:123 audio_recording_buffer[i] = np.append(audio_recording_buffer[i], latest_audio[i])124 125 #if a speaker stopped speaking send the chunk off126 if recording_id_us[i] == 0 and last_recording_id_us[i] > 0:127 128 #TODO send this somewhere129 #clear this recording buffer130 audio_recording_buffer[i] = np.empty(0, dtype=np.int16)131 132 #check if one channel has been active for too long, then cut it and send it133 if audio_recording_buffer[i].shape[0] > 16000 * 60: #cut after 60 seconds134 135 #TODO send this somewhere136 #clear this recording buffer137 audio_recording_buffer[i] = np.empty(0, dtype=np.int16)138 139 last_recording_id_us[i] = recording_id_us[i]140 141 142 143 144 145 146 147 148 149 print("done.")150 self.id_recv.stop()151 self.audio_recv.stop()152 if self.visualization:153 self.visualizer.stop()154 155 156 ...
outlook.py
Source:outlook.py
1import email2import imaplib3import smtplib4import datetime5import email.mime.multipart6import config7import base648class Outlook():9 def __init__(self):10 pass11 # self.imap = imaplib.IMAP4_SSL('imap-mail.outlook.com')12 # self.smtp = smtplib.SMTP('smtp-mail.outlook.com')13 def login(self, username, password):14 self.username = username15 self.password = password16 login_attempts = 017 while True:18 try:19 self.imap = imaplib.IMAP4_SSL(config.imap_server,config.imap_port)20 r, d = self.imap.login(username, password)21 assert r == 'OK', 'login failed: %s' % str (r)22 print(" > Signed in as %s" % self.username, d)23 return24 except Exception as err:25 print(" > Sign in error: %s" % str(err))26 login_attempts = login_attempts + 127 if login_attempts < 3:28 continue29 assert False, 'login failed'30 def sendEmailMIME(self, recipient, subject, message):31 msg = email.mime.multipart.MIMEMultipart()32 msg['to'] = recipient33 msg['from'] = self.username34 msg['subject'] = subject35 msg.add_header('reply-to', self.username)36 # headers = "\r\n".join(["from: " + "sms@kitaklik.com","subject: " + subject,"to: " + recipient,"mime-version: 1.0","content-type: text/html"])37 # content = headers + "\r\n\r\n" + message38 try:39 self.smtp = smtplib.SMTP(config.smtp_server, config.smtp_port)40 self.smtp.ehlo()41 self.smtp.starttls()42 self.smtp.login(self.username, self.password)43 self.smtp.sendmail(msg['from'], [msg['to']], msg.as_string())44 print(" email replied")45 except smtplib.SMTPException:46 print("Error: unable to send email")47 def sendEmail(self, recipient, subject, message):48 headers = "\r\n".join([49 "from: " + self.username,50 "subject: " + subject,51 "to: " + recipient,52 "mime-version: 1.0",53 "content-type: text/html"54 ])55 content = headers + "\r\n\r\n" + message56 attempts = 057 while True:58 try:59 self.smtp = smtplib.SMTP(config.smtp_server, config.smtp_port)60 self.smtp.ehlo()61 self.smtp.starttls()62 self.smtp.login(self.username, self.password)63 self.smtp.sendmail(self.username, recipient, content)64 print(" email sent.")65 return66 except Exception as err:67 print(" Sending email failed: %s" % str(err))68 attempts = attempts + 169 if attempts < 3:70 continue71 raise Exception("Send failed. Check the recipient email address")72 def list(self):73 # self.login()74 return self.imap.list()75 def select(self, str):76 return self.imap.select(str)77 def inbox(self):78 return self.imap.select("Inbox")79 def junk(self):80 return self.imap.select("Junk")81 def logout(self):82 return self.imap.logout()83 def since_date(self, days):84 mydate = datetime.datetime.now() - datetime.timedelta(days=days)85 return mydate.strftime("%d-%b-%Y")86 def allIdsSince(self, days):87 r, d = self.imap.search(None, '(SINCE "'+self.since_date(days)+'")', 'ALL')88 list = d[0].split(' ')89 return list90 def allIdsToday(self):91 return self.allIdsSince(1)92 def readIdsSince(self, days):93 r, d = self.imap.search(None, '(SINCE "'+self.date_since(days)+'")', 'SEEN')94 list = d[0].split(' ')95 return list96 def readIdsToday(self):97 return self.readIdsSince(1)98 def unreadIdsSince(self, days):99 r, d = self.imap.search(None, '(SINCE "'+self.since_date(days)+'")', 'UNSEEN')100 list = d[0].split(' ')101 return list102 def unreadIdsToday(self):103 return self.unreadIdsSince(1)104 def allIds(self):105 r, d = self.imap.search(None, "ALL")106 list = d[0].split(' ')107 return list108 def readIds(self):109 r, d = self.imap.search(None, "SEEN")110 list = d[0].split(' ')111 return list112 def unreadIds(self):113 r, d = self.imap.search(None, "UNSEEN")114 list = d[0].split(' ')115 return list116 def hasUnread(self):117 list = self.unreadIds()118 return list != ['']119 def getIdswithWord(self, ids, word):120 stack = []121 for id in ids:122 self.getEmail(id)123 if word in self.mailbody().lower():124 stack.append(id)125 return stack126 def getEmail(self, id):127 r, d = self.imap.fetch(id, "(RFC822)")128 self.raw_email = d[0][1]129 self.email_message = email.message_from_string(self.raw_email)130 return self.email_message131 def unread(self):132 list = self.unreadIds()133 latest_id = list[-1]134 return self.getEmail(latest_id)135 def read(self):136 list = self.readIds()137 latest_id = list[-1]138 return self.getEmail(latest_id)139 def readToday(self):140 list = self.readIdsToday()141 latest_id = list[-1]142 return self.getEmail(latest_id)143 def unreadToday(self):144 list = self.unreadIdsToday()145 latest_id = list[-1]146 return self.getEmail(latest_id)147 def readOnly(self, folder):148 return self.imap.select(folder, readonly=True)149 def writeEnable(self, folder):150 return self.imap.select(folder, readonly=False)151 def rawRead(self):152 list = self.readIds()153 latest_id = list[-1]154 r, d = self.imap.fetch(latest_id, "(RFC822)")155 self.raw_email = d[0][1]156 return self.raw_email157 def mailbody(self):158 if self.email_message.is_multipart():159 for payload in self.email_message.get_payload():160 # if payload.is_multipart(): ...161 body = (162 payload.get_payload()163 .split(self.email_message['from'])[0]164 .split('\r\n\r\n2015')[0]165 )166 return body167 else:168 body = (169 self.email_message.get_payload()170 .split(self.email_message['from'])[0]171 .split('\r\n\r\n2015')[0]172 )173 return body174 def mailsubject(self):175 return self.email_message['Subject']176 def mailfrom(self):177 return self.email_message['from']178 def mailto(self):179 return self.email_message['to']180 def maildate(self):181 return self.email_message['date']182 def mailreturnpath(self):183 return self.email_message['Return-Path']184 def mailreplyto(self):185 return self.email_message['Reply-To']186 def mailall(self):187 return self.email_message188 def mailbodydecoded(self):...
merger.py
Source:merger.py
1import threading2from Queue import Queue3from Queue import Empty4from audio_receiver import Audio_Receiver5from id_receiver import Id_Receiver6# Merges to two continuous asynchronous streams of id and audio together, to a chunked stream of tagged audio data7class Merger(threading.Thread):8 def __init__(self, outq):9 threading.Thread.__init__(self)10 self.please_stop = threading.Event()11 self.id_to_merger_queue = Queue(maxsize=50)12 self.audio_to_merger_queue = Queue(maxsize=1000)13 self.id_recv = Id_Receiver(self.id_to_merger_queue)14 self.audio_recv = Audio_Receiver(self.audio_to_merger_queue)15 self.outq = outq16 def run(self):17 self.id_recv.start()18 self.audio_recv.start()19 while self.id_recv.is_alive() and self.audio_recv.is_alive() and not self.please_stop.is_set():20 # wait for new audio data21 try:22 latest_audio = self.audio_to_merger_queue.get(block=True, timeout=1)23 except Empty:24 continue # restart loop, but check again if we maybe got a stop signal25 # get the latest id update (empties queue and save the last one)26 while 1:27 try:28 latest_id = self.id_to_merger_queue.get(block=False)29 except Empty:30 break31 # get id data and put it into a list of 4 lists32 # each of the 4 will contain odas_id, x,y,z,activity33 id_info = []34 for i in range(len(latest_id['src'])):35 id_info.append([latest_id['src'][i]['id'], latest_id['src'][i]['x'], latest_id['src'][i]['y'],36 latest_id['src'][i]['z'], latest_id['src'][i]['activity']])37 # merge in a dict and put into a queue38 nextout = {'audio_data': latest_audio, 'id_info': id_info}39 if self.outq.qsize() > self.outq.maxsize - 20:40 print("merger out queue has grown to large, discarding some, this should rarely happen!!!!!!")41 for i in range(20): # q is almost full, discard some data42 try:43 self.outq.get(block=False)44 except Empty:45 pass46 self.outq.put(nextout) # this is blocking and thread safe47 if self.please_stop.is_set():48 print("Merger is stopping as externally requested.")49 else:50 print("Merger is stopping as one of the receivers has stopped.")51 self.id_recv.stop()52 self.audio_recv.stop()53 def stop(self):...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!