Best Python code snippet using pytest-benchmark
usb_phy.py
Source:usb_phy.py
...
# NOTE(review): this listing was flattened onto a few lines with its original
# line numbers fused into the text; the indentation below is reconstructed
# from the statement order and should be confirmed against the real file.
# The listing starts part-way through a constructor (presumably that of the
# TxPhy base class used further down); the preceding lines are not shown.
# The print statements use Python 2 syntax.
        self._expect_loopback = expect_loopback
        self._dut_exit_time = dut_exit_time

    def get_name(self):
        """Return the name stored on this object."""
        return self._name

    def get_clock(self):
        """Return the clock object stored on this object."""
        return self._clock

    def start_test(self):
        """Wait out the initial delay, then wait for one high/low clock phase."""
        self.wait_until(self.xsi.get_time() + self._initial_delay)
        self.wait(lambda x: self._clock.is_high())
        self.wait(lambda x: self._clock.is_low())

    def end_test(self):
        """Run the end-of-test sequence.

        Calls the completion callback (if any), waits END_OF_TEST_TIME and,
        when timeouts are enabled, waits a further computed period before
        printing a timeout error and terminating the simulation.
        """
        if self._verbose:
            print "All packets sent"

        if self._complete_fn:
            self._complete_fn(self)

        # Give the DUT a reasonable time to process the packet
        self.wait_until(self.xsi.get_time() + self.END_OF_TEST_TIME)

        if self._do_timeout:
            # Allow time for a maximum sized packet to arrive
            timeout_time = (self._clock.get_bit_time() * 1522 * 8)

            if self._expect_loopback:
                # If looping back then take into account all the data
                total_packet_bytes = sum([len(packet.get_bytes()) for packet in self._packets])

                total_data_bits = total_packet_bytes * 8

                # Allow 2 cycles per bit
                timeout_time += 2 * total_data_bits

                # The clock ticks are 2ns long
                timeout_time *= 2

                # The packets are copied to and from the user application
                timeout_time *= 2

            self.wait_until(self.xsi.get_time() + timeout_time)

            if self._test_ctrl:
                # Indicate to the DUT that the test has finished
                self.xsi.drive_port_pins(self._test_ctrl, 1)

            # Allow time for the DUT to exit
            self.wait_until(self.xsi.get_time() + self._dut_exit_time)

            # Reaching this point means nothing ended the simulation earlier
            print "ERROR: Test timed out"
            self.xsi.terminate()

    def set_clock(self, clock):
        """Replace the stored clock object."""
        self._clock = clock

    def set_packets(self, packets):
        """Replace the stored packet sequence that run() iterates over."""
        self._packets = packets

    def drive_error(self, value):
        """Drive `value` onto the port stored in self._rxer."""
        self.xsi.drive_port_pins(self._rxer, value)


class UsbPhy(TxPhy):
    """Simulation thread that walks a packet list, sending or checking each one.

    All constructor arguments are forwarded unchanged to the base class,
    together with the fixed name 'mii'.
    """

    def __init__(self, rxd, rxa, rxdv, rxer, vld, txd, txv, txrdy, clock,
                 initial_delay=85000, verbose=False, test_ctrl=None,
                 do_timeout=True, complete_fn=None, expect_loopback=True,
                 dut_exit_time=25000):
        super(UsbPhy, self).__init__('mii', rxd, rxa, rxdv, rxer, vld, txd, txv, txrdy, clock,
                                     initial_delay, verbose, test_ctrl,
                                     do_timeout, complete_fn, expect_loopback,
                                     dut_exit_time)

    def run(self):
        """Process every packet in self._packets, then call end_test().

        RxPacket instances are expected *from* the device: the method waits
        for TXV to read 1, collects one TXD sample per clock cycle until TXV
        reads 0, and compares the samples with packet.get_bytes(). Any other
        packet is driven *to* the device byte by byte on RXD.
        """
        xsi = self.xsi

        self.start_test()

        for i,packet in enumerate(self._packets):

            #error_nibbles = packet.get_error_nibbles()

            if isinstance(packet, RxPacket):

                # Number of clock cycles to wait for the packet to start
                timeout = packet.get_timeout()

                #print "Expecting pkt. Timeout in: {i}".format(i=timeout)

                in_rx_packet = False
                rx_packet = []

                while timeout != 0:

                    # One full clock cycle per loop iteration
                    self.wait(lambda x: self._clock.is_high())
                    self.wait(lambda x: self._clock.is_low())

                    timeout = timeout - 1
                    #print "{i}".format(i=timeout)

                    #sample TXV for new packet
                    if xsi.sample_port_pins(self._txv) == 1:
                        print "Receiving packet {}".format(i)
                        in_rx_packet = True
                        break

                if in_rx_packet == False:
                    print "ERROR: Timed out waiting for packet"

                else:
                    #print "in packet"
                    while in_rx_packet == True:

                        # TODO txrdy pulsing
                        xsi.drive_port_pins(self._txrdy, 1)
                        data = xsi.sample_port_pins(self._txd)

                        print "Received byte: {0:#x}".format(data)
                        rx_packet.append(data)

                        self.wait(lambda x: self._clock.is_high())
                        self.wait(lambda x: self._clock.is_low())

                        # TXV reading 0 marks the end of the packet
                        if xsi.sample_port_pins(self._txv) == 0:
                            #print "TXV low, breaking out of loop"
                            in_rx_packet = False


                    # End of packet
                    xsi.drive_port_pins(self._txrdy, 0)

                    # Check packet against expected
                    expected = packet.get_bytes()
                    if len(expected) != len(rx_packet):
                        print "ERROR: Rx packet length bad. Expecting: {} actual: {}".format(len(expected), len(rx_packet))

                    # Check packet data against expected
                    # (cmp() returns non-zero when the two sequences differ; Python 2 only)
                    if cmp(expected, rx_packet):
                        print "ERROR: Rx Packet Error. Expected:"
                        for item in expected:
                            print "{0:#x}".format(item)

                        print "Received:"
                        for item in rx_packet:
                            print "{0:#x}".format(item)

            else:

                # xCore should not be trying to send if we are trying to send..
                if xsi.sample_port_pins(self._txv) == 1:
                    print "ERROR: Unexpected packet from xCORE"

                rxv_count = packet.get_data_valid_count();

                #print "Waiting for inter_pkt_gap: {i}".format(i=packet.inter_frame_gap)
                self.wait_until(xsi.get_time() + packet.inter_pkt_gap)

                print "Sending packet {}".format(i)
                if self._verbose:
                    sys.stdout.write(packet.dump())

                # Set RXA high
                xsi.drive_port_pins(self._rxa, 1)

                # Wait for RXA rise delay TODO, this should be configurable
                self.wait(lambda x: self._clock.is_high())
                self.wait(lambda x: self._clock.is_low())

                #if isinstance(packet, TokenPacket):
                #    print "Token packet, clear valid token"
                xsi.drive_port_pins(self._vld, 0)

                # NOTE: this loop reuses the name `i`, shadowing the packet
                # index of the outer loop for the rest of this iteration.
                for (i, byte) in enumerate(packet.get_bytes()):

                    # xCore should not be trying to send if we are trying to send..
                    if xsi.sample_port_pins(self._txv) == 1:
                        print "ERROR: Unexpected packet from xCORE"

                    self.wait(lambda x: self._clock.is_low())

                    self.wait(lambda x: self._clock.is_high())
                    self.wait(lambda x: self._clock.is_low())
                    xsi.drive_port_pins(self._rxdv, 1)
                    xsi.drive_port_pins(self._rxd, byte)

                    # Assert RXER on the configured byte index (index 0 disables it)
                    if (packet.rxe_assert_time != 0) and (packet.rxe_assert_time == i):
                        xsi.drive_port_pins(self._rxer, 1)

                    # Hold RXDV at 0 for rxv_count clock cycles
                    while rxv_count != 0:
                        self.wait(lambda x: self._clock.is_high())
                        self.wait(lambda x: self._clock.is_low())
                        xsi.drive_port_pins(self._rxdv, 0)
                        rxv_count = rxv_count - 1

                        # xCore should not be trying to send if we are trying to send..
                        if xsi.sample_port_pins(self._txv) == 1:
                            print "ERROR: Unexpected packet from xCORE"

                    #print "Sending byte {0:#x}".format(byte)

                    # Reload the counter for the next byte
                    rxv_count = packet.get_data_valid_count();

                    if isinstance(packet, TokenPacket):
                        #print "Token packet, driving valid"
                        if packet.get_token_valid():
                            xsi.drive_port_pins(self._vld, 1)
                        else:
                            xsi.drive_port_pins(self._vld, 0)

                # Wait for last byte
                self.wait(lambda x: self._clock.is_high())
                self.wait(lambda x: self._clock.is_low())

                xsi.drive_port_pins(self._rxdv, 0)
                xsi.drive_port_pins(self._rxer, 0)

                rxa_end_delay = packet.rxa_end_delay
                while rxa_end_delay != 0:
                    # Wait for RXA fall delay TODO, this should be configurable
                    self.wait(lambda x: self._clock.is_high())
                    self.wait(lambda x: self._clock.is_low())
                    rxa_end_delay = rxa_end_delay - 1

                    # xCore should not be trying to send if we are trying to send..
                    if xsi.sample_port_pins(self._txv) == 1:
                        print "ERROR: Unexpected packet from xCORE"

                xsi.drive_port_pins(self._rxa, 0)

                #if self._verbose:
                    #print "Sent"

        print "Test done"
        self.end_test()


class RxPhy(xmostest.SimThread):
    """Base simulation thread holding receive-side ports and expected packets."""

    def __init__(self, name, txd, txen, clock, print_packets, packet_fn, verbose, test_ctrl):
        self._name = name
        self._txd = txd
        self._txen = txen
        self._clock = clock
        self._print_packets = print_packets
        self._verbose = verbose
        self._test_ctrl = test_ctrl
        self._packet_fn = packet_fn

        self.expected_packets = None
        self.expect_packet_index = 0
        self.num_expected_packets = 0

        # NOTE(review): the three assignments below repeat the three above
        # verbatim; they have no additional effect.
        self.expected_packets = None
        self.expect_packet_index = 0
        self.num_expected_packets = 0

    def get_name(self):
        """Return the name stored on this object."""
        return self._name

    def get_clock(self):
        """Return the clock object stored on this object."""
        return self._clock

    def set_expected_packets(self, packets):
        """Store `packets` as the expected sequence and reset the index.

        `packets` may be None, in which case the expected count is 0.
        """
        self.expect_packet_index = 0;
        self.expected_packets = packets
        if self.expected_packets is None:
            self.num_expected_packets = 0
        else:
            self.num_expected_packets = len(self.expected_packets)


class MiiReceiver(RxPhy):
    """RxPhy subclass that passes the fixed name 'mii' to the base class."""

    def __init__(self, txd, txen, clock, print_packets=False,
                 packet_fn=None, verbose=False, test_ctrl=None):
        super(MiiReceiver, self).__init__('mii', txd, txen, clock, print_packets,
                                          packet_fn, verbose, test_ctrl)

    # The body of run() is elided in the listing.
    def run(self):...
simtime.py
Source:simtime.py
''' Simulation clock with guaranteed decimal precision. '''
from collections import OrderedDict
from inspect import signature
from types import SimpleNamespace
from decimal import Decimal

from bluesky import settings

# Register settings defaults
settings.set_variable_defaults(simdt=0.05)

# Largest recovery increment allowed in one step, as a multiple of the base dt
MAX_RECOVERY_FAC = 4

# Data that the simulation clock needs to keep:
# t/dt are the exact Decimal time and timestep, ft/fdt their float mirrors.
_clock = SimpleNamespace(t=Decimal('0.0'), dt=Decimal(repr(settings.simdt)),
                         ft=0.0, fdt=settings.simdt)

# Registered Timer objects, keyed by upper-cased timer name
_timers = OrderedDict()


def setdt(newdt=None, target='simdt'):
    ''' Set the timestep of the simulation clock or of a named timer.

        Arguments:
        - newdt: the new timestep. When None, nothing is changed and an
          overview of the current timesteps is returned instead.
        - target: 'simdt' for the base timestep, otherwise the name of a
          registered timer (matched case-insensitively).

        Returns a tuple (success, message). '''
    if newdt is None:
        text = 'Simulation timesteps:\nbase dt = {}'.format(_clock.fdt)
        for timer in _timers.values():
            text += '\n{} = {}'.format(timer.name, timer.dt_act)
        return True, text

    if target == 'simdt':
        _clock.dt = Decimal(repr(newdt))
        _clock.fdt = float(_clock.dt)
        msg = 'Base dt set to {}'.format(_clock.dt)
        # Let every timer re-derive its actual dt from the new base dt
        for timer in _timers.values():
            _, tmsg = timer.setdt()
            msg = msg + '\n' + tmsg
        return True, msg

    # Timers register themselves under name.upper(), so normalise the lookup
    # key the same way; non-string targets are looked up unchanged.
    key = target.upper() if isinstance(target, str) else target
    timer = _timers.get(key, None)
    if timer is None:
        return False, 'Timer {} not found'.format(target)

    return timer.setdt(newdt)


def step(recovery_time=0):
    ''' Increment the time of this clock with one timestep, plus a possible
        recovery time increment if the simulation is lagging and real-time
        running is enabled.

        The recovery increment is capped at MAX_RECOVERY_FAC base timesteps.

        Returns a floating-point representation of the new simulation time,
        and the actual timestep. '''
    # Convert through repr(), like every other timestep in this module:
    # Decimal(<float>) would carry the float's binary rounding noise into
    # the otherwise exact decimal clock.
    recovery_time = min(Decimal(repr(float(recovery_time))),
                        MAX_RECOVERY_FAC * _clock.dt)
    _clock.t += _clock.dt + recovery_time
    _clock.ft = float(_clock.t)
    for timer in _timers.values():
        timer.step()

    return _clock.ft, _clock.fdt + float(recovery_time)


def reset():
    ''' Reset the simulation clock. '''
    _clock.t = Decimal('0.0')
    _clock.dt = Decimal(repr(settings.simdt))
    _clock.ft = 0.0
    _clock.fdt = float(_clock.dt)
    for timer in _timers.values():
        timer.reset()


class Timer:
    ''' Countdown timer that becomes ready once every rel_freq clock steps.

        The actual timer interval is always an integer multiple (at least 1)
        of the base timestep of the simulation clock. '''

    def __init__(self, name, dt):
        self.name = name
        self.dt_default = Decimal(repr(dt))
        self.dt_requested = self.dt_default
        self.dt_act = self.dt_default
        self.rel_freq = 0
        self.counter = 0
        self.tprev = _clock.t
        self.setdt()

        # Add self to dictionary of timers
        _timers[name.upper()] = self

    def reset(self):
        ''' Restore the default interval and restart the countdown. '''
        self.dt_requested = self.dt_default
        self.dt_act = self.dt_default
        self.rel_freq = 0
        self.counter = 0
        self.tprev = _clock.t
        self.setdt()

    def setdt(self, dt=None):
        ''' Set the requested interval and recompute the actual interval.

            Called without argument (or with a falsy dt) when only the base
            dt has changed; the previously requested interval is then reused.

            Returns a tuple (True, message). '''
        # setdt is called without arguments if the base dt has changed
        # In this case, check if our dt is still ok.
        if dt:
            # Store the requested dt separately: is used to update actual dt
            # for when the simulation dt is changed
            self.dt_requested = Decimal(repr(dt))

        # Calculate the relative frequency of the simulation with respect to this timer
        rel_freq = max(1, int(self.dt_requested // _clock.dt))

        # Update timer to next trigger point
        passed = self.rel_freq - self.counter
        self.counter = max(0, rel_freq - passed)
        self.rel_freq = rel_freq

        dtnew = self.rel_freq * _clock.dt
        if abs(self.dt_act - dtnew) < 0.0001:
            return True, self.name + ' dt is unchanged.'

        self.dt_act = dtnew
        if abs(self.dt_act - self.dt_requested) > 0.0001:
            return True, self.name + \
                ' dt set to {} to match integer multiple of base dt.'.format(self.dt_act)

        return True, self.name + ' dt set to {}'.format(self.dt_act)

    def step(self):
        ''' Count down one clock step, wrapping from 0 back to rel_freq - 1. '''
        self.counter = (self.counter or self.rel_freq) - 1

    def readynext(self):
        ''' Return True when the countdown has reached zero. '''
        return self.counter == 0

    def elapsed(self):
        ''' Return the simulation time (float) since the previous call and
            restart the measurement from the current clock time. '''
        elapsed = float(_clock.t - self.tprev)
        self.tprev = _clock.t
        return elapsed


def timed_function(name, dt=1.0):
    ''' Decorator factory: the wrapped function only runs, and only returns
        a value, on calls where its own Timer is ready.

        Functions that declare a parameter called dt receive the actual
        timer interval as a float through that keyword. '''
    def decorator(fun):
        timer = Timer(name, dt)
        if 'dt' in signature(fun).parameters:
            def wrapper(*args, **kwargs):
                if timer.readynext():
                    return fun(*args, **kwargs, dt=float(timer.dt_act))
        else:
            def wrapper(*args, **kwargs):
                if timer.readynext():
                    return fun(*args, **kwargs)
        wrapper.__istimed = True
        return wrapper
    # NOTE(review): the listing is cut off at this point. The function
    # presumably goes on to return `decorator` -- confirm against the
    # complete file before relying on this excerpt.
Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!