Best Python code snippet using localstack_python
Main_ZJ.py
Source:Main_ZJ.py
# -*- coding: utf-8 -*-
from PyQt4 import QtCore
import os,time,datetime,codecs
import telnetlib  # telnet connection module
import paramiko  # ssh connection module
import ConfigParser  # configuration-file module
import sys,socket
import thread  # thread handling and control
import gc  # garbage collection
reload(sys)  # reload the sys module
sys.setdefaultencoding('utf8')  # set utf8 as the default encoding


# Telnet login
class Telnet():
    """Telnet session to a device, opened on port 10020."""

    def __init__(self,host):
        """Connect to the telnet server at ``host`` (port 10020, 10 s timeout)."""
        self.telnet = telnetlib.Telnet(host, port = 10020, timeout=10)
        self.telnet.set_debuglevel(2)

    def Read(self,Prompt,Timeout):
        """Return the text read until ``Prompt`` appears or ``Timeout`` seconds pass.

        If the first read raises, a newline is sent and the read is attempted
        once more; an error raised by that second read propagates to the caller.
        """
        buff = ""
        try:
            buff += self.telnet.read_until(Prompt,Timeout)
        except Exception:
            # ``except Exception`` instead of a bare ``except`` so that
            # KeyboardInterrupt / SystemExit are not swallowed by the retry.
            self.Send("\n")
            buff += self.telnet.read_until(Prompt,Timeout)
        return buff

    def Send(self,Command):
        """Send ``Command`` followed by a newline to the remote end."""
        self.telnet.write(str(Command)+'\n')

    def Close(self):
        """Terminate the telnet connection."""
        self.telnet.close()


# ssh login
class ssh():
    """Interactive SSH shell session to a device (port 22)."""

    def __init__(self,host,username,passwd):
        """Log in to ``host`` with a password and open an interactive shell."""
        self.s = paramiko.SSHClient()
        # Automatically accept unknown host keys; this policy has to be set
        # before connect() is called.
        self.s.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.s.connect(hostname=host,port=22, username=username, password=passwd,timeout = 30)
        self.ssh = self.s.invoke_shell()  # interactive shell channel
        time.sleep(2)

    def Send(self,Command):
        """Send ``Command`` followed by a carriage return on the shell channel."""
        self.ssh.send(str(Command) + '\r')

    def Recv(self,Buff_Size,Time):
        """Return up to ``Buff_Size`` bytes of shell output.

        ``Time`` is the number of seconds to wait for data.  If nothing
        arrives in that time a newline is sent to provoke output and the read
        is attempted once more; a second ``socket.timeout`` propagates.
        """
        buff = ""
        # Channel.recv() accepts only the byte count.  Passing the wait limit
        # as a second positional argument raised TypeError on every call, so
        # the limit is applied to the channel with settimeout() instead.
        self.ssh.settimeout(Time)
        try:
            buff += self.ssh.recv(Buff_Size)
        except socket.timeout:
            self.Send("\n")
            buff += self.ssh.recv(Buff_Size)
        return buff

    def Close(self):
        """Close the SSH client connection."""
        self.s.close()
# Access to the configuration (INI) file
class Config_ini():
    """Reader for an INI file, backed by ``ConfigParser.ConfigParser``."""

    def __init__(self,filename):
        """Parse ``filename``; errors from ``open`` or the parser propagate."""
        self.config = ConfigParser.ConfigParser()
        # Use a context manager so the file handle is closed once parsed;
        # previously the handle returned by open() was never closed.
        with open(filename) as ini_file:
            self.config.readfp(ini_file)

    def get_info(self,session,key):
        """Return the value stored under ``key`` in section ``session``."""
        return self.config.get(session,key)

    def session(self):
        """Return the names of all sections."""
        return self.config.sections()

    def option(self,session):
        """Return the option names defined in section ``session``."""
        return self.config.options(session)

    def set(self,session,option,value):
        """Set ``option`` in section ``session`` to ``value``.

        Only the parsed configuration held by this object is changed; this
        class never writes the file back.
        """
        return self.config.set(session,option,value)
# Saving of log information
class Consumer(QtCore.QThread):
    """Thread object that appends queued test output to a log file."""

    def __init__(self,queue,parent = None):
        """Keep a reference to ``queue``, the source of the log messages."""
        super(Consumer,self).__init__(parent)
        self.data = queue
        self.working = True

    def write_log(self,PD,SN,Time):
        """Append every message taken from the queue to the run's log file.

        The file is ``<cwd>/log/<PD>_whole_<SN>_<Time>.log`` and its path is
        stored in ``self.data_file``.  The loop runs while ``self.working`` is
        true; ``self.data.get()`` is called once per iteration, so the flag is
        only re-checked after a message has been written.
        """
        # os.path.join instead of concatenating "\\log" so the directory is
        # named correctly regardless of the platform's path separator.
        log_dir = os.path.join(os.getcwd(), "log")
        if not os.path.exists(log_dir):
            os.makedirs(log_dir)  # create the directory, including parents
        self.data_file = os.path.join(log_dir, PD + "_whole_" + SN + "_" + Time + ".log")
        while self.working:
            s = self.data.get()  # next piece of test output
            # Re-open in append mode for each message; the context manager
            # closes the file even when the write raises.
            with codecs.open(self.data_file, "a+", encoding='gb18030') as F:
                F.write(s + "\r\n")
³éæ件84#主æµè¯ç¨åº85class Main_Test(QtCore.QThread):86 def __init__(self,queue,parent = None):87 super(Main_Test,self).__init__(parent)88 self.data = queue #æ°æ®89 self.isWait = True #çå¾
90 self.working = True #å·¥ä½91 self.Input_IP_address=None92 self.error_count=0 #é误次æ°ä¸º093 self.Ship_Out_Address=None94 self.Red = "QPushButton{background-color:RED}" #红è²95 self.Yellow = "QPushButton{background-color:YELLOW}" #é»è²96 self.Green = "QPushButton{background-color:GREEN}" #绿è²97 self.Config_ini = Config_ini("ini/Paramiters.ini") #è·åé
ç½®æ件信æ¯98 self.GNS_SWV = self.Config_ini.get_info("GNS", "swv") #è·åGNS段swvä¿¡æ¯99 self.GNS_SPN = self.Config_ini.get_info("GNS", "spn")100 self.GNS_DPN = self.Config_ini.get_info("GNS", "dpn")101 self.GNS_BPN = self.Config_ini.get_info("GNS", "bpn")102 self.GNS_FPGAPN = self.Config_ini.get_info("GNS", "fpgapn")103 self.GNS_ISOPN = self.Config_ini.get_info("GNS", "isopn")104 self.GNS_MOD=self.Config_ini.get_info("GNS", "mod")105 self.GNS_FPGA=self.Config_ini.get_info("GNS","fpga_version")106 self.GNS_Docker = self.Config_ini.get_info("GNS", "docker")107 self.GNS_Nginx = self.Config_ini.get_info("GNS", "nginx")108 self.GNS_Nodejs = self.Config_ini.get_info("GNS", "nodejs")109 self.GNS_Mongodb = self.Config_ini.get_info("GNS", "mongodb")110 self.GNS_Underlying = self.Config_ini.get_info("GNS", "underlying")111 self.MAC = ""112 self.Boot = ""113 self.Kernel = ""114 self.APP = ""115 self.Config = ""116 #è·åIPå°å117 def Local_IP(self):118 self.Local_IP = socket.gethostbyname(socket.gethostname()) #è·åæ¬å°ä¸»æºåçIPå°å119 return str(self.Local_IP)120 #è¿æ¥è®¾å¤121 def Connection(self, host):122 username = "gns"123 passwd = "feitian"124 try:125 self.Connect = ssh(str(host), username, passwd) #sshè¿æ¥æå¡å¨126 self.Send_Command("\n")127 except Exception, e:128 self.Test_Fail(str(e))129 #åéæ¥æ¶æ°æ®130 def Send_Command(self, Command, Prompt='#', Timeout=10,wait_time=1):131 try:132 buff = ""133 log = ""134 self.Connect.Send(Command) #åéå½ä»¤135 starttime = datetime.datetime.now() #è·åå½åæ¶é´136 # while not buff.endswith(Prompt):137 while Prompt not in buff:138 buff = ""139 time.sleep(1)140 buff += self.Connect.Recv(99999,wait_time)141 log += buff142 self.data.put(buff)143 self.emit(QtCore.SIGNAL('output(QString)'), buff)144 endtime = datetime.datetime.now()145 if (endtime - starttime).seconds > Timeout:146 self.Test_Fail(u"è¶
æ¶, %s ä¸è½æ¾å°" % Prompt)147 break148 return log149 except Exception, E:150 self.Test_Fail(u"å½ä»¤é误ï¼%s ä¸è½æ¾å°" % Prompt)151 #éè¿å¼å¤´åç»å°¾å符串è·åä¸å¿å符串152 def GetMiddleStr(self, content, startStr, endStr):153 try:154 startIndex = content.index(startStr) #æ£æµcontentå符串ä¸æ¯å¦å
å«startstrå符串,è¿åå¼å§çç´¢å¼155 if startIndex >= 0:156 startIndex += len(startStr)157 endIndex = content.index(endStr) #æ£æµcontentå符串ä¸æ¯å¦å
å«endstrå符串,è¿åå¼å§çç´¢å¼158 return content[startIndex:endIndex].strip() #移é¤å符串æ¶å°¾çç©ºæ ¼ï¼å¹¶è¿åå符串æå®çå符159 except Exception, e:160 self.Test_Fail(u"å
容è¿åé误")161 #设置é¢è²162 def Set_Color(self, message):163 self.emit(QtCore.SIGNAL('color'), message)164 #设置å°å165 def Set_Status(self, message):166 self.emit(QtCore.SIGNAL('status'), message)167 #é误æ¶æ¯168 def error(self,message):169 if message !="":170 self.emit(QtCore.SIGNAL('error'),message)171 self.data.put(message)172 #æµè¯éè¿æ示173 def Test_Pass(self,message):174 if message !="":175 self.emit(QtCore.SIGNAL('pass'),message) #åéä¿¡å·176 self.data.put(message)177 #æµè¯å¼å§178 def Test_Running(self,message):179 l = "########################################" + message + "########################################"180 self.emit(QtCore.SIGNAL('dis_message'), l) #åéä¿¡å·181 self.data.put(l) #å
¥é182 #æµè¯å¤±è´¥183 def Test_Fail(self,message):184 self.working = False185 self.error_count = 1 #é误计æ°186 self.Set_Color(self.Red)187 self.data.put(message) #æµè¯å¤±è´¥ä¿¡æ¯å
¥é188 self.error(message) #åéé误信æ¯189 self.emit(QtCore.SIGNAL('error'), "<font color=red><font size = 10>%s</font>" % u"Test FAIL\n")190 self.emit(QtCore.SIGNAL('stop'), self.working)191 thread.exit_thread() # ç»æ¢çº¿ç¨192 #è¾å
¥IPå°å193 def Input_IP(self,message):194 self.emit(QtCore.SIGNAL('input'),message)195 while self.isWait:196 time.sleep(1)197 self.isWait = True198 return self.Input_IP_address199 #è¾å
¥å°å200 def Ship_Out_Address_setting(self,message):201 self.emit(QtCore.SIGNAL('ship'),message)202 while self.isWait:203 time.sleep(1)204 self.isWait = True205 return self.Ship_Out_Address206 #æ示æ¶æ¯207 def Prompt(self,message):208 self.emit(QtCore.SIGNAL('Prompt'),message)209 while self.isWait:210 time.sleep(1)211 self.isWait=True212 #æµè¯å®æ213 def Test_Finished(self,message):214 self.working = False215 if self.error_count == 0:216 self.Test_Pass(message)217 self.emit(QtCore.SIGNAL('stop'),self.working)218 thread.exit_thread() # ç»æ¢çº¿ç¨219 #ç¨åºè¿è¡220 def Script_Start(self,PD,SN,Time,Host,Server_IP = None):221 self.working = True222 self.PD = PD223 self.SN = SN224 self.Host = Host225 self.Time = Time226 self.error_count = 0 #é误计æ°227 Ping=os.system("ping -n 5 %s"%Host)228 if Ping == 0:229 self.Test_Running(u"##### Login #####") #æµè¯å¼å§ä¿¡å·230 self.Connection(Host) #è¿æ¥æµè¯è®¾å¤231 os.system('netsh firewall set opmode disable') #åå°å
³éwindowsç³»ç»é²ç«å¢232 os.system("start /b iperf.exe -s -w1m&") #windowsç³»ç»åå°è¿è¡iperfï¼å¹¶æå®é项233 time.sleep(5)234 self.Install_Tools() #GNSå·¥å
·å®è£
235 while self.working == True:236 self.VersionCheck() #çæ¬æ£æµ237 self.MAC_check() #MACå°åæ£æµ238 self.ShowTemperature() #温度æ£æµ239 self.MemeryCheck() #å
åæ£æµ240 if Server_IP != "":241 self.clock_test(Server_IP) #æ¶éæ£æµ242 self.Discrete() #离æ£éæ£æµ243 if Server_IP != "":244 self.EthSpeedCheck(Server_IP) #ç½å£éçæ£æµ245 self.USBCheck() #USBæ£æµ246 self.SSDCheck() #硬çæ£æµ247 self.Clean_Caches() #æ¸
çç¼å248 os.system('netsh firewall set opmode mode=enable') #å¼å¯Windowsé²ç«å¢249 os.system("taskkill /f /t /im iperf.exe") #å
³éiperfè¿ç¨250 os.system("taskkill /f /t /im cmd.exe") #å
³édosçªå£251 self.Test_Finished(u"<font size = 10>LRU Test Completed PASS</font>")252 gc.collect() #åå¾åæ¶253 else:254 self.Test_Fail(u"IP ping failed")255 self.Prompt(u"IP ping failed")256 #GNSå·¥å
·å®è£
257 def Install_Tools(self):258 self.Send_Command("cd /root/software/gns_test/")259 self.Send_Command("chmod +x *")260 self.Send_Command("./install_sysstat.sh","#",60)261 time.sleep(3)262 self.Send_Command("\cp -f kcpu /sbin")263 self.Send_Command("\cp -f iozone /sbin")264 self.Send_Command("\cp -f nmon /sbin")265 self.Send_Command("\cp -f iperf /sbin")266 self.Send_Command("\cp -f cpulimit /sbin")267 self.Send_Command("\cp -f run_kcpu.sh /sbin")268 self.Send_Command("cd 429")269 self.Send_Command("\cp -f * /sbin")270 self.Send_Command('cd /')271 #æ¸
çç¼å272 def Clean_Caches(self):273 self.Send_Command("echo 3 >> /proc/sys/vm/drop_caches&")274 time.sleep(3)275 self.Send_Command("free -m")276 self.Send_Command('cd /')277 #MACå°åæ£æµ278 def MAC_check(self):279 self.Test_Running(u"### MAC Check ###")280 MAC = self.Send_Command("ifconfig | grep eth0")281 if "5C:E0:CA" in MAC:282 self.Test_Pass(u"MAC PASS")283 else:284 self.Test_Fail(u"MAC FAIL")285 #æ¶éæ£æµ286 def clock_test(self, Server_IP):287 self.Test_Running(U"##### NTP Check #####")288 os.system(289 'reg add HKEY_LOCAL_MACHINE\SYSTEM\CurrentControlSet\Services\W32Time\TimeProviders\NtpServer /v Enabled /t REG_DWORD /d 1 /f') # ä¿®æ¹æ³¨å表290 os.system(291 'reg add HKEY_LOCAL_MACHINE\SYSTEM\CurrentControlSet\Services\W32Time\Config /v AnnounceFlags /t REG_DWORD /d 5 /f') # ä¿®æ¹æ³¨å表292 os.system("net stop w32time & net start w32time") # åå°åæ¢åå¯å¨windowsç³»ç»NTPæå¡293 self.Send_Command("service ntpd stop")294 self.Send_Command("ntpdate %s &" % Server_IP, "Done", 15)295 time.sleep(5)296 date = self.Send_Command("date") #è·å设å¤æ¶é´297 clock = str(datetime.datetime.now().year) #è·åæ¬å°æ¶é´298 if clock in date:299 self.Test_Pass(u'NTP PASS')300 else:301 self.Test_Fail(u"NTP FAIL")302 #ç½å£éç303 def EthSpeedCheck(self, Server_IP):304 self.Test_Running(u" Ethernet Front-end Ports Rate Check ")305 Ping_message = self.Send_Command("ping %s -c5" % Server_IP)306 if "Host Unreachable" in Ping_message:307 self.Test_Fail(u"Ping Server Fail")308 else:309 time.sleep(5)310 for count in range (1,6):311 time.sleep(3)312 Eth_Speed = self.Send_Command("iperf -c "+Server_IP+" -w1m -i1 -t30 | grep '0.0-3'&","Done",40)313 if "Broken pipe" not in Eth_Speed:314 result= self.GetMiddleStr(str(Eth_Speed),'Bytes',"Mbits")315 if float(result) >500: #以太ç½æ¥å£éç大äº500å
316 if count < 5 :317 info = u'Ethernet port '+ str(count) + u' rateï¼' + str(result) + u'Mbits/sec PASS,please connect to ETH' + str(count+1)318 else :319 info = u'Ethernet port 5 rateï¼%sMbits/sec PASS'% str(result)320 self.Test_Pass(info)321 self.Prompt(info)322 else:323 info = u'Ethernet port rate ' + str(result) +u'Mbits/sec,FAIL.'324 self.Test_Fail(info)325 else:326 self.Test_Fail(u'Iperf address access failed')327 #温度æ£æµ328 def ShowTemperature(self):329 self.Test_Running(u"##### Temperature Check #####")330 temp = self.Send_Command("sensors -u&", "Done", 10)331 Core0 = self.GetMiddleStr(str(temp), 'temp2_input:', "temp2_max:")332 if float(Core0) > 96:333 self.Test_Fail(u"CPU Core 0 Temperature:%sâ,More than 96â,FAIL" % Core0)334 else:335 self.Test_Pass(u"CPU Core 0 Temperature:%sâ,PASS" % Core0)336 Core1 = self.GetMiddleStr(str(temp), 'temp3_input:', "temp3_max:")337 if float(Core1) > 96:338 self.Test_Fail(u"CPU Core 1 Temperature:%sâ,More than 96â,FAIL" % Core1)339 else:340 self.Test_Pass(u"CPU Core 1 Temperature:%sâ,PASS" % Core1)341 #å
åæ£æµ342 def MemeryCheck(self):343 self.Test_Running(u"##### Memory Check #####")344 self.Send_Command("iostat -m")345 mem = self.Send_Command("free | grep Mem | awk '{print $2}'", "#", 30)346 mem = self.GetMiddleStr(str(mem), "}'", "[").strip()347 if float(mem) < 8126000:348 self.Test_Fail(u"Memory ï¼ 8G,FAIL")349 else:350 self.Test_Pass(u"Memory:%s,PASS" % mem)351 #çæ¬æ£æµ352 def VersionCheck(self):353 self.Test_Running(u"##### Version Check #####")354 vershow = self.Send_Command("vershow&", "Done", 60,5)355 underlying_version=self.GetMiddleStr(vershow,'underlying version:',"Config").strip()356 if underlying_version == self.GNS_Underlying:357 self.Test_Pass(u"Software Versionï¼%s,PASS" % underlying_version)358 else:359 self.Test_Fail(u"Software Versionï¼%s,FAIL" % underlying_version)360 fpga = self.Send_Command('fpga_version')361 FPGA=self.GetMiddleStr(fpga,'version:',"[").strip()362 if FPGA == self.GNS_FPGA:363 self.Test_Pass(u"FPGAï¼%s,PASS" % FPGA)364 else:365 self.Test_Fail(u"FPGAï¼%s,FAIL" % FPGA)366 swv = self.Send_Command('swv_read')367 swv = self.GetMiddleStr(swv, "version :", "[").strip()368 if swv == self.GNS_SWV:369 self.Test_Pass(u"SWVï¼%s,PASS" % swv)370 else:371 self.Test_Fail(u"SWVï¼%s,FAIL" % swv)372 mod = self.Send_Command('mod_read')373 mod = self.GetMiddleStr(mod, "Mod number:", "[").strip()374 if mod == self.GNS_MOD:375 self.Test_Pass(u"MODï¼%s,PASS" % mod)376 else:377 self.Test_Fail(u"MODï¼%s,FAIL" % mod)378 spn = self.Send_Command('spn_read')379 spn = self.GetMiddleStr(spn, "part number:", "[").strip()380 if spn == self.GNS_SPN:381 self.Test_Pass(u"SPNï¼%s,PASS" % spn)382 else:383 self.Test_Fail(u"SPNï¼%s,FAIL" % spn)384 dpn = self.Send_Command('dpn_read')385 dpn = self.GetMiddleStr(dpn, "PN_VERSION:", "[").strip()386 if dpn == self.GNS_DPN:387 self.Test_Pass(u"DPNï¼%s,PASS" % dpn)388 else:389 self.Test_Fail(u"DPNï¼%s,FAIL" % dpn)390 bpn = self.Send_Command('board_pn_read')391 bpn = self.GetMiddleStr(bpn, "Board_Product_Number:", 
"[").strip()392 if bpn == self.GNS_BPN:393 self.Test_Pass(u"BPNï¼%s,PASS" % bpn)394 else:395 self.Test_Fail(u"BPNï¼%s,FAIL" % bpn)396 fpgapn = self.Send_Command('fpgapn_read')397 fpgapn = self.GetMiddleStr(fpgapn, "fpga part number:", "[").strip()398 if fpgapn == self.GNS_FPGAPN:399 self.Test_Pass(u"FPGA PNï¼%s,PASS" % fpgapn)400 else:401 self.Test_Fail(u"FPGA PNï¼%s,FAIL" % fpgapn)402 isopn = self.Send_Command('isopn_read')403 isopn = self.GetMiddleStr(isopn, "iso part number:", "[").strip()404 if isopn == self.GNS_ISOPN:405 self.Test_Pass(u"ISO PNï¼%s,PASS" % isopn)406 else:407 self.Test_Fail(u"ISO PNï¼%s,FAIL" % isopn)408 if "3.10.5-3.el6.x86_64" in self.Send_Command('uname -a'):409 self.Test_Pass(u"System Versionï¼3.10.5-3.el6.x86_64,PASS")410 else:411 self.Test_Fail(u"System Versionï¼3.10.5-3.el6.x86_64,FAIL")412 dsn = self.Send_Command('dsn_read')413 bsn = self.Send_Command('board_sn_read')414 if "failed" in vershow:415 self.Test_Fail(u"Version Upgrade FAIL")416 #USB模åæ£æµ417 def USBCheck(self):418 self.Test_Running(u"##### USB Check #####")419 self.Prompt(u"Please insert a USB flash drive in FAT32 format") #æ¶æ¯æ示420 self.Send_Command('mkdir usb','#',10)421 usb_drive=self.Send_Command("fdisk -l | grep FAT32 | awk '{print $1}'")422 drive= self.GetMiddleStr(str(usb_drive),"}'","[").strip()423 if '/dev/sd' in drive:424 self.Test_Pass('U Disk PASS')425 self.Send_Command('mount %s usb' % drive)426 self.Send_Command('cd usb' )427 usb_wr = self.Send_Command('dd bs=1M count=100 if=%s of=test conv=fsync'%drive,'#',30)428 usb_wr_A= self.GetMiddleStr(str(usb_wr),'s,',"[")429 self.Send_Command('rm -rf test')430 self.Send_Command('cd ..' 
)431 self.Send_Command('umount usb')432 self.Send_Command('rm -rf usb')433 self.Test_Pass(u"USB W+R Rate: %s"%str(usb_wr_A) )434 else:435 self.Test_Fail(u'U Disk FAIL' )436 #硬çæ£æµ437 def SSDCheck(self):438 self.Test_Running(u"### SSD Check ###")439 if 'SATA' in self.Send_Command('lspci'):440 self.Test_Pass(u'SSD Module Recognition PASS')441 else:442 self.Test_Fail(u"SSD Module Recognition FAIL")443 num1 = self.Send_Command("fdisk -l | grep -c sda")444 num2 = self.Send_Command("df | grep -c sda")445 if "7" in num1 and "4" in num2:446 GNS_DISK = self.Send_Command('fdisk -l | grep sda')447 sda_size = self.GetMiddleStr(GNS_DISK, '/dev/sda:', "GB")448 if float(sda_size) > 950:449 self.Test_Pass(u"SSD-1 size=%sGB PASS" % sda_size)450 else:451 self.Test_Fail(u"SSD-1 sizeï¼950GB FAIL")452 GNS_DISK = self.Send_Command('fdisk -l | grep sdb')453 if '/dev/sdb' in GNS_DISK:454 sdb_size = self.GetMiddleStr(str(GNS_DISK), 'Disk /dev/sdb: ', "GB,")455 if float(sdb_size) > 950:456 self.Test_Pass(u"SSD-2 size=%sGB PASS" % sdb_size)457 else:458 self.Test_Fail(u"SSD-2 sizeï¼950GB FAIL")459 else:460 self.Test_Fail(u"SSD-2 Recognition FAIL ")461 HD_write = self.Send_Command('dd bs=16M count=1024 if=/dev/zero of=test conv=fdatasync', '#', 300,10)462 HD_write_A = self.GetMiddleStr(str(HD_write), 's,', "MB/s")463 HD_read = self.Send_Command('dd bs=16M if=test of=/dev/null', '#', 300,10)464 HD_read_A = self.GetMiddleStr(str(HD_read), 's,', "MB/s")465 self.Send_Command("rm -rf test", "#", 30)466 if float(HD_write_A) < 150:467 self.Test_Fail(u"SSD Write Rate < 150MB/s FAIL")468 elif float(HD_read_A) < 170:469 self.Test_Fail(u"SSD Read Rate < 170MB/s FAIL")470 else:471 self.Test_Pass(u"SSD Read Rateï¼%sMB/s PASS" % str(HD_read_A))472 self.Test_Pass(u"SSD Write Rateï¼%sMB/s PASS" % str(HD_write_A))473 else:474 self.Test_Fail(u"SSD Partition FAIL")475 #离æ£éæ£æµ476 def Discrete(self):477 self.Test_Running(u"#### Discrete Check ####")478 self.Send_Command("arinc set_control_off")479 for i in 
range(1,17):480 if i<15:481 self.Send_Command("hi8435_cfg wrDiscOut %s low" % str(i))482 else:483 self.Send_Command("hi8435_cfg wrDiscOut %s low" % str(i))484 low = self.Send_Command("arinc get_signalstatusmatrix", '#', 5)485 low=self.GetMiddleStr(low,'get_signalstatusmatrix','[')486 low = int(low[3] + low[7] + low[11] + low[15] + low[19] + low[23] + low[27] + low[31] + low[35] + low[39] + low[44] +low[49] + low[54])487 for i in range(1, 17):488 if i < 15:489 self.Send_Command("hi8435_cfg wrDiscOut %s high" % str(i))490 else:491 self.Send_Command("hi8435_cfg wrDiscOut %s high" % str(i))492 high = self.Send_Command("arinc get_signalstatusmatrix", '#', 5)493 high = self.GetMiddleStr(high,'get_signalstatusmatrix','[')494 high = int(high[3] + high[7] + high[11] + high[15] + high[19] + high[23] + high[27] + high[31] + high[35] + high[39] +high[44] + high[49] + high[54])495 log1=low+high496 for i in range(1, 17):497 if i < 15:498 self.Send_Command("hi8435_cfg wrDiscOut %s low" % str(i))499 else:500 self.Send_Command("hi8435_cfg wrDiscOut %s low" % str(i))501 low = self.Send_Command("arinc get_signalstatusmatrix", '#', 5)502 low = self.GetMiddleStr(low, 'get_signalstatusmatrix', '[')503 low = int(low[3] + low[7] + low[11] + low[15] + low[19] + low[23] + low[27] + low[31] + low[35] + low[39] + low[44] +low[49] + low[54])504 log2=high+low505 if log1==1111111111111 and log2==1111111111111:506 self.Test_Pass('Discrete Check PASS')507 else:508 self.Test_Fail('Discrete Check FAIL')509 #429æ£æµ510 self.Test_Running(u"##### ARINC429 Check #####")511 self.Send_Command("hi3593_c0_cfg setRx1BitRate high\r")512 self.Send_Command("hi3593_c0_cfg setRx2BitRate high\r")513 self.Send_Command("hi3593_c0_cfg setTxBitRate high\r")514 self.Send_Command("hi3593_c1_cfg setRx1BitRate high\r")515 self.Send_Command("hi3593_c1_cfg setRx2BitRate high\r")516 self.Send_Command("hi3593_c1_cfg setTxBitRate high\r")517 if int(self.GetMiddleStr(self.Send_Command("ps | grep -c netlink_u_self.APP"), 
"self.APP", "[")) < 1:518 self.Send_Command("netlink_u_app &\r", '#', 3)519 c = self.Send_Command("hi429_sendmsg_user_chip0 123 3\r")520 if "0x42910001" in c:521 self.Test_Pass(u"ARINC429 CHIP0 PASS")522 #if "0x42910002" not in c:523 #self.Test_Fail(u"429 CHIP0 RX2 æµè¯å¤±è´¥")524 else:525 self.Test_Fail(u"ARINC429 CHIP0 FAIL")526 d = self.Send_Command("hi429_sendmsg_user_chip1 456 3\r")527 if "0x42920001" in d:528 self.Test_Pass(u"ARINC429 CHIP1 PASS")529 #if "0x42920002" not in d:530 #self.Test_Fail(u"429 CHIP1 RX4 æµè¯å¤±è´¥")531 else:532 self.Test_Fail(u"ARINC429 CHIP1 FAIL")533 #å级534 def upgrade(self,PD,Host,Boot,Kernel,APP,Config,MAC,Server_IP):535 self.working = True536 self.starttime = datetime.datetime.now()537 self.PD = PD538 self.Host = Host539 self.Boot = Boot540 self.Kernel = Kernel541 self.APP = APP542 self.Config = Config543 self.MAC = MAC544 self.error_count = 0545 self.Connection(Host)546 time.sleep(5)547 PD_message = self.Send_Command("cat /sbin/PRODUCT_MESSAGE ","#",5 )548 SN = self.GetMiddleStr(str(PD_message),'SN_VERSION:',"PN_VERSION:").strip()549 PN = self.GetMiddleStr(str(PD_message),"PN_VERSION:",'CPLD').strip()550 if self.Boot == '' and self.Kernel == "" and self.APP == "" and self.Config == "":551 if "failed" not in self.Send_Command("vershow",'#',15):552 if len(self.MAC) == 17:553 self.Test_Running(u"program PD information")554 self.Send_Command("pm_flash " + str(self.PD).lower() +" "+ PN + " "+ self.MAC + " "+ SN,"complete",20 )555 self.Send_Command("configsave","Config save success",10)556 self.Send_Command("\r")557 else:558 self.Test_Fail(u'请å
å级')559 else:560 while self.working :561 if self.Boot != '':562 self.Test_Running(u"upgrade Boot")563 if "timeout" in self.Send_Command("tftp -gr "+ self.Boot + " "+ Server_IP,"#",10):564 self.Test_Fail(u"æ件ä¸ä¼ 失败")565 if 'failed ' in self.Send_Command("bootupdate "+ self.Boot,"#",30):566 self.Test_Fail(u'Bootæ件ä¸å¹é
')567 self.Send_Command("\r")568 if self.Kernel != "":569 self.Test_Running(u"upgrade Kernel")570 self.Send_Command("tftp -gr " + self.Kernel + " "+ Server_IP,"#",30 )571 self.Send_Command("kernelupdate " + self.Kernel,"system for testing",150)572 self.Send_Command("\r")573 if self.APP != "":574 self.Test_Running(u"upgrade APP")575 self.Send_Command("rm /mnt/mmcfs/fsself.APP*")576 self.Send_Command("ls -l /mnt/mmcfs","#",5)577 self.Send_Command("tftp -gr " + self.APP + " "+ Server_IP,"#",30)578 self.Send_Command("fsappsave %s &"%self.APP,"Done",600)579 self.Send_Command("\r")580 if self.Config != "":581 self.Test_Running(u"upgrade Configuration")582 self.Send_Command("rm /mnt/mmcfs/fscon*")583 self.Send_Command("ls -l /mnt/mmcfs","#",5)584 self.Send_Command("tftp -gr " + self.Config + " " +Server_IP,"#",10 )585 if self.PD == 'CWAP' or self.PD == 'TWCU':586 self.Send_Command("fsconsave 0 " + self.Config,"save succes",30)587 else:588 self.Send_Command("fsconsave 10 " + self.Config,"save succes",30)589 self.Send_Command("\r")590 self.Send_Command("reboot\r","reboot")591 self.Connect.Close()592 self.Test_Running(u"rebooting")593 time.sleep(100)594 if self.PD == 'RWS22':595 Host_IP = "10.66.10.1"596 self.Connection(str(Host_IP))597 if len(self.MAC) == 17:598 self.Test_Running(u"program PD information")599 self.Send_Command("pm_flash " + str(self.PD).lower() +" "+ PN + " "+ self.MAC + " "+ SN,"complete",20 )600 self.Send_Command("configsave","save success",10)601 self.Send_Command("\r")602 if self.Boot != '':603 self.Test_Running(u"upgrade sBoot2")604 self.Send_Command("tftp -gr "+ self.Boot + " "+ Server_IP,"#",10)605 self.Send_Command("bootupdate "+ self.Boot,"#",30)606 self.Send_Command("\r")607 if self.Kernel != "":608 self.Test_Running(u"upgrade Kernel 2")609 self.Send_Command("kernelconfirm")610 self.Send_Command("tftp -gr " + self.Kernel + " "+ Server_IP ,"#",30)611 self.Send_Command("\r")612 self.Send_Command("kernelupdate " + self.Kernel,"system for 
testing",150)613 self.Send_Command("\r")614 if self.APP != "":615 self.Test_Running(u"upgrade APP 2")616 self.Send_Command("tftp -gr " + self.APP + " "+ Server_IP,"#",30 )617 self.Send_Command("fsappsave %s &"%self.APP,"Done",600)618 self.Send_Command("\r")619 if self.Config != "":620 self.Test_Running(u"upgrade Configuration 2")621 self.Send_Command("tftp -gr " + self.Config + " " +Server_IP,"#",10)622 if self.PD == 'CWAP' or self.PD == 'TWCU':623 self.Send_Command("fsconsave 0 " + self.Config,"save succes",30)624 else:625 self.Send_Command("fsconsave 10 " + self.Config,"save succes",30)626 self.Send_Command("\r")627 self.Send_Command("reboot\r","reboot")628 self.Connect.Close()629 self.Test_Running(u"rebooting")630 time.sleep(100)631 self.Connection(str(Host_IP))632 if self.Kernel != "":633 self.Send_Command("kernelconfirm")634 self.Test_Running(u"checking version")635 if "failed" in self.Send_Command("vershow",'#',15):636 self.Test_Fail(u'çæ¬é误,å级失败')637 else:638 self.Test_Finished(u"å级æå")639 #å级config640 def Ship_Out(self,PD,Host,Config,Server_IP):641 self.PD = PD642 self.Connection(Host) #è¿æ¥ä¸»æº643 time.sleep(5)644 if Config != "":645 self.Test_Running(u"upgrade configuration")646 self.Send_Command("rm /mnt/mmcfs/fscon*")647 self.Send_Command("ls -l /mnt/mmcfs","#",5)648 self.Send_Command("tftp -gr " + Config + " " +Server_IP,"#",5 )649 self.Ship_Out_Address_setting(U"请è¾å
¥éè¦è®¾ç½®çäºãä¸æ®µå°å")650 self.Send_Command("fsconsave " + self.Ship_Out_Address + " " + Config,"save succes",30)651 self.Send_Command("\r")652 self.Send_Command("reboot\r","reboot")653 self.Connect.Close()654 self.Test_Running(u"rebooting")655 time.sleep(120)656 Ship_IP = "10."+self.Ship_Out_Address+".1"657 self.Connection(str(Ship_IP))658 self.Test_Running(u"upgrade configuration 2")659 self.Send_Command("tftp -gr " + Config + " " +Server_IP,"#",5)660 self.Send_Command("fsconsave " + self.Ship_Out_Address + " " + Config,"save succes",30)661 self.Send_Command("\r")662 self.Send_Command("reboot\r","reboot")663 self.Connect.Close()664 self.Test_Running(u"rebooting")665 time.sleep(100)666 self.Connection(str(Ship_IP))667 self.Test_Running(u"checking version")668 if "failed" in self.Send_Command("vershow",'#',15):669 self.Test_Fail(u'çæ¬é误,å级失败')670 else:671 self.Test_Finished(u"åºåé
ç½®å®æ")672 #ä¿®æ¹IPå°å673 def Modify_IP(self,PD,Host):674 self.PD = PD675 self.Connection(Host)676 time.sleep(5)677 self.Test_Running(u"Modify IP address:")678 self.Send_Command("\r")679 IP_Modify = self.Input_IP(U"请è¾å
¥ä½ æ³æ¹æç设å¤IPå°å:")680 PCB_MAC = self.Send_Command("cat sbin/PRODUCT_MESSAGE | grep MacAddr | awk '{print $1}'")681 FT600_MAC = str(PCB_MAC[-13:-8])682 self.Send_Command("sed -i 's/" + Host + "/" + IP_Modify + "/g' /etc/config/Ethernet")683 self.Send_Command("sed -i 's/255.255.255.0/255.0.0.0/g' /etc/config/Ethernet")684 self.Send_Command("cat /etc/config/Ethernet")685 self.Send_Command("configsave", "config save success", 30)686 self.Send_Command("reboot","reboot")687 if PD == 'RWLU-1U':688 self.Connect.Close()689 FT600_IP = str(Host[:-3] + str(int(Host[-3:]) + 100))690 FT600_Modify_IP = IP_Modify[:-3] + str(int(IP_Modify[-3:]) + 100)691 try:692 self.Connection(str(FT600_IP))693 except Exception, e:694 self.Test_Fail(e)695 self.FT600_Login()696 self.Send_Command("sed -i 's/11:12/" + str(FT600_MAC).upper() + "/g' /sbin/PRODUCT_MESSAGE ")697 self.Send_Command("rm /etc/config/wireless")698 self.Send_Command("sed -i '/br-lan/'d /etc/rc.d/rcS")699 self.Send_Command("sed -i '/net.sh start/a\ifconfig br-lan " + FT600_Modify_IP + "' /etc/rc.d/rcS ")700 self.Send_Command("sed -n '/br-lan/p' /etc/rc.d/rcS")701 self.Send_Command("reboot","reboot")702 time.sleep(2)703 self.Test_Finished(u"IPä¿®æ¹æå")704 #ç£çæ ¼å¼å705 def Format_Disk(self,PD, Host):706 self.PD = PD707 self.Connection(Host)708 time.sleep(5)709 self.Test_Running(u"Format DISK")710 if '5' not in self.Send_Command('fdisk -l | grep -c "/dev/sda"'):711 self.Send_Command("fdisk /dev/sda","Command (m for help):")712 self.Send_Command("n",":")713 self.Send_Command("p","Partition number (1-4):")714 self.Send_Command("1","):")715 self.Send_Command("","):")716 self.Send_Command("+900M","Command (m for help):")717 self.Send_Command("n",":")718 self.Send_Command("p","Partition number (1-4):")719 self.Send_Command("2","):")720 self.Send_Command("","):")721 self.Send_Command("+100M","Command (m for help):")722 self.Send_Command("n",":")723 self.Send_Command("p","Partition number (1-4):")724 self.Send_Command("3","):")725 
self.Send_Command("","):")726 self.Send_Command("+2048M","Command (m for help):")727 self.Send_Command("n",":")728 self.Send_Command("p","Partition number (1-4):")729 self.Send_Command("","):")730 self.Send_Command("","Command (m for help):")731 self.Send_Command("t","Partition number (1-4):")732 self.Send_Command("3","Hex code (type L to list codes):")733 self.Send_Command("82","Command (m for help):")734 self.Send_Command("p","Command (m for help):")735 self.Send_Command("w"," #")736 self.Send_Command("fdisk -l"," #")737 self.Send_Command("mkfs.ext3 /dev/sda1"," #")738 self.Send_Command("mkfs.ext3 /dev/sda2"," #")739 self.Send_Command("mkfs.ext3 /dev/sda3"," #")740 self.Send_Command("mkfs.ext3 /dev/sda4"," #")741 self.Send_Command("mount /dev/sda4 /mnt/mmc0")742 self.Send_Command("mount /dev/sda1 /mnt/mmc1")743 self.Send_Command("df -h")...
igusd1_labview_edition.py
Source:igusd1_labview_edition.py
# ... (excerpt begins inside a function whose header is not shown)
#     try:
#         SOCK.connect((ip_address, port))
#     except:
#         pass


def send_command(data):
    """
    Send a command to the device and return its reply.

    Parameters
    ----------
    data : bytearray
        Bytearray to send to the device.

    Returns
    -------
    bytes
        Returned data as a bytes object (at most 24 bytes are read).
    """
    # sendall() keeps writing until the whole frame has been handed to the
    # socket; a bare send() is allowed to transmit only part of the buffer.
    SOCK.sendall(data)
    return SOCK.recv(24)


def _status_frames(low_byte):
    """Build the three status replies (high byte 6, 22 or 2) accepted for `low_byte`."""
    return tuple(
        make_bytearray(0, [96, 65], 0, 2, [low_byte, high_byte])
        for high_byte in (6, 22, 2)
    )


def _wait_for_status(accepted, message=None, poll_interval=1):
    """
    Poll the device status until the reply equals one of `accepted`.

    The status is requested exactly once per iteration, so every comparison
    is made against the same reply.

    Parameters
    ----------
    accepted : tuple
        Reply frames that end the wait.
    message : str, optional
        Text printed after each unsuccessful poll.
    poll_interval : float, optional
        Seconds to sleep between polls.

    Returns
    -------
    bytes
        The reply that matched.
    """
    while True:
        reply = send_command(get_array("status"))
        if reply in accepted:
            return reply
        if message is not None:
            print(message)
        time.sleep(poll_interval)


def set_shutdown():
    """
    Tell the device to shutdown (set status to 00000000 00000110).

    Blocks until the device reports one of the expected status replies.

    Returns
    -------
    None.
    """
    send_command(get_array("shutdown"))
    _wait_for_status(_status_frames(33), 'Waiting for shutdown...')


def set_switch_on():
    """
    Tell the device to switch on (set status to 00000000 00000111).

    Blocks until the device reports one of the expected status replies.

    Returns
    -------
    None.
    """
    send_command(get_array("switch_on"))
    _wait_for_status(_status_frames(35), 'Waiting for switch-on...')


def set_enable_operation():
    """
    Tell the device to enable operation (set status to 00000000 00001111).

    Blocks until the device reports one of the expected status replies.

    Returns
    -------
    None.
    """
    send_command(get_array("enable_operation"))
    _wait_for_status(_status_frames(39), 'Waiting for enabling operation...')


def init():
    """
    Attempt to enable the device (only works if ???)

    Runs the shutdown -> switch-on -> enable-operation sequence in order.

    Returns
    -------
    None.
    """
    set_shutdown()
    set_switch_on()
    set_enable_operation()


def set_feedrate(feedrate):
    """
    Set the feedrate to specified value.

    Parameters
    ----------
    feedrate : int
        Feedrate in steps per revolution.

    Returns
    -------
    None.
    """
    # NOTE: only the two low-order bytes are transmitted, so values above
    # 65535 are silently truncated (unchanged from the original behaviour).
    feedrate_bytes = feedrate.to_bytes(4, "little")
    send_command(make_bytearray(1, [96, 146], 1, 2, list(feedrate_bytes[:2])))
    send_command(make_bytearray(1, [96, 146], 2, 1, [1]))


def set_mode(mode):
    """
    Set the movement mode of the device.

    Blocks until the device echoes the requested mode back.

    Parameters
    ----------
    mode : int
        Selected mode (1 [move] / 6 [home]).

    Returns
    -------
    None.
    """
    send_command(make_bytearray(1, [96, 96], 0, 1, [mode]))
    # Both frames are loop-invariant, so build them once.
    mode_query = make_bytearray(0, [96, 97], 0, 1)
    mode_reply = make_bytearray(0, [96, 97], 0, 1, [mode])
    while send_command(mode_query) != mode_reply:
        time.sleep(1)


def set_homing(method, find_velocity, zero_velocity, acceleration):
    """
    Find reference point (home) [method doesn't work yet I guess?].

    Parameters
    ----------
    method : str
        Homing method ("LSN" [limit switch negative] / ???).
    find_velocity : int
        Velocity when searching for switch (upper limit: 50000).
    zero_velocity : int
        Velocity when zeroing away from switch after pressing it.
    acceleration : int
        Acceleration/deceleration when starting/stopping move.

    Returns
    -------
    None.

    Raises
    ------
    KeyError
        If `method` is not one of the known method names.
    """
    methods = {
        "LSN": 17,
        "LSP": 18,
        "IEN": 33,
        "IEP": 34,
        "SCP": 37,
        "AAF": 255,
    }
    # Resolve the method before talking to the device, so that an unknown
    # name fails without having switched the drive into homing mode.
    selected_method = methods[method]
    set_mode(6)
    # homing method
    send_command(make_bytearray(1, [96, 152], 1, 1, [selected_method]))
    set_feedrate(6000)
    # homing velocity: max search velocity (two low-order bytes only)
    find_velocity_bytes = find_velocity.to_bytes(4, "little")
    send_command(make_bytearray(1, [96, 153], 1, 2,
                                list(find_velocity_bytes[:2])))
    # zeroing velocity: velocity after contact (two low-order bytes only)
    zero_velocity_bytes = zero_velocity.to_bytes(4, "little")
    send_command(make_bytearray(1, [96, 153], 2, 2,
                                list(zero_velocity_bytes[:2])))
    # homing acceleration (two low-order bytes only)
    acceleration_bytes = acceleration.to_bytes(4, "little")
    send_command(make_bytearray(1, [96, 154], 0, 2,
                                list(acceleration_bytes[:2])))
    # start movement
    print(list(send_command(make_bytearray(1, [96, 64], 0, 2, [31, 0]))))
    # reset start bit
    send_command(make_bytearray(1, [96, 64], 0, 2, [15, 0]))
    homing_done = make_bytearray(0, [96, 65], 0, 2, [39, 22])
    while True:
        reply = send_command(get_array("status"))
        if reply == homing_done:
            break
        print("wait for Homing to end")
        # Show the reply that was actually compared instead of issuing a
        # second status request just for printing.
        print(list(reply))
        time.sleep(1)
    send_command(get_array("enable_operation"))


def move(velocity,
         acceleration,
         target_position):
    """
    Move the sled to an arbitrary position within the limits of the rod.

    Blocks until the device reports the move as finished, then prints the
    [position, velocity] status.

    Parameters
    ----------
    velocity : int
        Velocity when moving.
    acceleration : int
        Acceleration/deceleration when starting/stopping move.
    target_position : int
        Target position in steps. Negative values are sent as 32-bit
        two's complement.

    Returns
    -------
    None.
    """
    set_mode(1)
    send_command(make_bytearray(1, [96, 129], 0, 4,
                                list(velocity.to_bytes(4, "little"))))
    send_command(make_bytearray(1, [96, 131], 0, 4,
                                list(acceleration.to_bytes(4, "little"))))
    # The position is read back as a signed 32-bit value in get_status(), so
    # negative targets are encoded as signed; non-negative targets keep the
    # original unsigned encoding.
    target_position_bytes = target_position.to_bytes(
        4, "little", signed=target_position < 0)
    send_command(make_bytearray(1, [96, 122], 0, 4,
                                list(target_position_bytes)))
    # start movement, then reset the start bit
    print(list(send_command(make_bytearray(1, [96, 64], 0, 2, [31, 0]))))
    send_command(make_bytearray(1, [96, 64], 0, 2, [15, 0]))
    _wait_for_status((make_bytearray(0, [96, 65], 0, 2, [39, 6]),),
                     poll_interval=0.1)
    print(get_status())
    send_command(get_array("enable_operation"))


def staggered_move(velocity,
                   acceleration,
                   start_position,
                   iterations,
                   step_width,
                   wait_time,
                   go_back
                   ):
    """
    Move the sled to a start position, then move a specified amount of times
    by a specified distance and wait for a specified interval.

    Parameters
    ----------
    velocity : int
        Velocity when moving.
    acceleration : int
        Acceleration/deceleration when starting/stopping move.
    start_position : int
        Initial position to move to.
    iterations : int
        Amount of times to increment the position by step_width.
    step_width : int
        Distance between iterations.
    wait_time : float
        Wait time between moves.
    go_back : bool
        Go back to start_position at the end? (True [yes] / False [no]).

    Returns
    -------
    None.
    """
    # move() selects the mode and writes velocity and acceleration itself on
    # every call, so nothing has to be configured here beforehand.
    move(velocity, acceleration, start_position)
    for step in range(1, iterations + 1):
        move(velocity, acceleration, start_position + step_width * step)
        time.sleep(wait_time)
    if go_back is True:
        move(velocity, acceleration, start_position)


def get_status():
    """
    Receive current position and velocity.

    Returns
    -------
    status : list[2]
        Receive status list [current position, current velocity].
    """
    # The format skips the leading bytes of the reply and decodes the
    # trailing little-endian signed 32-bit integer.
    actual_position_bytes = send_command(make_bytearray(0, [96, 100], 0, 4))
    actual_velocity_bytes = send_command(make_bytearray(0, [96, 108], 0, 4))
    return [
        struct.unpack("<xxxxxxxxxxxxxxxxxxxi", actual_position_bytes)[0],
        struct.unpack("<xxxxxxxxxxxxxxxxxxxi", actual_velocity_bytes)[0],
    ]


def close():
    """
    Closes the socket.

    Returns
    -------
    None.
    """
    ...  # excerpt is truncated here; the body is not part of this snippet
epd2in13_V2.py
Source:epd2in13_V2.py
...76 epdconfig.digital_write(self.reset_pin, 0)77 epdconfig.delay_ms(100)78 epdconfig.digital_write(self.reset_pin, 1)79 epdconfig.delay_ms(100) 80 def send_command(self, command):81 epdconfig.digital_write(self.cs_pin, 1)82 epdconfig.digital_write(self.cs_pin, 0)83 epdconfig.digital_write(self.dc_pin, 0)84 epdconfig.spi_writebyte([command])85 epdconfig.digital_write(self.cs_pin, 1)86 def send_data(self, data):87 88 epdconfig.digital_write(self.cs_pin, 1)89 epdconfig.digital_write(self.cs_pin, 0)90 epdconfig.digital_write(self.dc_pin, 1)91 epdconfig.spi_writebyte([data])92 epdconfig.digital_write(self.cs_pin, 1)93 94 def ReadBusy(self):95 while(epdconfig.digital_read(self.busy_pin) == 1): # 0: idle, 1: busy96 pass97 # epdconfig.delay_ms(100) 98 def TurnOnDisplay(self):99 self.send_command(0x22)100 self.send_data(0xC7)101 self.send_command(0x20) 102 self.ReadBusy()103 104 def TurnOnDisplayPart(self):105 self.send_command(0x22)106 self.send_data(0x0c)107 self.send_command(0x20) 108 self.ReadBusy()109 110 def init(self, update):111 if (epdconfig.module_init() != 0):112 return -1113 # EPD hardware init start114 self.reset()115 if(update == self.FULL_UPDATE):116 self.ReadBusy()117 self.send_command(0x12) # soft reset118 self.ReadBusy()119 self.send_command(0x74) #set analog block control120 self.send_data(0x54)121 self.send_command(0x7E) #set digital block control122 self.send_data(0x3B)123 self.send_command(0x01) #Driver output control124 self.send_data(0xF9)125 self.send_data(0x00)126 self.send_data(0x00)127 self.send_command(0x11) #data entry mode128 self.send_data(0x01)129 self.send_command(0x44) #set Ram-X address start/end position130 self.send_data(0x00)131 self.send_data(0x0F) #0x0C-->(15+1)*8=128132 self.send_command(0x45) #set Ram-Y address start/end position133 self.send_data(0xF9) #0xF9-->(249+1)=250134 self.send_data(0x00)135 self.send_data(0x00)136 self.send_data(0x00)137 138 self.send_command(0x3C) #BorderWavefrom139 self.send_data(0x03)140 
self.send_command(0x2C) #VCOM Voltage141 self.send_data(0x50) #142 self.send_command(0x03)143 self.send_data(self.lut_full_update[100])144 self.send_command(0x04) #145 self.send_data(self.lut_full_update[101])146 self.send_data(self.lut_full_update[102])147 self.send_data(self.lut_full_update[103])148 self.send_command(0x3A) #Dummy Line149 self.send_data(self.lut_full_update[105])150 self.send_command(0x3B) #Gate time151 self.send_data(self.lut_full_update[106])152 self.send_command(0x32)153 for count in range(100):154 self.send_data(self.lut_full_update[count])155 self.send_command(0x4E) # set RAM x address count to 0156 self.send_data(0x00)157 self.send_command(0x4F) # set RAM y address count to 0X127158 self.send_data(0xF9)159 self.send_data(0x00)160 self.ReadBusy()161 else:162 # self.send_command(0x2C) #VCOM Voltage163 # self.send_data(0x26)164 self.ReadBusy()165 self.send_command(0x32)166 for count in range(100):167 self.send_data(self.lut_partial_update[count])168 self.send_command(0x37)169 self.send_data(0x00)170 self.send_data(0x00)171 self.send_data(0x00)172 self.send_data(0x00)173 self.send_data(0x00)174 self.send_data(0x40)175 self.send_data(0x00)176 177 self.send_command(0x22)178 self.send_data(0xC0)179 self.send_command(0x20)180 self.ReadBusy()181 # self.send_command(0x3C) #BorderWavefrom182 # self.send_data(0x01)183 return 0184 def getbuffer(self, image):185 if self.width%8 == 0:186 linewidth = int(self.width/8)187 else:188 linewidth = int(self.width/8) + 1189 190 buf = [0xFF] * (linewidth * self.height)191 image_monocolor = image.convert('1')192 imwidth, imheight = image_monocolor.size193 pixels = image_monocolor.load()194 195 if(imwidth == self.width and imheight == self.height):196 logging.debug("Vertical")197 for y in range(imheight):198 for x in range(imwidth): 199 if pixels[x, y] == 0:200 x = imwidth - x201 buf[int(x / 8) + y * linewidth] &= ~(0x80 >> (x % 8))202 elif(imwidth == self.height and imheight == self.width):203 
logging.debug("Horizontal")204 for y in range(imheight):205 for x in range(imwidth):206 newx = y207 newy = self.height - x - 1208 if pixels[x, y] == 0:209 newy = imwidth - newy - 1210 buf[int(newx / 8) + newy*linewidth] &= ~(0x80 >> (y % 8))211 return buf 212 213 214 def display(self, image):215 if self.width%8 == 0:216 linewidth = int(self.width/8)217 else:218 linewidth = int(self.width/8) + 1219 self.send_command(0x24)220 for j in range(0, self.height):221 for i in range(0, linewidth):222 self.send_data(image[i + j * linewidth]) 223 self.TurnOnDisplay()224 225 def displayPartial(self, image):226 if self.width%8 == 0:227 linewidth = int(self.width/8)228 else:229 linewidth = int(self.width/8) + 1230 self.send_command(0x24)231 for j in range(0, self.height):232 for i in range(0, linewidth):233 self.send_data(image[i + j * linewidth]) 234 235 236 # self.send_command(0x26)237 # for j in range(0, self.height):238 # for i in range(0, linewidth):239 # self.send_data(~image[i + j * linewidth]) 240 self.TurnOnDisplayPart()241 def displayPartBaseImage(self, image):242 if self.width%8 == 0:243 linewidth = int(self.width/8)244 else:245 linewidth = int(self.width/8) + 1246 self.send_command(0x24)247 for j in range(0, self.height):248 for i in range(0, linewidth):249 self.send_data(image[i + j * linewidth]) 250 251 252 self.send_command(0x26)253 for j in range(0, self.height):254 for i in range(0, linewidth):255 self.send_data(image[i + j * linewidth]) 256 self.TurnOnDisplay()257 258 def Clear(self, color):259 if self.width%8 == 0:260 linewidth = int(self.width/8)261 else:262 linewidth = int(self.width/8) + 1263 # logging.debug(linewidth)264 265 self.send_command(0x24)266 for j in range(0, self.height):267 for i in range(0, linewidth):268 self.send_data(color) 269 self.TurnOnDisplay()270 def sleep(self):271 # self.send_command(0x22) #POWER OFF272 # self.send_data(0xC3)273 # self.send_command(0x20)274 self.send_command(0x10) #enter deep sleep275 self.send_data(0x01)276 
epdconfig.delay_ms(100)277 epdconfig.module_exit()278### END OF FILE ###279# if __name__ == '__main__':280# epd_t = EPD()281# epd_t.init(epd_t.PART_UPDATE)282# a = 0xff283# for i in range(0,9):284# print(i)285# if i % 2 == 0:286# a = 0x0287# else:288# a = 0xff...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!