Best Python code snippet using hypothesis
ab_service.py
Source:ab_service.py
1# -*- coding: utf-8 -*-2from __future__ import division3import copy4from collections import namedtuple5from datetime import datetime6from struct import unpack7from stars.apps.customer.finance.ab import ab_util, utils8from stars.apps.customer.finance.ab.common_const import AgriculturalBankTradeConstData9class Handler(object):10 def handle(self):11 pass12class ErrorHandler(Handler):13 def handle(self):14 pass15class SucHandler(Handler):16 def handle(self):17 pass18class AbService(object):19 head_msg_type = 'R'20 trade_no = ''21 msg_template = ''22 trade_source = ''23 __mch_id = AgriculturalBankTradeConstData.INST_NO24 Header = namedtuple('Header', 'api_type ver msg_size head_msg_type trade_no mch_id encryption_flag mac_flag mac reserve')25 __header_fmt = '!1s3s5s1s5s8s1s1s8s8s'26 __key_fields_set = {'BankPassword', 'TradePassword', 'FundPassword', 'InstOprPwd'}27 must_exist_item = set()28 def __init__(self, suc_handler=None, error_handler=None):29 self.suc_handler = suc_handler30 self.error_handler = error_handler31 # def get_msg_template(self):32 # return msg_template33 @classmethod34 def create_ab_inst_serial(cls):35 return ab_util.create_ab_inst_serial(ab_util.get_service_name_for_ab(cls.trade_no))36 def get_trade_no(self):37 return self.trade_no38 def send_and_receive(self, data):39 raise NotImplemented40 def send(self, data):41 raise NotImplemented42 def receive(self, size):43 raise NotImplemented44 def get_mch_id(self):45 return self.__class__.__mch_id46 def get_key(self):47 return ab_util.get_key()48 def get_date(self):49 return datetime.today().strftime('%Y%m%d')50 def get_time(self):51 return datetime.today().strftime('%H%M%S')52 @staticmethod53 def get_header_from_text(text):54 a=AbService.Header._make(unpack(AbService.__header_fmt, text))55 return a._replace(msg_size=int(a.msg_size))56 def split_header_msg(self, buf):57 if isinstance(buf, list):58 buf = ''.join(buf)59 s = buf[:41]60 header = self.get_header_from_text(s)61 msg_size = header.msg_size62 cr_msg = buf[41: 41+msg_size]63 return header, cr_msg64 def decrypt_recv_msg_body(self, cr_msg, buf_size, pack_key, pin_key, mac_key):65 # with open('b.txt', 'wb') as w:66 # w.write(cr_msg)67 msg = self.decrypt_msg(cr_msg, pack_key, pin_key)68 # TODO for test69 with open('received_msg.txt', 'wb') as w:70 w.write(msg)71 # b=chardet.detect(msg)72 # print(b)73 data = utils.make_dict_from_xml(msg)74 return data75 def decrypt_msg(self, data, pack_key, pin_key):76 try:77 msg = ab_util.ab_decrypt_msg(data, pack_key)78 # å
³é®å79 key_fields = self.__class__.__key_fields_set.intersection(set(data.keys()))80 if key_fields:81 data = copy.deepcopy(data)82 for ele in key_fields:83 if data[ele]:84 data[ele] = ab_util.ab_decrypt_password(data[ele], pin_key)85 return msg86 except Exception as e:87 print(e)88 def check_msg_text(self, data, pack_key, pin_key, mac_key):89 s = self.check_msg_text_for_common(data, pack_key, pin_key, mac_key)90 return s91 def make_data(self, data, pack_key, pin_key, mac_key, msg_template=None):92 return self.__make_data(data, pack_key, pin_key, mac_key, msg_template)93 def get_code(self, s):94 return '17{}'.format(s)95 def check_mac(self, text, mac, mac_key, pack_key, pin_key):96 # ä¸æ£æ¥97 # if not ab_util.check_mac(text, mac, mac_key):98 # code = self.get_code('2043')99 # m = {'InstructionCode': self.__class__.trade_no, # 交æå·100 # 'TradeSource': 'B', # 交æåèµ·æ¹ï¼å¸åº101 # 'InstNo': self.get_mch_id(), # å¸åºç¼å·102 # 'Date': self.get_date(),103 # 'Time': self.get_time(),104 # 'Code': code,105 # }106 # # t = self.make_data(m, pack_key, pin_key, mac_key)107 # t = m108 # return t109 return None110 def check_msg_text_for_common(self, data, pack_key, pin_key, mac_key):111 # 解æé误112 m = None113 if not data:114 code = self.get_code('2044') # é讯æ¶æ¯ä½æ ¼å¼é误115 m = {'InstructionCode': self.__class__.trade_no, # 交æå·116 'TradeSource': self.trade_source, # 交æåèµ·æ¹117 'InstNo': self.get_mch_id(), # å¸åºç¼å·118 'Date': self.get_date(),119 'Time': self.get_time(),120 'Code': code,121 'Info': u'é讯æ¶æ¯ä½æ ¼å¼é误:'122 }123 # 缺å°å¿
须项124 elif self.must_exist_item and not self.must_exist_item.issubset(set(data.keys())):125 code = self.get_code('2044')126 m = {'InstructionCode': self.__class__.trade_no, # 交æå·127 'TradeSource': self.trade_source, # 交æåèµ·æ¹128 'InstNo': self.get_mch_id(), # å¸åºç¼å·129 'Date': self.get_date(),130 'Time': self.get_time(),131 'Code': code,132 'Info': u'缺å°å¿
须项:' + ','.join(self.must_exist_item - set(data.keys()))133 }134 elif data.get('InstructionCode') != self.trade_no:135 code = '2031' # 交æç é136 m = {'InstructionCode': self.__class__.trade_no, # 交æå·137 'TradeSource': self.trade_source, # 交æåèµ·æ¹138 'InstNo': self.get_mch_id(), # å¸åºç¼å·139 'Date': self.get_date(),140 'Time': self.get_time(),141 'Code': code,142 'Info': u'交æç é:'143 }144 elif not utils.is_none_or_empty_or_blank(data.get('InstNo')) and data.get('InstNo') != self.get_mch_id():145 code = '2031' # å¸åºç¼ç é146 m = {'InstructionCode': self.__class__.trade_no, # 交æå·147 'TradeSource': self.trade_source, # 交æåèµ·æ¹148 'InstNo': self.get_mch_id(), # å¸åºç¼å·149 'Date': self.get_date(),150 'Time': self.get_time(),151 'Code': code,152 'Info': u'å¸åºç¼ç é:'153 }154 if m:155 t = self.make_data(m, pack_key, pin_key, mac_key)156 return t157 return None158 def __make_header(self, msg_size, mac):159 return ab_util.make_header(msg_size, mac=mac,160 msg_type=self.__class__.head_msg_type,161 trade_no=self.__class__.trade_no,162 mch_id=self.__class__.__mch_id)163 def __make_enc_msg(self, data, pack_key, pin_key=None, msg_template=None ):164 key_fields = self.__class__.__key_fields_set.intersection(set(data.keys()))165 if key_fields:166 data = copy.deepcopy(data)167 for ele in key_fields:168 if data[ele]:169 data[ele] = ab_util.ab_encrypt_password(data[ele], pin_key)170 if not msg_template:171 s = self.__class__.msg_template.decode('utf8').format(**data)172 else:173 s = msg_template.decode('utf8').format(**data)174 s = s.encode('gbk')175 # s = s.encode('utf8')176 with open('r_msg.txt', 'wb') as w:177 w.write(s)178 # w.write(pack_key)179 # w.write(pin_key)180 # logging.exception('test by lwj 11111111111111111')181 # logging.exception('pa_k :'+pack_key)182 # logging.exception('pi_k :'+pin_key)183 c_msg = ab_util.ab_encrypt_msg(s, pack_key)184 return c_msg185 def __make_data(self, data, pack_key, pin_key, mac_key, msg_template=None ):186 c_msg = self.__make_enc_msg(data, pack_key, pin_key, msg_template)187 mac = ab_util.make_mac(c_msg, mac_key)188 header = self.__make_header(len(c_msg), mac)189 return header + c_msg190 def __check_header_msg(self, header, recv_msg, mac_key):191 mac = header.mac192 if not ab_util.check_mac(recv_msg, mac, mac_key):193 pass # TODO194 def do_event(self, data, keys=None):...
rbpi_controller.py
Source:rbpi_controller.py
1'''Created on 2018/11/172 last update 2018/12/073@author: Taylor W Hickem4'''5import sys6import RPi.GPIO as GPIO7import datetime as dt8import logger9LOGGER_STATUS = True #log results to csv file Y/N?10#define the settings11light_settings = {}12pump_settings = {}13light_settings['start_hrs'] = 614light_settings['lights-T'] = {'OFF hrs':8.00}15light_settings['lights-M'] = None16light_settings['lights-L'] = {'OFF hrs':8.00}17pump_settings['pumpA'] = {'ON sec':30,'OFF sec':900,'last_update':None,'last_value':None}18pump_settings['pumpB'] = {'ON sec':60,'OFF sec':600,'last_update':None,'last_value':None}19#define the GPIO pin arrangement20pins = {'pumpA':17,'pumpB':5,'lights-M':25,'lights-L':23,'lights-T':24}21#define the default value settings22# the pumps are configured as NC and the relay is inverted23# so a 1 corresponds to the OFF position24# for the lights, they are configured as NO so a 1 corresponds to ON25ON_values = {'pumpA':0,'pumpB':0,'lights-M':1,'lights-L':1,'lights-T':1}26defaultValues = {'pumpA':False,'pumpB':False,'lights-T':True,'lights-M':False,'lights-L':True}27def setupPins():28 for pin in pins:29 GPIO.setup(pins[pin],GPIO.OUT)30def pinIsON(pin_key):31 pinId = pins[pin_key]32 pinONValue = ON_values[pin_key]33 GPIOValue = GPIO.input(pinId)34 isON = (GPIOValue==pinONValue)35 return isON36def setPinValue(pin_key,isON):37 pinId = pins[pin_key]38 pinONValue = ON_values[pin_key]39 if isON:40 outputValue = pinONValue41 else:42 outputValue = 1-pinONValue43 GPIO.output(pinId,outputValue)44 45def set_defaultValues():46 for pin in pins:47 setPinValue(pin,defaultValues[pin])48 if LOGGER_STATUS:49 logger.writelog_controller_action('set default value'50 ,defaultValues[pin]*1,'set default value for %s' % pin)51 52def updateLightStatus(light_key,nowTime):53 settings = light_settings[light_key]54 if not settings is None:55 start_hrs = light_settings['start_hrs']56 offHrs = settings['OFF hrs']57 onHrs = 24 - offHrs58 isON = pinIsON(light_key)59 if start_hrs+onHrs < 24: # OFF time is within same day60 sameDay = True61 end_hrs = start_hrs + onHrs62 else: #OFF time is the next day63 sameDay = False64 end_hrs = onHrs - (24-start_hrs)65 now_hrs = nowTime.hour + float(nowTime.minute)/6066 if sameDay:67 elapsed = [now_hrs-start_hrs,now_hrs-end_hrs]68 if elapsed[0]<0 and elapsed[1]<0: # OFF69 target_state = False70 elif elapsed[0]>=0 and elapsed[1]<0: # ON71 target_state = True72 else: # OFF73 target_state = False74 else:75 elapsed = [now_hrs-end_hrs,now_hrs-start_hrs]76 if elapsed[0]<0 and elapsed[1]<0: # ON77 target_state = True78 elif elapsed[0]>=0 and elapsed[1]<0: # OFF79 target_state = False80 else: # ON81 target_state = True82 if not (isON == target_state):83 setPinValue(light_key,target_state)84 if LOGGER_STATUS:85 logger.writelog_controller_action('change light status'86 ,target_state*1,'changed light status for %s' % light_key)87 88def updatePumpStatus(pump_key,nowTime):89 90 def changePumpStatus(isON):91 setPinValue(pump_key,isON)92 pump_settings[pump_key]['last_update'] = nowTime93 pump_settings[pump_key]['last_value'] = isON94 if LOGGER_STATUS:95 logger.writelog_controller_action('change pump status'96 ,isON*1,'changed pump status for %s' % pump_key)97 settings = pump_settings[pump_key]98 last_update = settings['last_update']99 last_value = settings['last_value']100 isON = pinIsON(pump_key)101 if last_update is None:102 last_update = nowTime103 last_value = isON104 pump_settings[pump_key]['last_update'] = last_update105 pump_settings[pump_key]['last_value'] = last_value106 ONOFF_sec = (settings['ON sec'],settings['OFF sec'])107 elapsed = nowTime - last_update108 if last_value: #ON109 if elapsed.seconds>ONOFF_sec[0]:110 if isON:111 changePumpStatus(False)112 else:113 if not isON:114 changePumpStatus(True)115 else: #OFF116 if elapsed.seconds>ONOFF_sec[1]:117 if not isON:118 changePumpStatus(True)119 else:120 if isON:121 changePumpStatus(False)122def updateLightsStatus(nowTime):123 for light_key in [x for x in light_settings if not x == 'start_hrs']:124 updateLightStatus(light_key,nowTime)125def updatePumpsStatus(nowTime):126 for pump_key in pump_settings:127 updatePumpStatus(pump_key,nowTime)128 129def run_process():130 while True:131 nowTime = dt.datetime.now()132 updateLightsStatus(nowTime)133 updatePumpsStatus(nowTime)134def main():135 # set the mode136 GPIO.setmode(GPIO.BCM)137 # setup the pins138 setupPins()139 140 # set pins to default values141 set_defaultValues()142 143 # run the process144 run_process()145if __name__ == "__main__":146 try:147 main()148 except BaseException as e:149 print('shutting down for exception %s' % e)150 print(e.message)151 if LOGGER_STATUS:152 logger.writelog_exception(153 str(e.message),154 value=str(e.args),155 exception_type=type(e).__name__)156 # set to default values157 set_defaultValues()158 GPIO.cleanup()...
harness.py
Source:harness.py
1from dbmanager import DBManager2def harnessapp(config):3 dbm = DBManager(config)4 dbm.readdb()5 data = dbm.getdata()6 cables = data['cable']7 output = []8 for cable in cables:9 output.append(cable['Parent'] + '/' + cable['Name'])10 # output.append('\tConnectors:')11 for conn in cable['Connectors']:12 output.append('-'*4 + conn['Name'] + ':' + conn['BlockType'])13 try:14 output.append('-'*8 + 'Family: ' + conn['Family'])15 except KeyError:16 output.append('-'*8 +'Part number: ' + conn['PartNumber'])17 for pin in range(1, int(conn['Pins'])+1):18 if pin < 10:19 pin_key = 'Pin0' + str(pin)20 else:21 pin_key = 'Pin' + str(pin)22 try:23 if conn[pin_key] != '':24 output.append('-'*8 + pin_key + ': ' + conn[pin_key])25 except KeyError:26 pass27 output.append('.')...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!