Best Python code snippet using yandex-tank
astm_bidirectional_xl_1000.py
Source:astm_bidirectional_xl_1000.py
#!/usr/bin/python3
"""ASTM bidirectional link handler built on ``bidirectional_general.astmg``.

The ``astms`` class reacts to single control bytes (ENQ/ACK/NAK/EOT) and to
LF-terminated frames read from the connection, stores received data in an
inbox file and sends the first pending outbox file when the link is idle.
"""
import bidirectional_general as astmg
import astm_bidirectional_conf as conf
import fcntl
import signal
import logging
from astm_bidirectional_common import file_mgmt

# Control bytes compared against incoming/outgoing messages below.
EOT = b'\x04'
ENQ = b'\x05'
ACK = b'\x06'
LF = b'\x0a'
NAK = b'\x15'

# Byte values (as ints) that delimit the checksummed part of a frame.
STX = 2    # starts checksum counting; itself excluded
ETX = 3    # stops checksum counting; itself included
ETB = 23   # stops checksum counting; itself included


class astms(astmg.astmg, file_mgmt):
    """State machine for one ASTM connection.

    ``main_status``: 0=neutral, 1=receiving, 2=sending.
    ``send_status``: 1=ENQ sent, 2=1st ACK received, 3=data sent,
    4=2nd ACK received, 0=EOT sent.
    """

    def __init__(self):
        self.main_status = 0
        self.send_status = 0
        # Inbox file currently being written. Initialised here so that
        # signal_handler() can test it even when no ENQ was ever received.
        self.fd = None
        self.set_inbox(conf.inbox_data, conf.inbox_arch)
        self.set_outbox(conf.outbox_data, conf.outbox_arch)
        super().__init__()

        self.alarm_time = conf.alarm_time
        signal.signal(signal.SIGALRM, self.signal_handler)

    def _mark_writable(self):
        """Put the connection in the write set so the next select() reports it."""
        self.write_set.add(self.conn[0])
        self.error_set = self.read_set.union(self.write_set)  # keep error set in sync

    ###################################
    # override this function in subclass
    ###################################
    def manage_read(self, data):
        """Handle one chunk read from the connection.

        Everything from ENQ to EOT is written to a single inbox file; that
        file holds the STX..LF frames as received. EOF is handled in the
        base class.
        """
        if data == ENQ:
            # Peer wants to send: acknowledge and open a locked inbox file.
            signal.alarm(0)
            self.main_status = 1
            self.write_msg = ACK
            self._mark_writable()
            new_file = self.get_inbox_filename()
            self.fd = open(new_file, 'wb')
            fcntl.flock(self.fd, fcntl.LOCK_EX | fcntl.LOCK_NB)  # lock file
            # This fd is reused for the frames that follow.
            self.fd.write(data)
            print_to_log('File Content written:', data)

            signal.alarm(self.alarm_time)
        elif data[-1:] == LF:
            # A complete frame: store and ACK it only if the checksum matches.
            signal.alarm(0)

            if self.calculate_and_compare_checksum(data):
                print_to_log('checksum matched:', 'Proceeding to write data')
                self.fd.write(data)
                print_to_log('File Content written:', data)
                self.write_msg = ACK
            else:
                print_to_log('checksum mismatched:', 'Proceeding to send NAK')
                self.write_msg = NAK
            self._mark_writable()

            signal.alarm(self.alarm_time)
        elif data == EOT:
            # Peer finished: close the inbox file and go neutral.
            signal.alarm(0)
            self.fd.write(data)
            print_to_log('File Content written:', data)
            self.fd.close()
            self.main_status = 0
            self.send_status = 0
        elif data == ACK:  # ACK while we are sending
            if self.send_status == 1:
                signal.alarm(0)
                self.send_status = 2
                print_to_log('send_status=={}'.format(self.send_status), 'post-ENQ ACK')

                self._mark_writable()

                self.get_first_outbox_file()  # sets current_outbox_file
                # Only one ETX frame is expected per outbox file.
                # NOTE(review): the original comment says data must not
                # exceed 1024 bytes, yet 2024 bytes are read - confirm.
                # The context manager closes the outbox file, which the
                # previous code left open on every transfer.
                with open(self.outbox_data + self.current_outbox_file, 'rb') as outbox_fd:
                    byte_data = outbox_fd.read(2024)

                print_to_log('File Content', byte_data)
                chksum = self.get_checksum(byte_data)
                print_to_log('CHKSUM', chksum)
                self.write_msg = byte_data

                # send_status becomes 3 in manage_write(), once really sent.
                signal.alarm(self.alarm_time)  # wait for the second ACK
            elif self.send_status == 3:
                signal.alarm(0)
                self.send_status = 4
                print_to_log('send_status=={}'.format(self.send_status), 'post-LF ACK')
                self._mark_writable()
                self.write_msg = EOT
                self.archive_outbox_file()
                # Statuses are reset in manage_write() when EOT is really
                # sent, or by the alarm if nothing gets sent.
                signal.alarm(self.alarm_time)
        elif data == NAK:
            signal.alarm(0)
            self.send_status = 4
            print_to_log('send_status=={}'.format(self.send_status), 'post-ENQ/LF NAK. Some error')
            # Same closing sequence as after the second ACK.
            self._mark_writable()
            self.write_msg = EOT
            self.archive_outbox_file()
            print_to_log('send_status=={}'.format(self.send_status), 'initiate_write() sent EOT')
            print_to_log('main_status=={}'.format(self.main_status), 'initiate_write() now, neutral')
            # Statuses are reset in manage_write() when EOT is really sent,
            # or by the alarm if nothing gets sent.
            signal.alarm(self.alarm_time)

    ###################################
    # override this function in subclass
    ###################################
    def initiate_write(self):
        """Start sending the first outbox file if the link is neutral.

        Handles only STX-ETX data; no STX-ETB management is done.
        """
        print_to_log('main_status={} send_status={}'.format(self.main_status, self.send_status), 'Entering initiate_write()')
        if self.main_status == 0:
            print_to_log('main_status=={}'.format(self.main_status), 'initiate_write() will find some pending work')
            if self.get_first_outbox_file() == True:  # there is something to send
                signal.alarm(0)
                self.main_status = 2  # announce that we are busy sending
                print_to_log('main_status=={}'.format(self.main_status), 'initiate_write() changed main_status to 2 to send data')
                self._mark_writable()
                self.write_msg = ENQ
                # send_status becomes 1 in manage_write(), once ENQ is written.
                print_to_log('send_status=={}'.format(self.send_status), 'initiate_write() sent ENQ to write buffer')
                signal.alarm(self.alarm_time)  # wait for the first ACK
            else:
                print_to_log('main_status=={}'.format(self.main_status), 'no data in outbox. sleeping for a while')
                return
        else:
            print_to_log('main_status={} send_status={}'.format(self.main_status, self.send_status), 'busy somewhre.. initiate_write() will not initiate anything')

    ###################################
    # override this function in subclass
    ###################################
    def manage_write(self):
        """Send the staged ``write_msg`` and update statuses to match it."""
        print_to_log('Following will be sent', self.write_msg)
        try:
            self.conn[0].send(self.write_msg)
        except Exception as my_ex:
            # Best effort: a failed send is only logged.
            # NOTE(review): statuses below are still updated as if sent.
            print_to_log("Disconnection from client?", my_ex)
        self.write_set.remove(self.conn[0])  # nothing pending any more
        self.error_set = self.read_set.union(self.write_set)

        # ASTM status update. While sending: ENQ, ...LF, EOT are sent.
        # While receiving: ACK/NAK are sent (ACK changes no status).
        if self.write_msg == EOT:
            self.main_status = 0
            self.send_status = 0
            print_to_log('main_status={} send_status={}'.format(self.main_status, self.send_status), '.. because EOT is sent')
            signal.alarm(0)
            print_to_log('Neutral State', '.. so stopping alarm')
        elif self.write_msg[-1:] == LF:  # main message sent
            self.send_status = 3
            print_to_log('main_status={} send_status={}'.format(self.main_status, self.send_status), '.. because message is sent(LF)')
        elif self.write_msg == ENQ:
            self.send_status = 1
            print_to_log('main_status={} send_status={}'.format(self.main_status, self.send_status), '.. because ENQ is sent')
        elif self.write_msg == ACK:
            print_to_log('main_status={} send_status={}'.format(self.main_status, self.send_status), '.. no change in status ACK is sent')
        elif self.write_msg == NAK:  # treated like EOT: go neutral
            self.main_status = 0
            self.send_status = 0
            print_to_log('main_status={} send_status={}'.format(self.main_status, self.send_status), '.. because NAK is sent. going neutral')
            signal.alarm(0)
            print_to_log('Neutral State', '.. so stopping alarm')
        else:  # incomplete/inappropriate data stream
            self.send_status = 3
            print_to_log('main_status={} send_status={}'.format(self.main_status, self.send_status), '.. incomplate message (without LF) sent. EOT will be sent in next round')

    ####### Specific functions for ASTM
    def get_checksum(self, data):
        """Return the frame checksum of *data* as two uppercase hex digits (bytes).

        Bytes after STX (value 2) up to and including ETX (3) or ETB (23)
        are summed modulo 256; STX itself is not counted.
        """
        checksum = 0
        counting = False
        for x in data:
            if x == STX:
                counting = True
                continue  # STX is excluded from the sum
            if counting:
                checksum = (checksum + x) % 256
            if x in (ETX, ETB):
                counting = False  # terminator was already included above

        return '{:X}'.format(checksum).zfill(2).encode()

    def compare_checksum(self, received_checksum, calculated_checksum):
        """Return True when both checksums are equal."""
        return received_checksum == calculated_checksum

    def calculate_and_compare_checksum(self, data):
        """Check *data* against the checksum it carries.

        The received checksum is taken from ``data[-4:-2]``, i.e. the two
        bytes preceding the final two bytes of the frame.
        """
        calculated_checksum = self.get_checksum(data)
        received_checksum = data[-4:-2]
        print_to_log(
            'Calculated checsum={}'.format(calculated_checksum),
            'Received checsum={}'.format(received_checksum)
        )
        return self.compare_checksum(received_checksum, calculated_checksum)

    def signal_handler(self, signal, frame):
        """SIGALRM handler: reset both statuses and close the inbox file.

        Note that the *signal* parameter shadows the ``signal`` module
        inside this method; it holds the signal number.
        """
        print_to_log('Alarm Stopped', 'Signal:{} Frame:{}'.format(signal, frame))
        print_to_log('change statuses ', ' and close file')
        print_to_log('main_status={} send_status={}'.format(self.main_status, self.send_status), ' -->previous ')
        self.send_status = 0
        self.main_status = 0
        print_to_log('current main_status={} send_status={}'.format(self.main_status, self.send_status), ' -->current ')

        try:
            if self.fd is not None:
                self.fd.close()
                print_to_log('self.signal_handler()', 'file closed')
        except Exception as my_ex:
            # Deliberately broad: a handler must not raise into the main loop.
            print_to_log('self.signal_handler()', 'error in closing file:{}'.format(my_ex))

        print_to_log('Alarm..response NOT received in stipulated time', 'data receving/sending may be incomplate')


def print_to_log(object1, object2):
    """Write both objects, separated by one space, to the debug log."""
    logging.debug('%s %s', object1, object2)


# Main Code ###############################
# use this to devise your own script
if __name__ == '__main__':
    logging.basicConfig(filename=conf.astm_log_filename, level=logging.DEBUG, format='%(asctime)s : %(message)s')

    while True:
        m = astms()
        m.astmg_loop()
        # break  # useful during debugging
mboxer.py
Source:mboxer.py
...12 self.name=stat_name13 def set(self,new_num,new_name):14 self.num=new_num15 self.name=new_name16 def send_status(self,f):17 f.write(str(self.num) +' '+ self.name +'\n')18 f.flush()19class Header:20 def __init__(self,list_of_elem):21 self.id=list_of_elem[0]22 self.value=list_of_elem[1].strip('\n')23 def send_header(self,f):24 f.write(self.id +' '+ self.value +'\n')25 f.flush()26def set_header(raw_string): #sluzi na pripravu suroveho textu pre konstruktor triedy Header27 elem=raw_string.split(':')28 ret_list=[]29 if len(elem)==2:30 if (elem[0].isascii()):31 if (' ' not in elem[0] and '/' not in elem[0] and '/' not in elem[1]):32 ret_list=elem33 return ret_list 34 35s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)36s.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)37s.bind(('',9999))38signal.signal(signal.SIGCHLD,signal.SIG_IGN)39s.listen(5)40commands=['READ\n','WRITE\n','LS\n']41while True:42 connected_socket,address=s.accept()43 pid_chld=os.fork()44 current_status=Status(200,'Bad request')45 if pid_chld == 0:46 while True:47 s.close()48 header1=[]49 header2=[]50 data=''51 current_status.set(200,'Bad request')52 f=connected_socket.makefile(mode='rw')53 data=f.readline()54 if data in commands:55 if (commands.index(data)==0): #READ56 while True:57 data=f.readline()58 header1=set_header(data)59 if (len(header1)==2 and header1[0] == 'Mailbox'): #ak su tie dve casti hlavicky v poriadku,60 mailbox_head=Header(header1) #robim z nich prvok triedy Header61 data=f.readline()62 header2=set_header(data)63 if (header2[0]=='Message'):64 message_head=Header(header2)65 if not path.exists(mailbox_head.value):66 current_status.set(201,'No such message')67 current_status.send_status(f)68 f.write('\n')69 f.flush()70 break71 with open(mailbox_head.value+'/'+message_head.value) as my_file:72 content=my_file.read()73 current_status.set(100,'OK')74 current_status.send_status(f) 75 f.write('Content-length:'+str(len(content))+'\n')76 f.flush()77 f.write('\n'+content)78 f.flush()79 
break80 else:81 current_status.send_status(f)82 break 83 else:84 current_status.send_status(f)85 break86 elif (commands.index(data)==1): #WRITE87 while True:88 data=f.readline()89 header1=set_header(data)90 if (len(header1)==2 and header1[0] == 'Mailbox'):91 mailbox_head=Header(header1)92 data=f.readline()93 header2=set_header(data)94 if (header2[0]=='Content-length' and int(header2[1])>-1):95 cont_len=Header(header2)96 if not path.exists(mailbox_head.value):97 current_status.set(203,'No such mailbox')98 current_status.send_status(f)99 f.write('\n')100 f.flush()101 break102 else:103 m=hashlib.md5()104 current_status.set(100,'OK')105 message=f.readline(int(cont_len.value))106 hex_path=m.hexdigest()107 f.flush()108 with open(mailbox_head.value+'/'+hex_path,"w") as my_file:109 my_file.write(message)110 current_status.send_status(f)111 f.write('\n')112 f.flush()113 break114 else:115 current_status.send_status(f)116 f.write('\n')117 f.flush()118 break119 else:120 current_status.send_status(f)121 f.write('\n')122 f.flush()123 break124 elif (commands.index(data)==2): #LS125 while True:126 data=f.readline()127 header1=set_header(data)128 if (len(header1)==2 and header1[0] == 'Mailbox'):129 mailbox_head=Header(header1)130 if not path.exists(mailbox_head.value):131 current_status.set(201,'No such mailbox')132 break133 files_list=[]134 files_list=os.listdir(mailbox_head.value)135 current_status.set(100,'OK')136 current_status.send_status(f) 137 f.write('Number-of-messages:'+str(len(files_list))+'\n\n')138 f.flush()139 for item in files_list:140 f.write(item+'\n')141 f.flush()142 break143 else:144 current_status.send_status(f)145 f.write('\n')146 f.flush()147 break148 else:149 current_status.set(204,'Unknown method')150 current_status.send_status(f)...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g., Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!