Best Python code snippet using hypothesis
engine.py
Source:engine.py
...83 # is at the beginning of a block of size 4 but only has 3 bytes left,84 # it's going to overrun the end of the buffer regardless of the85 # buffer contents.86 self.block_sizes = {}87 def __tree_is_exhausted(self):88 return 0 in self.dead89 def new_buffer(self):90 assert not self.__tree_is_exhausted()91 def draw_bytes(data, n, distribution):92 return self.__rewrite_for_novelty(93 data, self.__zero_bound(data, distribution(self.random, n))94 )95 self.last_data = ConjectureData(96 max_length=self.settings.buffer_size,97 draw_bytes=draw_bytes98 )99 self.test_function(self.last_data)100 self.last_data.freeze()101 def test_function(self, data):102 self.call_count += 1103 try:104 self._test_function(data)105 data.freeze()106 except StopTest as e:107 if e.testcounter != data.testcounter:108 self.save_buffer(data.buffer)109 raise e110 except:111 self.save_buffer(data.buffer)112 raise113 finally:114 data.freeze()115 self.note_details(data)116 self.debug_data(data)117 if data.status >= Status.VALID:118 self.valid_examples += 1119 tree_node = self.tree[0]120 indices = []121 node_index = 0122 for i, b in enumerate(data.buffer):123 indices.append(node_index)124 if i in data.forced_indices:125 self.forced[node_index] = b126 try:127 self.capped[node_index] = data.capped_indices[i]128 except KeyError:129 pass130 try:131 node_index = tree_node[b]132 except KeyError:133 node_index = len(self.tree)134 self.tree.append({})135 tree_node[b] = node_index136 tree_node = self.tree[node_index]137 if node_index in self.dead:138 break139 for u, v in data.blocks:140 # This can happen if we hit a dead node when walking the buffer.141 # In that case we alrady have this section of the tree mapped.142 if u >= len(indices):143 break144 self.block_sizes[indices[u]] = v - u145 if data.status != Status.OVERRUN and node_index not in self.dead:146 self.dead.add(node_index)147 self.tree[node_index] = data148 for j in reversed(indices):149 if (150 len(self.tree[j]) < self.capped.get(j, 255) + 1 and151 j not in self.forced152 ):153 break154 if set(self.tree[j].values()).issubset(self.dead):155 self.dead.add(j)156 else:157 break158 last_data_is_interesting = (159 self.last_data is not None and160 self.last_data.status == Status.INTERESTING161 )162 if (163 data.status == Status.INTERESTING and (164 not last_data_is_interesting or165 sort_key(data.buffer) < sort_key(self.last_data.buffer)166 )167 ):168 self.last_data = data169 if last_data_is_interesting:170 self.shrinks += 1171 if self.shrinks >= self.settings.max_shrinks:172 self.exit_reason = ExitReason.max_shrinks173 raise RunIsComplete()174 def consider_new_test_data(self, data):175 # Transition rules:176 # 1. Transition cannot decrease the status177 # 2. Any transition which increases the status is valid178 # 3. If the previous status was interesting, only shrinking179 # transitions are allowed.180 if data.buffer == self.last_data.buffer:181 return False182 if self.last_data.status < data.status:183 return True184 if self.last_data.status > data.status:185 return False186 if data.status == Status.INVALID:187 return data.index >= self.last_data.index188 if data.status == Status.OVERRUN:189 return data.overdraw <= self.last_data.overdraw190 assert data.status == Status.VALID191 return True192 def save_buffer(self, buffer):193 if (194 self.settings.database is not None and195 self.database_key is not None196 ):197 self.settings.database.save(self.database_key, hbytes(buffer))198 def note_details(self, data):199 if data.status == Status.INTERESTING:200 self.save_buffer(data.buffer)201 runtime = max(data.finish_time - data.start_time, 0.0)202 self.status_runtimes.setdefault(data.status, []).append(runtime)203 for event in set(map(self.event_to_string, data.events)):204 self.event_call_counts[event] += 1205 def debug(self, message):206 with self.settings:207 debug_report(message)208 def debug_data(self, data):209 buffer_parts = [u"["]210 for i, (u, v) in enumerate(data.blocks):211 if i > 0:212 buffer_parts.append(u" || ")213 buffer_parts.append(214 u', '.join(int_to_text(int(i)) for i in data.buffer[u:v]))215 buffer_parts.append(u']')216 self.debug(u'%d bytes %s -> %s, %s' % (217 data.index,218 u''.join(buffer_parts),219 unicode_safe_repr(data.status),220 data.output,221 ))222 def prescreen_buffer(self, buffer):223 """Attempt to rule out buffer as a possible interesting candidate.224 Returns False if we know for sure that running this buffer will not225 produce an interesting result. Returns True if it might (because it226 explores territory we have not previously tried).227 This is purely an optimisation to try to reduce the number of tests we228 run. "return True" would be a valid but inefficient implementation.229 """230 node_index = 0231 n = len(buffer)232 for k, b in enumerate(buffer):233 if node_index in self.dead:234 return False235 try:236 # The block size at that point provides a lower bound on how237 # many more bytes are required. If the buffer does not have238 # enough bytes to fulfill that block size then we can rule out239 # this buffer.240 if k + self.block_sizes[node_index] > n:241 return False242 except KeyError:243 pass244 try:245 b = self.forced[node_index]246 except KeyError:247 pass248 try:249 b = min(b, self.capped[node_index])250 except KeyError:251 pass252 try:253 node_index = self.tree[node_index][b]254 except KeyError:255 return True256 else:257 return False258 def incorporate_new_buffer(self, buffer):259 assert self.last_data.status == Status.INTERESTING260 if (261 self.settings.timeout > 0 and262 time.time() >= self.start_time + self.settings.timeout263 ):264 self.exit_reason = ExitReason.timeout265 raise RunIsComplete()266 buffer = hbytes(buffer[:self.last_data.index])267 assert sort_key(buffer) < sort_key(self.last_data.buffer)268 if not self.prescreen_buffer(buffer):269 return False270 assert sort_key(buffer) <= sort_key(self.last_data.buffer)271 data = ConjectureData.for_buffer(buffer)272 self.test_function(data)273 return data is self.last_data274 def run(self):275 with self.settings:276 try:277 self._run()278 except RunIsComplete:279 pass280 self.debug(281 u'Run complete after %d examples (%d valid) and %d shrinks' % (282 self.call_count, self.valid_examples, self.shrinks,283 ))284 def _new_mutator(self):285 def draw_new(data, n, distribution):286 return distribution(self.random, n)287 def draw_existing(data, n, distribution):288 return self.last_data.buffer[data.index:data.index + n]289 def draw_smaller(data, n, distribution):290 existing = self.last_data.buffer[data.index:data.index + n]291 r = distribution(self.random, n)292 if r <= existing:293 return r294 return _draw_predecessor(self.random, existing)295 def draw_larger(data, n, distribution):296 existing = self.last_data.buffer[data.index:data.index + n]297 r = distribution(self.random, n)298 if r >= existing:299 return r300 return _draw_successor(self.random, existing)301 def reuse_existing(data, n, distribution):302 choices = data.block_starts.get(n, []) or \303 self.last_data.block_starts.get(n, [])304 if choices:305 i = self.random.choice(choices)306 return self.last_data.buffer[i:i + n]307 else:308 result = distribution(self.random, n)309 assert isinstance(result, hbytes)310 return result311 def flip_bit(data, n, distribution):312 buf = bytearray(313 self.last_data.buffer[data.index:data.index + n])314 i = self.random.randint(0, n - 1)315 k = self.random.randint(0, 7)316 buf[i] ^= (1 << k)317 return hbytes(buf)318 def draw_zero(data, n, distribution):319 return hbytes(b'\0' * n)320 def draw_max(data, n, distribution):321 return hbytes([255]) * n322 def draw_constant(data, n, distribution):323 return bytes_from_list([324 self.random.randint(0, 255)325 ] * n)326 options = [327 draw_new,328 reuse_existing, reuse_existing,329 draw_existing, draw_smaller, draw_larger,330 flip_bit,331 draw_zero, draw_max, draw_zero, draw_max,332 draw_constant,333 ]334 bits = [335 self.random.choice(options) for _ in hrange(3)336 ]337 def draw_mutated(data, n, distribution):338 if (339 data.index + n > len(self.last_data.buffer)340 ):341 result = distribution(self.random, n)342 else:343 result = self.random.choice(bits)(data, n, distribution)344 return self.__rewrite_for_novelty(345 data, self.__zero_bound(data, result))346 return draw_mutated347 def __rewrite(self, data, result):348 return self.__rewrite_for_novelty(349 data, self.__zero_bound(data, result)350 )351 def __zero_bound(self, data, result):352 """This tries to get the size of the generated data under control by353 replacing the result with zero if we are too deep or have already354 generated too much data.355 This causes us to enter "shrinking mode" there and thus reduce356 the size of the generated data.357 """358 if (359 data.depth * 2 >= MAX_DEPTH or360 (data.index + len(result)) * 2 >= self.settings.buffer_size361 ):362 if any(result):363 data.hit_zero_bound = True364 return hbytes(len(result))365 else:366 return result367 def __rewrite_for_novelty(self, data, result):368 """Take a block that is about to be added to data as the result of a369 draw_bytes call and rewrite it a small amount to ensure that the result370 will be novel: that is, not hit a part of the tree that we have fully371 explored.372 This is mostly useful for test functions which draw a small373 number of blocks.374 """375 assert isinstance(result, hbytes)376 try:377 node_index = data.__current_node_index378 except AttributeError:379 node_index = 0380 data.__current_node_index = node_index381 data.__hit_novelty = False382 data.__evaluated_to = 0383 if data.__hit_novelty:384 return result385 node = self.tree[node_index]386 for i in hrange(data.__evaluated_to, len(data.buffer)):387 node = self.tree[node_index]388 try:389 node_index = node[data.buffer[i]]390 assert node_index not in self.dead391 node = self.tree[node_index]392 except KeyError:393 data.__hit_novelty = True394 return result395 for i, b in enumerate(result):396 assert isinstance(b, int)397 try:398 new_node_index = node[b]399 except KeyError:400 data.__hit_novelty = True401 return result402 new_node = self.tree[new_node_index]403 if new_node_index in self.dead:404 if isinstance(result, hbytes):405 result = bytearray(result)406 for c in range(256):407 if c not in node:408 assert c <= self.capped.get(node_index, c)409 result[i] = c410 data.__hit_novelty = True411 return hbytes(result)412 else:413 new_node_index = node[c]414 new_node = self.tree[new_node_index]415 if new_node_index not in self.dead:416 result[i] = c417 break418 else: # pragma: no cover419 assert False, (420 'Found a tree node which is live despite all its '421 'children being dead.')422 node_index = new_node_index423 node = new_node424 assert node_index not in self.dead425 data.__current_node_index = node_index426 data.__evaluated_to = data.index + len(result)427 return hbytes(result)428 def has_existing_examples(self):429 return (430 self.settings.database is not None and431 self.database_key is not None and432 Phase.reuse in self.settings.phases433 )434 def reuse_existing_examples(self):435 """If appropriate (we have a database and have been told to use it),436 try to reload existing examples from the database.437 If there are a lot we don't try all of them. We always try the438 smallest example in the database (which is guaranteed to be the439 last failure) and the largest (which is usually the seed example440 which the last failure came from but we don't enforce that). We441 then take a random sampling of the remainder and try those. Any442 examples that are no longer interesting are cleared out.443 """444 if self.has_existing_examples():445 corpus = sorted(446 self.settings.database.fetch(self.database_key),447 key=sort_key448 )449 desired_size = max(2, ceil(0.1 * self.settings.max_examples))450 if desired_size < len(corpus):451 new_corpus = [corpus[0], corpus[-1]]452 n_boost = max(desired_size - 2, 0)453 new_corpus.extend(self.random.sample(corpus[1:-1], n_boost))454 corpus = new_corpus455 corpus.sort(key=sort_key)456 for existing in corpus:457 if self.valid_examples >= self.settings.max_examples:458 self.exit_with(ExitReason.max_examples)459 if self.call_count >= max(460 self.settings.max_iterations, self.settings.max_examples461 ):462 self.exit_with(ExitReason.max_iterations)463 data = ConjectureData.for_buffer(existing)464 self.test_function(data)465 data.freeze()466 self.last_data = data467 self.consider_new_test_data(data)468 if data.status == Status.INTERESTING:469 assert data.status == Status.INTERESTING470 self.last_data = data471 break472 else:473 self.settings.database.delete(474 self.database_key, existing)475 def exit_with(self, reason):476 self.exit_reason = reason477 raise RunIsComplete()478 def _run(self):479 self.last_data = None480 mutations = 0481 start_time = time.time()482 self.reuse_existing_examples()483 if (484 Phase.generate in self.settings.phases and not485 self.__tree_is_exhausted()486 ):487 if (488 self.last_data is None or489 self.last_data.status < Status.INTERESTING490 ):491 self.new_buffer()492 mutator = self._new_mutator()493 zero_bound_queue = []494 while (495 self.last_data.status != Status.INTERESTING and496 not self.__tree_is_exhausted()497 ):498 if self.valid_examples >= self.settings.max_examples:499 self.exit_reason = ExitReason.max_examples500 return501 if self.call_count >= max(502 self.settings.max_iterations, self.settings.max_examples503 ):504 self.exit_reason = ExitReason.max_iterations505 return506 if (507 self.settings.timeout > 0 and508 time.time() >= start_time + self.settings.timeout509 ):510 self.exit_reason = ExitReason.timeout511 return512 if zero_bound_queue:513 # Whenever we generated an example and it hits a bound514 # which forces zero blocks into it, this creates a weird515 # distortion effect by making certain parts of the data516 # stream (especially ones to the right) much more likely517 # to be zero. We fix this by redistributing the generated518 # data by shuffling it randomly. This results in the519 # zero data being spread evenly throughout the buffer.520 # Hopefully the shrinking this causes will cause us to521 # naturally fail to hit the bound.522 # If it doesn't then we will queue the new version up again523 # (now with more zeros) and try again.524 overdrawn = zero_bound_queue.pop()525 buffer = bytearray(overdrawn.buffer)526 # These will have values written to them that are different527 # from what's in them anyway, so the value there doesn't528 # really "count" for distributional purposes, and if we529 # leave them in then they can cause the fraction of non530 # zero bytes to increase on redraw instead of decrease.531 for i in overdrawn.forced_indices:532 buffer[i] = 0533 self.random.shuffle(buffer)534 buffer = hbytes(buffer)535 if buffer == overdrawn.buffer:536 continue537 def draw_bytes(data, n, distribution):538 result = buffer[data.index:data.index + n]539 if len(result) < n:540 result += hbytes(n - len(result))541 return self.__rewrite(data, result)542 data = ConjectureData(543 draw_bytes=draw_bytes,544 max_length=self.settings.buffer_size,545 )546 self.test_function(data)547 data.freeze()548 elif mutations >= self.settings.max_mutations:549 mutations = 0550 data = self.new_buffer()551 mutator = self._new_mutator()552 else:553 data = ConjectureData(554 draw_bytes=mutator,555 max_length=self.settings.buffer_size556 )557 self.test_function(data)558 data.freeze()559 prev_data = self.last_data560 if self.consider_new_test_data(data):561 self.last_data = data562 if data.status > prev_data.status:563 mutations = 0564 else:565 mutator = self._new_mutator()566 if getattr(data, 'hit_zero_bound', False):567 zero_bound_queue.append(data)568 mutations += 1569 if self.__tree_is_exhausted():570 self.exit_reason = ExitReason.finished571 return572 data = self.last_data573 if data is None:574 self.exit_reason = ExitReason.finished575 return576 assert isinstance(data.output, text_type)577 if self.settings.max_shrinks <= 0:578 self.exit_reason = ExitReason.max_shrinks579 return580 if Phase.shrink not in self.settings.phases:581 self.exit_reason = ExitReason.finished582 return583 data = ConjectureData.for_buffer(self.last_data.buffer)...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!