Best Python code snippet using nose
handler.py
Source:handler.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Tuesday, Mar 26, 2019
@author: kvasudev
@email: kvasudev@ualberta.ca
@status: full function with limitations
"""

# Import modules
import lib.fcnLibrary as fcnlib
from lib.signal import UltrasonicSignal
import matplotlib.pyplot as plt
import numpy as np

# Default peak-detection parameters shared by SignalHandler and PeakHandler.
_DEFAULT_THRESH = 0.5
_DEFAULT_DEC = 0.2


class SignalHandler:
    """Wrap an UltrasonicSignal object and draw it on a plot axis."""

    # findSignalPeaks reads self.THRESH / self.DEC; they were previously only
    # defined on PeakHandler, so calling it raised AttributeError.
    THRESH = _DEFAULT_THRESH
    DEC = _DEFAULT_DEC

    def __init__(self, ax):
        """
        Initialize the signal handler.

        Creates and wraps an UltrasonicSignal object ('signalObj'),
        attaches to the signal axis object ('axis') and plots the signal.

        ax : axis object the signal is drawn on
        """
        self.smoothBool = True
        self.axis = ax
        self.populateSignalHandler()

    def populateSignalHandler(self):
        """
        Create the UltrasonicSignal object and acquire its signal.

        Also caches the raw signal, the smoothed signal and the time
        vector, sets the plot titles and draws the signal line.
        """
        self.signalObj = UltrasonicSignal()
        self.signalObj.acquireSignal()
        self.signal = self.signalObj.signal
        self.smoothSignal = self.signalObj.signalSmooth
        self.timeVector = self.signalObj.timeVector
        self.setSignalPlotTitle()
        self.setSignalAxisTitles()
        self.plotLines()

    def setSignalPlotTitle(self):
        """ Set plot title """
        self.axis.set_title("Ultrasonic Signal")

    def setSignalAxisTitles(self):
        """ Set axis titles """
        self.axis.set_xlabel("Time (microseconds)")
        self.axis.set_ylabel("Signal Voltage (V)")

    def findSignalPeaks(self):
        """ Find signal peaks (stored in 'signalPeaks') for the peak handler """
        self.signalPeaks = fcnlib.findPeaks(
            self.signalObj.signal, self.THRESH, self.DEC)

    def plotLines(self):
        """
        Plot the ultrasonic signal.

        Only used on the first plot; setSignalYData is used after that.
        """
        self.signalLine = self.axis.plot(self.timeVector, self.smoothSignal)
        self.signalLine[0].set_linewidth(0.5)

    def toggleSmoothing(self):
        """
        Toggle smoothing of the ultrasonic signal.

        This is accessed by the smoothing button.
        """
        self.smoothBool = not self.smoothBool
        self.setSignalYData()

    def setSignalYData(self):
        """
        Set the Y data of the signal plot.

        Shows the smoothed signal when 'smoothBool' is True, otherwise the
        raw signal.  This is called indirectly by the button.
        """
        if self.smoothBool:
            self.signalLine[0].set_ydata(self.smoothSignal)
        else:
            self.signalLine[0].set_ydata(self.signal)

    def setAxis(self, ax):
        """ Set the handler's axis object """
        self.axis = ax

    def getAxis(self):
        """ Get the handler's axis object """
        return self.axis

    def getPlotXLim(self):
        """ Get the axis' X Limits """
        return self.axis.get_xlim()

    def getPlotYLim(self):
        """ Get the axis' Y Limits """
        # Previously a 'pass' stub that silently returned None.
        return self.axis.get_ylim()

    def getTimeIndices(self):
        """
        Get the indices of the limits of the x axis.

        Returns the pair (timeMinIdx, timeMaxIdx) obtained by passing the
        current x limits and the time vector to fcnlib.getTimeIndex.
        """
        timeMin, timeMax = self.getPlotXLim()
        timeMinIdx = fcnlib.getTimeIndex(timeMin, self.timeVector)
        timeMaxIdx = fcnlib.getTimeIndex(timeMax, self.timeVector)
        return timeMinIdx, timeMaxIdx


class PeakHandler:
    """Find the peaks of a SignalHandler's smoothed signal and plot them."""

    # Class variables definition
    # Default THRESH = 0.5 and DEC = 0.2
    THRESH = _DEFAULT_THRESH
    DEC = _DEFAULT_DEC

    def __init__(self, pkAxes, signalHandler):
        """
        Initializer for the peak handler.

        It needs access to the signalHandler for getting peaks.
        The peaks are found and plotted immediately, so the handler
        starts with peaks shown.

        pkAxes        : axis object the peaks are drawn on
        signalHandler : SignalHandler providing smoothSignal and timeVector
        """
        self.axis = pkAxes
        self.signalHandler = signalHandler
        self.findSignalPeaks()
        self.plotLines()

    def findSignalPeaks(self):
        """
        Find the signal peak indices, then the corresponding voltages
        and the signal peak times.
        """
        smooth = self.signalHandler.smoothSignal
        self.peakIndices = fcnlib.findPeaks(smooth, self.THRESH, self.DEC)
        self.peakVoltages = fcnlib.mapPeaks(smooth, self.peakIndices)
        self.peakTimes = fcnlib.mapPeakTime(
            self.signalHandler.timeVector, self.peakIndices)

    def plotLines(self):
        """ Plot the peaks """
        self.peakLine = self.axis.plot(self.peakTimes, self.peakVoltages, 'ko')
        self.peakLine[0].set_markersize(3)

    def setLineVisibility(self):
        """
        Toggle visibility of the peak markers.

        This is called by the PeakButton object in button_handler.
        """
        self.peakLine[0].set_visible(
            not self.peakLine[0].get_visible())

    def getLineVisibility(self):
        """ Return the visibility of the line with signal peaks. """
        return self.peakLine[0].get_visible()


class FTHandler:
    """Compute and plot the power spectrum of the visible signal portion."""

    # Set class variables
    # Default: FT_POINT = 65536
    FT_POINT = 65536

    def __init__(self, ftAxes, signalHandler):
        """
        Attach to the Fourier-transform axis and the signal handler.

        Filtering and zero-padding both start enabled.

        ftAxes        : axis object the spectrum is drawn on
        signalHandler : SignalHandler supplying the time-domain signal
        """
        self.signalHandler = signalHandler
        self.axis = ftAxes
        self.setFTPlotTitle()
        self.setFTAxisTitles()
        self.filtBool = True
        self.padBool = True

    def setFTPlotTitle(self):
        """ Set axis title """
        self.axis.set_title("Fourier Transform Power Spectrum")

    def setFTAxisTitles(self):
        """ Set axis titles """
        self.axis.set_xlabel("Frequency (MHz)")
        self.axis.set_ylabel("Voltage (mV)")

    def applyTransform(self):
        """
        Run the whole pipeline: acquire the time signal, filter it,
        build the frequency domain, calculate the transform and plot it.
        """
        self.acquireTimeSignal()
        self.filterSignal()
        self.createFreqDomain()
        self.calculateFT()
        self.plotFTLine()

    def acquireTimeSignal(self):
        """
        Get the time signal from the signalHandler.

        Slices the smoothed or raw signal (depending on the handler's
        'smoothBool') between the indices of the current x-axis limits.
        """
        timeMinIdx, timeMaxIdx = self.signalHandler.getTimeIndices()
        if self.signalHandler.smoothBool:
            source = self.signalHandler.smoothSignal
        else:
            source = self.signalHandler.signal
        self.signalPart = source[timeMinIdx:timeMaxIdx]

    def filterSignal(self):
        """
        Filter the signal, if filtBool is True.

        Filtering is accomplished by multiplying the signal portion by a
        hamming window of equal size.
        """
        if self.filtBool:
            hammingWindow = np.hamming(self.signalPart.size)
            self.signalPart = np.multiply(hammingWindow, self.signalPart)

    def createFreqDomain(self):
        """
        Create the frequency domain used for plotting.

        Depends on zero-padding: if there is zero-padding, the frequency
        domain must be the size of the zero-padded signal (FT_POINT),
        otherwise it is only as big as the signal portion.
        """
        samplingRate = fcnlib.calculateSamplingRate(
            self.signalHandler.signalObj)
        nPoints = self.FT_POINT if self.padBool else self.signalPart.size
        self.freqDomain = fcnlib.createFrequencies(samplingRate, nPoints)

    def calculateFT(self):
        """
        Calculate the N-point Fourier transform (the power spectrum).

        N is either FT_POINT (zero-padding on) or the length of the
        signal portion (zero-padding off).  The calculation itself is
        done by fcnlib.calculateFT.
        """
        if self.padBool:
            ftPoint = self.FT_POINT
        else:
            ftPoint = int(self.signalPart.size)
        self.powerSpectrum = fcnlib.calculateFT(self.signalPart, ftPoint)

    def plotFTLine(self):
        """ Plot the Fourier Transform """
        self.ftLine = self.axis.plot(self.freqDomain, self.powerSpectrum)
        self.ftLine[0].set_linewidth(0.5)

    def setFilterBool(self, value):
        """ Set the filtering boolean value """
        self.filtBool = value

    def getFilterBool(self):
        """ Get the filtering boolean value """
        return self.filtBool

    def toggleFilter(self):
        """ Toggle whether filtering is applied """
        self.setFilterBool(not self.getFilterBool())

    def setPadBool(self, value):
        """ Set whether padding is used (boolean) """
        self.padBool = value

    def getPadBool(self):
        """ Get the boolean value of padding """
        return self.padBool

    def togglePadding(self):
        """ Toggle whether padding is applied """
        self.setPadBool(not self.getPadBool())


# NOTE(review): the summary below was cut off in this excerpt after
# 'powerSpectrum'; it is reproduced as far as it was visible.  FILTER_SIZE
# and 'transform' are listed but are not defined by the code above - confirm.
"""
In this module:
(Still requires some additions)

Classes:
    SignalHandler
        Methods:
            -add-
        Properties:
            signalObj
            signal
            smoothSignal
            timeVector
            axis
            signalLine
    PeakHandler
        Methods:
            -add-
        Properties:
            THRESH (class var)
            DEC (class var)
            axis
            signalHandler
            peakIndices
            peakVoltages
            peakTimes
            peakLine
    FTHandler
        Methods:
            -add-
        Properties:
            FILTER_SIZE (class var)
            FT_POINT (class var)
            axis
            signalHandler
            signalPart
            transform
            powerSpectrum
"""
...
radiometer.py
Source:radiometer.py
"""
module provides class for multi-threaded simultaneous power meter reading
"""
import time
import datetime
import logging
import numpy as N
import os
import threading
import signal
from math import log10

module_logger = logging.getLogger(__name__)

from Electronics.Instruments import DeviceReadThread
from local_dirs import log_dir
from support import NamedClass, sync_second

data_path = log_dir+"/Radiometer/"


class Radiometer(NamedClass):
  """
  Class for reading multiple power meters synchronously

  The class installs a SIGALRM handler imaginatively called
  'signalHandler()' and creates an event called 'take_data' which is
  initially cleared.  It then creates a reader thread for every power meter.
  Each reader has two associated events, 'reader_started' and 'reader_done'.
  Both are initially cleared.  A method 'action()' is provided to be invoked
  by the reader threads.

  When the Radiometer is started, a system interval timer is started with
  period 'update_interval'.  When the timer fires, 'signalHandler()' does
  nothing if 'take_data' is set.  If it is not, it sets 'take_data', waits
  until all the readers have started, clears 'take_data' and then waits until
  all the readers have completed their 'action()'.  It stays in these wait
  loops using time.sleep().  When all the readers are done, it formats an
  output line with the mean time of all the reading times and the readings
  and writes the line to the datafile, flushing the buffer afterwards.

  Public Attributes::
    integration     - 2*update_interval for Nyquist sampling
    last_reading    - results of the last power meter reading
    logger          - logging.Logger object
    pm_reader       - DeviceReadThread object
    reader_done     - threading.Event object, set when reading has been taken
    reader_started  - threading.Event object, set when a reading is started
    run             - True if the radiometer is running
    take_data       - threading.Event object to signal readers to take reading
    update_interval - inverse of reading rate
  """
  def __init__(self, PM, rate=1./60):
    """
    Create a synchronized multi-channel power meter

    If the radiometer is started without a rate being given then the default
    is once per minute.

    @param PM : dict of power meters
    @type  PM : dict of PowerMeter sub-class objects

    @param rate : number of readings/sec
    @type  rate : float
    """
    self.logger = logging.getLogger(module_logger.name+".Radiometer")
    self.set_rate(rate)
    # create a timer and timer event handler
    signal.signal(signal.SIGALRM, self.signalHandler)
    self.logger.debug("__init__: signal handler assigned")
    self.take_data = threading.Event()
    self.take_data.clear()
    self.logger.debug("__init__: 'take_data' event created and cleared")
    # set power meter averaging
    # still needs to be done
    # assign reader threads
    self.pm_reader = {}
    self.reader_started = {}
    self.reader_done = {}
    self.last_reading = {}
    for key, meter in PM.items():
      meter.name = key
      # initial reading to wake up Radipower; stored as (time, reading) so
      # that it has the same shape as the values written by action()
      reading = meter.power()
      self.last_reading[key] = (time.time(), reading)
      self.pm_reader[key] = DeviceReadThread(self, meter)
      self.logger.debug("__init__: reader and queue %s created", key)
      # daemon threads do not block interpreter exit; the original author
      # deliberately left out a join() here (see ~/Python/Thread/daemon.py)
      self.pm_reader[key].daemon = True
      self.reader_started[key] = threading.Event()
      self.reader_started[key].clear()
      self.reader_done[key] = threading.Event()
      self.reader_done[key].clear()
      self.logger.debug("__init__: 'reader done' event %s created and cleared",
                        key)
    self.logger.debug(" initialized")

  def set_rate(self, rate):
    """
    Set the sampling interval and the integration time

    @param rate : number of readings/sec; must be positive
    @type  rate : float

    @raise ValueError : if rate is not positive
    """
    if rate <= 0:
      raise ValueError("rate must be positive, got %r" % (rate,))
    # set the sampling rate and integration time to Nyquist
    self.update_interval = 1./rate # sec
    self.logger.debug("set_rate: interval is %f", self.update_interval)
    self.integration = 2*self.update_interval # Nyquist sampling

  def start(self):
    """
    Starts the signaller and the threads
    """
    for reader in self.pm_reader.values():
      reader.start()
    sync_second()
    signal.setitimer(signal.ITIMER_REAL,
                     self.update_interval, self.update_interval)
    self.logger.debug("start: timer started with %f s interval",
                      self.update_interval)

  def _wait_for_readers(self, flag, name):
    """
    Poll a dict of events until every one of them is set

    @param flag : events to wait for, keyed by power meter name
    @type  flag : dict of threading.Event objects

    @param name : label used in the log messages
    @type  name : str

    @return: True when all events are set; False if the wait was interrupted
             from the keyboard, in which case the radiometer has been closed.
    """
    pending = list(flag.keys())
    try:
      while pending:
        time.sleep(0.001)
        waiting = []
        # build a new list instead of removing from the one being iterated
        for key in pending:
          if flag[key].is_set():
            self.logger.debug("signalHandler: %s %s is set", name, key)
          else:
            waiting.append(key)
        pending = waiting
    except KeyboardInterrupt:
      self.logger.warning("signalHandler.check_reader_status: interrupted")
      self.close()
      # the readers are terminated now; waiting any longer would never end
      return False
    return True

  def _format_time(self, t):
    """
    Format a UNIX time as 'DOY HHMMSS' followed by fractional seconds

    The number of decimals is derived from 'update_interval'.
    """
    isec, fsec = divmod(t, 1)
    timestr = time.strftime("%j %H%M%S", time.gmtime(t))
    decimals = max(0, -int(round(log10(self.update_interval)))+1)
    secfmt = "%."+str(decimals)+"f"
    self.logger.debug("action.format_time: seconds format: %s", secfmt)
    # drop the leading digit so that only '.ddd' is appended
    secstr = (secfmt % fsec)[1:]
    self.logger.debug("action.format_time: seconds str: %s", secstr)
    return timestr+secstr

  def signalHandler(self, *args):
    """
    Actions to take when the timer goes off::
      1. Ignore signal if take_data is set
      2. Set take_data
      3. Waits until all readers have started.
      4. Clears take_data.
      5. Waits until all readers have finished.
      6. Writes one line with the mean time and the readings to the datafile.
    It also traps a keyboard interrupt and closes the radiometer
    """
    if self.take_data.is_set():
      self.logger.warning("signalHandler is busy and skipped")
      return
    self.logger.debug("signalHandler: called")
    try:
      # Nothing in this class clears 'reader_done', so without this the wait
      # in step 5 would return at once on every cycle after the first.
      # NOTE(review): harmless if DeviceReadThread also clears it - confirm.
      for event in self.reader_done.values():
        event.clear()
      self.logger.debug("signalHandler: setting take_data")
      self.take_data.set() # OK to take data
      # wait until all the readers have started
      if self._wait_for_readers(self.reader_started, "reader_started"):
        self.take_data.clear()
        self.logger.debug("signalHandler: take_data is cleared")
        # wait until all the readers are done
        if self._wait_for_readers(self.reader_done, "reader_done"):
          self.logger.debug("signalHandler: all readers done")
    except KeyboardInterrupt:
      self.logger.warning("signalHandler: interrupted")
      self.close()
    except Exception as details:
      self.logger.warning("signalHandler: %s", details)
    if not self.last_reading:
      # no power meters, nothing to write
      return
    # output data array
    ar = N.array(list(self.last_reading.values()))
    t = ar[:,0].mean()
    powers = tuple(ar[:,1])
    try:
      # one field per power meter (the count was hard-coded to 8 before)
      outstr = self._format_time(t) + (len(powers)*" %6.2f" % powers)
    except TypeError as details:
      self.logger.warning("signalHandler: conversion failed: %s", details)
      # '"%s" % powers' itself raised TypeError for any tuple length but one
      outstr = self._format_time(t) + "".join(" %s" % (p,) for p in powers)
    try:
      self.datafile.write(outstr+"\n")
      self.datafile.flush()
    except Exception as details:
      # best effort: a failed write must not kill the timer handler, but it
      # should not pass unnoticed either
      self.logger.warning("signalHandler: could not write data: %s", details)

  def action(self, pm):
    """
    Action performed by thread for power meter

    Actions invoked by the DeviceReadThread object::
      1. Wait until take_data is set.
      2. Sets reader_started event.
      3. Takes a power meter reading.
      4. Clears reader_started event.
      5. Saves reading in last_reading.
      6. Sets reader_done.

    @param pm : power meter
    @type  pm : any instance of a PowerMeter class
    """
    try:
      self.logger.debug("action: %s waiting for signal", pm.name)
      self.take_data.wait()
      self.reader_started[pm.name].set()
      self.logger.debug("action: %s reader started", pm.name)
      reading = pm.power()
      self.logger.debug("action: reading %s completed", pm.name)
      self.reader_started[pm.name].clear()
    except KeyboardInterrupt:
      self.close()
      # no reading was obtained, so there is nothing to store
      return
    t = time.time()
    self.last_reading[pm.name] = (t, reading)
    self.reader_done[pm.name].set()

  def get_readings(self):
    """
    Returns results of ongoing or just completed reading.
    """
    while self.take_data.is_set():
      time.sleep(0.001)
    return self.last_reading

  def close(self):
    """
    Terminates the signaller, RTC and the power meter reading threads
    """
    signal.setitimer(signal.ITIMER_REAL, 0)
    self.logger.debug("close: stopping")
    self.take_data.clear()
    for key, reader in self.pm_reader.items():
      reader.terminate()
      self.logger.debug("close: reader %s terminated", key)
test_EQTicketModel.py
Source:test_EQTicketModel.py
# -*- coding: utf-8 -*-
# ############################################################################
##
## Copyright (C) 2014 Vista Software. All rights reserved.
##
#############################################################################
__author__ = 'atronah'
'''
    author: atronah
    date:   19.10.2014
'''
import unittest
import sys
from PyQt4 import QtCore, QtSql
from EQTicketModel import CEQTicketControlModel


def initTestDatabase(connectionName = 'unittest'):
    """Return the MySQL test connection, creating and opening it if needed.

    The connection is registered only once per name: adding a second
    connection under an existing name makes Qt replace the old one, which
    would happen on every setUp() call.
    """
    if QtSql.QSqlDatabase.contains(connectionName):
        db = QtSql.QSqlDatabase.database(connectionName)
    else:
        # NOTE(review): connection settings and credentials are hard-coded.
        hostName = '192.168.0.3'
        port = 3306
        databaseName = 'b15'
        userName = 'dbuser'
        userPassword = 'dbpassword'
        db = QtSql.QSqlDatabase.addDatabase('QMYSQL', connectionName)
        db.setHostName(hostName)
        db.setPort(port)
        db.setDatabaseName(databaseName)
        db.setUserName(userName)
        db.setPassword(userPassword)
    if not db.isOpen():
        db.open()
    return db


class TestEqTicketModel(unittest.TestCase):
    """Exercise CEQTicketControlModel against a prepared ticket queue."""
    app = None
    queueTypeId = 1

    def setUp(self):
        # Only one QCoreApplication may exist per process, so reuse it when
        # an earlier test has already created one.
        self.app = (QtCore.QCoreApplication.instance()
                    or QtCore.QCoreApplication(sys.argv))
        initTestDatabase()

    def test_GeneralTest(self):
        class SignalHandler(QtCore.QObject):
            """Record the latest prev/current/next values the model emits."""
            def __init__(self, model, parent = None):
                super(SignalHandler, self).__init__(parent)
                self._model = model
                model.prevValueChanged.connect(self.onPrevValueChanged)
                model.currentValueChanged.connect(self.onCurrentValueChanged)
                model.nextValueChanged.connect(self.onNextValueChanged)
                model.queueChanged.connect(self.onQueueChanged)
                self.prev = QtCore.QString('')
                self.curr = QtCore.QString('')
                self.next = QtCore.QString('')

            def printState(self, msg):
                # debugging hook, intentionally a no-op
                pass

            @QtCore.pyqtSlot(int, QtCore.QString)
            def onPrevValueChanged(self, queueTypeId, value):
                self.prev = value
                self.printState('prev changed: ')

            @QtCore.pyqtSlot(int, QtCore.QString)
            def onCurrentValueChanged(self, queueTypeId, value):
                self.curr = value
                self.printState('curr changed: ')

            @QtCore.pyqtSlot(int, QtCore.QString)
            def onNextValueChanged(self, queueTypeId, value):
                self.next = value
                self.printState('next changed: ')

            @QtCore.pyqtSlot(int)
            def onQueueChanged(self, queueTypeId):
                self.printState('>> queue changed: ')

        db = QtSql.QSqlDatabase.database('unittest')
        if not db.isOpen():
            db.open()

        # The queue id is substituted everywhere (the INSERTs used a literal
        # '1' before) so that the fixture follows 'queueTypeId'.
        prepareQuery = QtSql.QSqlQuery(QtCore.QString("""
            DELETE FROM `EQueueTicket` WHERE queue_id = %(queue)s;
            INSERT INTO `EQueueTicket` (`queue_id`, `status`, `idx`, `value`) VALUES ('%(queue)s', '0', '0', '0-reserved');
            INSERT INTO `EQueueTicket` (`queue_id`, `status`, `idx`, `value`) VALUES ('%(queue)s', '1', '1', '1');
            INSERT INTO `EQueueTicket` (`queue_id`, `status`, `idx`, `value`) VALUES ('%(queue)s', '1', '2', '2');
            INSERT INTO `EQueueTicket` (`queue_id`, `status`, `idx`, `value`) VALUES ('%(queue)s', '1', '3', '3');
            INSERT INTO `EQueueTicket` (`queue_id`, `status`, `idx`, `value`) VALUES ('%(queue)s', '4', '4', '4-completed');
            INSERT INTO `EQueueTicket` (`queue_id`, `status`, `idx`, `value`) VALUES ('%(queue)s', '1', '5', '5');
            """ % {'queue': self.queueTypeId}),
                                       db)
        self.assertFalse(prepareQuery.lastError().isValid(), 'prepareQuery lastError')
        # TODO: atronah: if this line is removed, a mysterious
        # "unable to find table `EQueueTicket`" error shows up
        del prepareQuery

        model = CEQTicketControlModel(self.queueTypeId, db)
        signalHandler = SignalHandler(model)

        # (model action, expected prev, expected current, expected next)
        steps = [
            (model.select,     '',  '',  '1'),
            (model.moveToNext, '',  '1', '2'),
            (model.moveToNext, '1', '2', '3'),
            (model.moveToNext, '2', '3', '5'),
            (model.moveToNext, '3', '5', ''),
            (model.moveToPrev, '2', '3', ''),
            (model.moveToPrev, '1', '2', ''),
            (model.moveToPrev, '',  '1', ''),
        ]
        for number, (action, prev, curr, nxt) in enumerate(steps):
            action()
            where = 'step %d' % number
            self.assertEqual(signalHandler.prev, QtCore.QString(prev), where)
            self.assertEqual(signalHandler.curr, QtCore.QString(curr), where)
            self.assertEqual(signalHandler.next, QtCore.QString(nxt), where)

    # def tearDown(self)


def main():
    unittest.main()


if __name__ == '__main__':
    # NOTE(review): the excerpt is truncated right after the guard above;
    # calling main() is the presumed original body - confirm.
    main()
bridge.py
Source:bridge.py
# -*- coding:utf-8 -*-

import tornado.ioloop
import tornado.web
import tornado.httpserver
import tornado.options as opts
import tornado.log as log
import tornado.iostream

import socket
import copy

from desc_table import desc_t


class SignalStore:
    """Class-level registry of listeners, one list per event name."""
    #~ stream connection events
    #~ [ func,... ]
    on_connect = []
    on_close = []
    #~ zigbee events; the y, n and trr events are called back from send()
    #~ [ func,... ]
    # NOTE(review): the comments on on_o and on_l were garbled identically in
    # the source; "online"/"offline" is the presumed meaning - confirm.
    on_o = []       # control group online
    on_r = []       # device registration
    on_td = []      # direct message
    on_tr = []      # request
    on_trr = []     # the trr event is defined in both basic & send
    on_q = []       # device quit
    on_l = []       # control group offline

    @classmethod
    def add(cls,event,callback):
        """Register *callback* for *event* unless it is already registered."""
        store = getattr(cls,'on_'+event)
        if callback not in store:
            store.append(callback)

    @classmethod
    def remove(cls,event,callback):
        """Unregister *callback* from *event*; unknown callbacks are ignored."""
        store = getattr(cls,'on_'+event)
        if callback in store:
            store.remove(callback)


class SendCallbackStore:
    """Class-level registry of one-shot callbacks keyed by (zaddr, did)."""
    #~ special handling
    #~ { (zaddr,did):[func,...] }
    on_y = {}
    on_n = {}
    on_trr = {}

    @classmethod
    def add(cls,event,zaddr,did,callback):
        """Register *callback* for *event* on the device (zaddr, did)."""
        store = getattr(cls,'on_'+event)
        # setdefault replaces dict.has_key(), which only exists in Python 2
        callbacks = store.setdefault((zaddr,did), [])
        if callback not in callbacks:
            callbacks.append(callback)

    @classmethod
    def remove(cls,event,zaddr,did,callback):
        """Unregister *callback*; unknown devices or callbacks are ignored."""
        store = getattr(cls,'on_'+event)
        callbacks = store.get((zaddr,did), [])
        if callback in callbacks:
            callbacks.remove(callback)


class SignalHandler:
    """Dispatch an event to every listener registered in SignalStore."""

    def on_basic(self,event,desc=None):
        """Call each listener of *event*, passing *desc* when one is given."""
        # iterate over a snapshot: listeners may add/remove listeners
        for callback in list(getattr(SignalStore,'on_'+event)):
            # 'is not None' so that a falsy descriptor is still passed on
            if desc is not None:
                callback(desc)
            else:
                callback()

    def on_close(self):
        self.on_basic('close')

    def on_connect(self):
        self.on_basic('connect')

    def on_o(self,desc):
        self.on_basic('o',desc)

    def on_r(self,desc):
        self.on_basic('r',desc)

    def on_td(self,desc):
        self.on_basic('td',desc)

    def on_tr(self,desc):
        self.on_basic('tr',desc)

    def on_trr(self,desc):
        self.on_basic('trr',desc)

    def on_q(self,desc):
        self.on_basic('q',desc)

    def on_l(self,desc):
        self.on_basic('l',desc)


class SendCallbackHandler:
    """Fire the one-shot callbacks that send() stored in SendCallbackStore."""

    def on_basic_send(self,event,desc):
        """Call and discard the callbacks stored for desc's (zaddr, did)."""
        key = (desc['zaddr'], desc['did'])
        # pop() both fetches and clears the entry; a device without stored
        # callbacks simply yields nothing (this used to raise KeyError, and
        # the reset went through the undefined name 'store')
        callbacks = getattr(SendCallbackStore,'on_'+event).pop(key, [])
        for callback in callbacks:
            callback(desc)

    def on_y(self,desc):
        self.on_basic_send('y',desc)

    def on_n(self,desc):
        self.on_basic_send('n',desc)

    def on_trr(self,desc):
        self.on_basic_send('trr',desc)


class StreamHandler:
    """Own the socket to the bridge and route what arrives on it."""
    EndStr = '$$$'

    def __init__(self):
        #~
        self.SignalHandler = SignalHandler()
        self.SendCallbackHandler = SendCallbackHandler()
        #~
        self.connect()

    def connect(self):
        """Open a new stream to the bridge port on localhost."""
        self.stream = tornado.iostream.IOStream( socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) )
        self.stream.connect(('localhost',opts.options.bridge_SenPort),self.on_connect)

    def on_connect(self):
        """Start reading, install the close callback and notify listeners."""
        self.stream.read_until(StreamHandler.EndStr,self.on_stream_read)
        self.stream.set_close_callback(self.on_close)
        tornado.log.gen_log.info('bridge 连接成功')
        self.SignalHandler.on_connect()

    def on_close(self):
        """Notify listeners that the stream closed, then reconnect."""
        tornado.log.gen_log.warning('bridge 连接断开')
        self.SignalHandler.on_close()
        tornado.log.gen_log.warning('bridge 尝试重连')
        self.connect()

    def on_stream_read(self,rdat):
        """Parse one frame, dispatch it by event type and read the next."""
        tornado.log.gen_log.info('bridge 收到数据 %s' %(rdat,))
        # cut off the terminator itself; rstrip() would strip a character set
        if rdat.endswith(StreamHandler.EndStr):
            rdat = rdat[:-len(StreamHandler.EndStr)]
        desc = desc_t()
        desc.load(rdat)
        e = desc['e']
        if e in ('o','r','td','tr','trr','q','l'):
            getattr(self.SignalHandler,'on_'+e)(desc)
        # y and n only exist on SendCallbackHandler (they were dispatched to
        # SignalHandler, which has no such methods); trr is defined in both,
        # so the callbacks registered through send() are fired as well
        if e in ('y','n','trr'):
            getattr(self.SendCallbackHandler,'on_'+e)(desc)
        #~
        self.stream.read_until(StreamHandler.EndStr,self.on_stream_read)

    def send(self,desc,on_y=None,on_n=None,on_trr=None):
        """Send *desc* down the stream.

        For 'td' and 'tr' descriptors the given on_y / on_n / on_trr
        callbacks are stored under the descriptor's (zaddr, did).
        """
        store = SendCallbackStore
        #~ load the send callback store
        e = desc['e']
        if e in ('td','tr'):
            zaddr = desc['zaddr']
            did = desc['did']
            if on_y: store.add('y',zaddr,did,on_y)
            if on_n: store.add('n',zaddr,did,on_n)
            if on_trr: store.add('trr',zaddr,did,on_trr)
        tornado.log.gen_log.warning('bridge 尝试下载指令 %s' %(desc,))
        self.stream.write(str(desc), lambda : tornado.log.gen_log.info('bridge 下载[OK] %s' %(desc,)))


#~ module-level functions
def add_listener(event,callback):
    """Register *callback* for *event* in SignalStore."""
    SignalStore.add(event,callback)


def remove_listener(event,callback):
    """Unregister *callback* from *event* in SignalStore."""
    SignalStore.remove(event,callback)


stream = StreamHandler()

# NOTE(review): the excerpt is truncated at this point.  Only the signature
# of the module-level wrapper is visible, so its body is not reproduced:
#   def send(desc,on_y=None,on_n=None,on_trr=None):
...
Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!