Best Python code snippet using pytest-benchmark
fixture.py
Source:fixture.py
...127 raise128 def _raw(self, function_to_benchmark, *args, **kwargs):129 if self.enabled:130 runner = self._make_runner(function_to_benchmark, args, kwargs)131 duration, iterations, loops_range = self._calibrate_timer(runner)132 # Choose how many time we must repeat the test133 rounds = int(ceil(self._max_time / duration))134 rounds = max(rounds, self._min_rounds)135 rounds = min(rounds, sys.maxsize)136 stats = self._make_stats(iterations)137 self._logger.debug(" Running %s rounds x %s iterations ..." % (rounds, iterations), yellow=True, bold=True)138 run_start = time.time()139 if self._warmup:140 warmup_rounds = min(rounds, max(1, int(self._warmup / iterations)))141 self._logger.debug(" Warmup %s rounds x %s iterations ..." % (warmup_rounds, iterations))142 for _ in XRANGE(warmup_rounds):143 runner(loops_range)144 for _ in XRANGE(rounds):145 stats.update(runner(loops_range))146 self._logger.debug(" Ran for %ss." % format_time(time.time() - run_start), yellow=True, bold=True)147 if self.enabled and self.cprofile:148 profile = cProfile.Profile()149 function_result = profile.runcall(function_to_benchmark, *args, **kwargs)150 self.stats.cprofile_stats = pstats.Stats(profile)151 else:152 function_result = function_to_benchmark(*args, **kwargs)153 return function_result154 def _raw_pedantic(self, target, args=(), kwargs=None, setup=None, rounds=1, warmup_rounds=0, iterations=1):155 if kwargs is None:156 kwargs = {}157 has_args = bool(args or kwargs)158 if not isinstance(iterations, INT) or iterations < 1:159 raise ValueError("Must have positive int for `iterations`.")160 if not isinstance(rounds, INT) or rounds < 1:161 raise ValueError("Must have positive int for `rounds`.")162 if not isinstance(warmup_rounds, INT) or warmup_rounds < 0:163 raise ValueError("Must have positive int for `warmup_rounds`.")164 if iterations > 1 and setup:165 raise ValueError("Can't use more than 1 `iterations` with a `setup` function.")166 def make_arguments(args=args, kwargs=kwargs):167 if setup:168 maybe_args = setup()169 if maybe_args:170 if has_args:171 raise TypeError("Can't use `args` or `kwargs` if `setup` returns the arguments.")172 args, kwargs = maybe_args173 return args, kwargs174 if self.disabled:175 args, kwargs = make_arguments()176 return target(*args, **kwargs)177 stats = self._make_stats(iterations)178 loops_range = XRANGE(iterations) if iterations > 1 else None179 for _ in XRANGE(warmup_rounds):180 args, kwargs = make_arguments()181 runner = self._make_runner(target, args, kwargs)182 runner(loops_range)183 for _ in XRANGE(rounds):184 args, kwargs = make_arguments()185 runner = self._make_runner(target, args, kwargs)186 if loops_range:187 duration = runner(loops_range)188 else:189 duration, result = runner(loops_range)190 stats.update(duration)191 if loops_range:192 args, kwargs = make_arguments()193 result = target(*args, **kwargs)194 if self.cprofile:195 profile = cProfile.Profile()196 args, kwargs = make_arguments()197 profile.runcall(target, *args, **kwargs)198 self.stats.cprofile_stats = pstats.Stats(profile)199 return result200 def weave(self, target, **kwargs):201 try:202 import aspectlib203 except ImportError as exc:204 raise ImportError(exc.args, "Please install aspectlib or pytest-benchmark[aspect]")205 def aspect(function):206 def wrapper(*args, **kwargs):207 return self(function, *args, **kwargs)208 return wrapper209 self._cleanup_callbacks.append(aspectlib.weave(target, aspect, **kwargs).rollback)210 patch = weave211 def _cleanup(self):212 while self._cleanup_callbacks:213 callback = self._cleanup_callbacks.pop()214 callback()215 if not self._mode and not self.skipped:216 self._logger.warn("Benchmark fixture was not used at all in this test!",217 warner=self._warner, suspend=True)218 def _calibrate_timer(self, runner):219 timer_precision = self._get_precision(self._timer)220 min_time = max(self._min_time, timer_precision * self._calibration_precision)221 min_time_estimate = min_time * 5 / self._calibration_precision222 self._logger.debug("")223 self._logger.debug(" Calibrating to target round %ss; will estimate when reaching %ss "224 "(using: %s, precision: %ss)." % (225 format_time(min_time),226 format_time(min_time_estimate),227 NameWrapper(self._timer),228 format_time(timer_precision)229 ), yellow=True, bold=True)230 loops = 1231 while True:232 loops_range = XRANGE(loops)...
dronectrl.py
Source:dronectrl.py
1import socket2from time import time, sleep3from threading import Thread4# Define drone5class dm107s():6 # Default control value7 def __init__(self):8 # 4 values for flight9 self.roll = 12810 self.pitch = 12811 self.throttle = 12812 self.yaw = 12813 # 0 - normal mode, 2 - emergency stop, 4 - gyroscope calibration14 self.commands = 015 # Required for wifi control16 self.onoff = 117 # Prevent multiple takeoff button presses18 self._takeoff_flag = False19 # Prevent multiple calibrate button presses20 self._calibrate_flag = False21 # Connect to UDP port22 self.sess = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)23 #self.sess.connect(('192.168.100.1', 19798))24 # Initialize timer value25 self._takeoff_timer = 026 self._calibrate_timer = 027 # Flag to stop thread28 self._stopped = False29 30 # Start separated thread for drone control31 def start(self): 32 self._thread = Thread(target=self.send_ctrl, args=(), daemon=True)33 self._thread.start()34 return self35 36 # Get command hex for drone37 def get_hex(self):38 # XOR is for checksum39 self.command_out=((26122<<144)|self.roll<<136|self.pitch<<128|self.throttle<<120|self.yaw<<112|self.commands<<104|self.onoff*2<<96|65535<<80|(self.roll^self.pitch^self.throttle^self.yaw^self.commands^(self.onoff*2))<<8|153)40 self.command_out = hex(self.command_out)[2::]41 return self.command_out42 43 # Turn hex to byte package44 def _get_packet(self):45 self._hex_code = self.get_hex()46 self.package = bytes.fromhex(self._hex_code)47 return self.package48 49 # Send control to drone50 def send_ctrl(self):51 while not self._stopped:52 self._package = self._get_packet()53 #self.sess.send(self._package)54 self.sess.sendto(self._package, ('192.168.100.1', 19798))55 self.Flag_off()56 sleep(0.02)57 58 # Close connection to drone59 def close_connection(self):60 self._stopped = True61 if self._thread.daemon == False:62 self._thread.join()63 self.sess.close()64 65 # Return to default66 def default(self):67 self.roll = 12868 self.pitch = 12869 self.throttle = 12870 self.yaw = 12871 self.commands = 072 self.onoff = 173 self._takeoff_flag = False74 75 # Increment control76 def incremt(self, rl, pt, th, yw):77 self._value_to_change = [128, 128, 128, 128]78 self._change_val = [rl, pt, th, yw]79 for x in range(len(self._value_to_change)):80 self._value_to_change[x] += self._change_val[x]81 if self._value_to_change[x] <= 0:82 self._value_to_change[x] = 083 if self._value_to_change[x] >= 255:84 self._value_to_change[x] = 25585 [self.roll, self.pitch, self.throttle, self.yaw] = self._value_to_change86 87 # Roll right88 def roll_right(self):89 self.roll += 2090 if self.roll > 248:91 self.roll = 24892 93 # Pitch forward94 def pitch_fwd(self):95 self.pitch += 2096 if self.pitch > 248:97 self.pitch = 24898 99 # Increase throttle100 def throttle_up(self):101 self.throttle += 20102 if self.throttle > 248:103 self.throttle = 248104 105 # Yaw right106 def yaw_right(self):107 self.yaw -= 20108 if self.yaw < 18:109 self.yaw = 18110 111 # Roll left112 def roll_left(self):113 self.roll -= 20114 if self.roll < 18:115 self.roll = 18116 117 # Pitch backward118 def pitch_bwd(self):119 self.pitch -= 20120 if self.pitch < 18:121 self.pitch = 18122 123 # Decrease throttle124 def throttle_dwn(self):125 self.throttle -= 20126 if self.throttle < 18:127 self.throttle = 18128 129 # Yaw left130 def yaw_left(self):131 self.yaw += 20132 if self.yaw > 248:133 self.yaw = 248134 135 # Takeoff136 def takeoff(self):137 if self._takeoff_flag == False:138 self.commands = 1139 self._takeoff_flag = True140 self._takeoff_timer = time()141 142 # Landing143 def land(self):144 if self._takeoff_flag == False:145 self.commands = 1146 self._takeoff_flag = True147 self._takeoff_timer = time()148 149 # Flip takeoff flag150 def Flag_off(self):151 if (self._takeoff_flag == True and (time() - self._takeoff_timer >= 1)):152 self.commands = 0153 self._takeoff_flag = False154 if (self._calibrate_flag == True and (time() - self._calibrate_timer >= 3)):155 self.commands = 0156 self.onoff = 1157 self._calibrate_flag = False158 # Stop IMMEDIATELY159 def emergency_stop(self):160 self.roll = 128161 self.pitch = 128162 self.throttle = 128163 self.yaw = 128164 self.commands = 2165 self.onoff = 1166 self._takeoff_flag = False167 168 # Calibrate gyroscope169 def calib_gyro(self):170 if self._calibrate_flag == False:171 self.roll = 128172 self.pitch = 128173 self.throttle = 128174 self.yaw = 128175 self.commands = 4176 self.onoff = 0177 self._calibrate_flag = True178 self._calibrate_timer = time()179class naza():180 # Default control value181 def __init__(self, ip, port):182 # 4 values for flight183 self.roll = 8184 self.pitch = 8185 self.throttle = 8186 self.yaw = 8187 # Prevent multiple takeoff button presses188 self._takeoff_flag = False189 # Prevent multiple ignite button presses190 self._ignite_flag = False191 self._ignite_send = False192 # Connect to UDP port193 self.sess = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)194 self.ip = ip195 self.port = port196 #self.sess.connect((ip, port))197 # Initialize timer value198 self._ignite_timer = 0199 self._takeoff_timer = 0200 # Flag to stop thread201 self._stopped = False202 203 # Start separated thread for drone control204 def start(self): 205 self._thread = Thread(target=self.send_ctrl, args=(), daemon=True)206 self._thread.start()207 return self208 209 # Get command hex for drone210 def get_hex(self):211 # XOR is for checksum212 self.command_out=(self.throttle<<12|self.yaw<<8|self.pitch<<4|self.roll)213 self.command_out = hex(self.command_out)[2::]214 return self.command_out215 216 # Send control to drone217 def send_ctrl(self):218 while not self._stopped:219 if self._ignite_send == True:220 ignite_msg = 'st'221 self._package = ignite_msg.encode()222 else:223 self._package = self.get_hex().encode()224 #self.sess.send(self._package)225 self.sess.sendto(self._package, (self.ip, self.port))226 self.Flag_off()227 sleep(0.05)228 229 # Close connection to drone230 def close_connection(self):231 self._stopped = True232 if self._thread.daemon == False:233 self._thread.join()234 self.sess.close()235 236 # Return to default237 def default(self):238 self.roll = 8239 self.pitch = 8240 self.throttle = 8241 self.yaw = 8242 self._takeoff_flag = False243 self._ignite_flag = False244 245 # Increment control246 def incremt(self, rl, pt, th, yw):247 self._value_to_change = [8, 8, 8, 8]248 self._change_val = [rl, pt, th, yw]249 for x in range(len(self._value_to_change)):250 self._value_to_change[x] += self._change_val[x]251 if self._value_to_change[x] <= 0:252 self._value_to_change[x] = 0253 if self._value_to_change[x] >= 15:254 self._value_to_change[x] = 15255 [self.roll, self.pitch, self.throttle, self.yaw] = self._value_to_change256 257 # Roll right258 def roll_right(self):259 if self.roll < 15:260 self.roll += 1261 262 # Pitch forward263 def pitch_fwd(self):264 if self.pitch < 15:265 self.pitch += 1266 267 # Increase throttle268 def throttle_up(self):269 if self.throttle < 15:270 self.throttle += 1271 272 # Yaw right273 def yaw_right(self):274 if self.yaw < 15:275 self.yaw += 1276 277 # Roll left278 def roll_left(self):279 if self.roll > 0:280 self.roll -= 1281 282 # Pitch backward283 def pitch_bwd(self):284 if self.pitch > 0:285 self.pitch -= 1286 287 # Decrease throttle288 def throttle_dwn(self):289 if self.throttle > 0:290 self.throttle -= 1291 292 # Yaw left293 def yaw_left(self):294 if self.yaw > 0:295 self.yaw -= 1296 297 # Start engine298 def ignite(self):299 if self._ignite_flag == False:300 self._ignite_flag = True301 self._ignite_send = True302 self._ignite_timer = time()303 304 # Takeoff305 def takeoff(self):306 if self._takeoff_flag == False:307 self.throttle = 12308 self._takeoff_flag = True309 self._takeoff_timer = time()310 311 # Flip takeoff flag312 def Flag_off(self):313 if self._ignite_flag == True:314 if (time() - self._ignite_timer >= 1) and (time() - self._ignite_timer < 1.5):315 self._ignite_send = False316 self.roll = 8317 self.pitch = 8318 self.yaw = 8319 self.throttle = 0320 # Warming up engine321 if (time() - self._ignite_timer >= 1.5) and (time() - self._ignite_timer < 2):322 self.throttle = 2323 if (time() - self._ignite_timer >= 2) and (time() - self._ignite_timer < 2.5):324 self.throttle = 4325 if (time() - self._ignite_timer >= 2.5) and (time() - self._ignite_timer < 3):326 self.throttle = 6327 if (time() - self._ignite_timer >= 3) and (time() - self._ignite_timer < 4):328 self.throttle = 8329 # After starting engine, takeoff after 4s330 if (time() - self._ignite_timer >= 4):331 self._ignite_flag = False332 self.takeoff()333 if (self._takeoff_flag == True and (time() - self._takeoff_timer >= 4)):334 self.throttle = 8...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!