Best Python code snippet using hypothesis
shrinker.py
Source:shrinker.py
...265 def calls(self):266 """Return the number of calls that have been made to the underlying267 test function."""268 return self.engine.call_count269 def consider_new_buffer(self, buffer):270 """Returns True if after running this buffer the result would be271 the current shrink_target."""272 buffer = bytes(buffer)273 return buffer.startswith(self.buffer) or self.incorporate_new_buffer(buffer)274 def incorporate_new_buffer(self, buffer):275 """Either runs the test function on this buffer and returns True if276 that changed the shrink_target, or determines that doing so would277 be useless and returns False without running it."""278 buffer = bytes(buffer[: self.shrink_target.index])279 # Sometimes an attempt at lexicographic minimization will do the wrong280 # thing because the buffer has changed under it (e.g. something has281 # turned into a write, the bit size has changed). The result would be282 # an invalid string, but it's better for us to just ignore it here as283 # it turns out to involve quite a lot of tricky book-keeping to get284 # this right and it's better to just handle it in one place.285 if sort_key(buffer) >= sort_key(self.shrink_target.buffer):286 return False287 if self.shrink_target.buffer.startswith(buffer):288 return False289 previous = self.shrink_target290 self.cached_test_function(buffer)291 return previous is not self.shrink_target292 def incorporate_test_data(self, data):293 """Takes a ConjectureData or Overrun object updates the current294 shrink_target if this data represents an improvement over it,295 returning True if it is."""296 if data is Overrun or data is self.shrink_target:297 return298 if self.__predicate(data) and sort_key(data.buffer) < sort_key(299 self.shrink_target.buffer300 ):301 self.update_shrink_target(data)302 return True303 return False304 def cached_test_function(self, buffer):305 """Returns a cached version of the underlying test function, so306 that the result is either an Overrun object (if the buffer is307 too short to be a valid test case) or a ConjectureData object308 with status >= INVALID that would result from running this buffer."""309 if self.__pending_shrink_explanation is not None:310 self.debug(self.__pending_shrink_explanation)311 self.__pending_shrink_explanation = None312 buffer = bytes(buffer)313 result = self.engine.cached_test_function(buffer)314 self.incorporate_test_data(result)315 return result316 def debug(self, msg):317 self.engine.debug(msg)318 @property319 def random(self):320 return self.engine.random321 def shrink(self):322 """Run the full set of shrinks and update shrink_target.323 This method is "mostly idempotent" - calling it twice is unlikely to324 have any effect, though it has a non-zero probability of doing so.325 """326 # We assume that if an all-zero block of bytes is an interesting327 # example then we're not going to do better than that.328 # This might not technically be true: e.g. for integers() | booleans()329 # the simplest example is actually [1, 0]. Missing this case is fairly330 # harmless and this allows us to make various simplifying assumptions331 # about the structure of the data (principally that we're never332 # operating on a block of all zero bytes so can use non-zeroness as a333 # signpost of complexity).334 if not any(self.shrink_target.buffer) or self.incorporate_new_buffer(335 bytes(len(self.shrink_target.buffer))336 ):337 return338 try:339 self.greedy_shrink()340 finally:341 if self.engine.report_debug_info:342 def s(n):343 return "s" if n != 1 else ""344 total_deleted = self.initial_size - len(self.shrink_target.buffer)345 self.debug("---------------------")346 self.debug("Shrink pass profiling")347 self.debug("---------------------")348 self.debug("")349 calls = self.engine.call_count - self.initial_calls350 self.debug(351 (352 "Shrinking made a total of %d call%s "353 "of which %d shrank. This deleted %d byte%s out of %d."354 )355 % (356 calls,357 s(calls),358 self.shrinks,359 total_deleted,360 s(total_deleted),361 self.initial_size,362 )363 )364 for useful in [True, False]:365 self.debug("")366 if useful:367 self.debug("Useful passes:")368 else:369 self.debug("Useless passes:")370 self.debug("")371 for p in sorted(372 self.passes, key=lambda t: (-t.calls, t.deletions, t.shrinks)373 ):374 if p.calls == 0:375 continue376 if (p.shrinks != 0) != useful:377 continue378 self.debug(379 (380 " * %s made %d call%s of which "381 "%d shrank, deleting %d byte%s."382 )383 % (384 p.name,385 p.calls,386 s(p.calls),387 p.shrinks,388 p.deletions,389 s(p.deletions),390 )391 )392 self.debug("")393 def greedy_shrink(self):394 """Run a full set of greedy shrinks (that is, ones that will only ever395 move to a better target) and update shrink_target appropriately.396 This method iterates to a fixed point and so is idempontent - calling397 it twice will have exactly the same effect as calling it once.398 """399 self.fixate_shrink_passes(400 [401 block_program("X" * 5),402 block_program("X" * 4),403 block_program("X" * 3),404 block_program("X" * 2),405 block_program("X" * 1),406 "pass_to_descendant",407 "adaptive_example_deletion",408 "alphabet_minimize",409 "zero_examples",410 "reorder_examples",411 "minimize_floats",412 "minimize_duplicated_blocks",413 block_program("-XX"),414 "minimize_individual_blocks",415 block_program("--X"),416 ]417 )418 @derived_value419 def shrink_pass_choice_trees(self):420 return defaultdict(ChoiceTree)421 def fixate_shrink_passes(self, passes):422 """Run steps from each pass in ``passes`` until the current shrink target423 is a fixed point of all of them."""424 passes = list(map(self.shrink_pass, passes))425 any_ran = True426 while any_ran:427 any_ran = False428 # We run remove_discarded after every step to do cleanup429 # keeping track of whether that actually works. Either there is430 # no discarded data and it is basically free, or it reliably works431 # and deletes data, or it doesn't work. In that latter case we turn432 # it off for the rest of this loop through the passes, but will433 # try again once all of the passes have been run.434 can_discard = self.remove_discarded()435 successful_passes = set()436 for sp in passes:437 # We run each pass until it has failed a certain number438 # of times, where a "failure" is any step where it made439 # at least one call and did not result in a shrink.440 # This gives passes which work reasonably often more of441 # chance to run.442 failures = 0443 successes = 0444 # The choice of 3 is fairly arbitrary and was hand tuned445 # to some particular examples. It is very unlikely that446 # is the best choice in general, but it's not an447 # unreasonable choice: Making it smaller than this would448 # give too high a chance of an otherwise very worthwhile449 # pass getting screened out too early if it got unlucky,450 # and making it much larger than this would result in us451 # spending too much time on bad passes.452 max_failures = 3453 while failures < max_failures:454 prev_calls = self.calls455 prev = self.shrink_target456 if sp.step():457 any_ran = True458 else:459 break460 if prev_calls != self.calls:461 if can_discard:462 can_discard = self.remove_discarded()463 if prev is self.shrink_target:464 failures += 1465 else:466 successes += 1467 if successes > 0:468 successful_passes.add(sp)469 # If only some of our shrink passes are doing anything useful470 # then run all of those to a fixed point before running the471 # full set. This is particularly important when an emergency472 # shrink pass unlocks some non-emergency ones and it suddenly473 # becomes very expensive to find a bunch of small changes.474 if 0 < len(successful_passes) < len(passes):475 self.fixate_shrink_passes(successful_passes)476 for sp in passes:477 sp.fixed_point_at = self.shrink_target478 @property479 def buffer(self):480 return self.shrink_target.buffer481 @property482 def blocks(self):483 return self.shrink_target.blocks484 @property485 def examples(self):486 return self.shrink_target.examples487 def all_block_bounds(self):488 return self.shrink_target.blocks.all_bounds()489 @derived_value490 def examples_by_label(self):491 """An index of all examples grouped by their label, with492 the examples stored in their normal index order."""493 examples_by_label = defaultdict(list)494 for ex in self.examples:495 examples_by_label[ex.label].append(ex)496 return dict(examples_by_label)497 @derived_value498 def distinct_labels(self):499 return sorted(self.examples_by_label, key=str)500 @defines_shrink_pass()501 def pass_to_descendant(self, chooser):502 """Attempt to replace each example with a descendant example.503 This is designed to deal with strategies that call themselves504 recursively. For example, suppose we had:505 binary_tree = st.deferred(506 lambda: st.one_of(507 st.integers(), st.tuples(binary_tree, binary_tree)))508 This pass guarantees that we can replace any binary tree with one of509 its subtrees - each of those will create an interval that the parent510 could validly be replaced with, and this pass will try doing that.511 This is pretty expensive - it takes O(len(intervals)^2) - so we run it512 late in the process when we've got the number of intervals as far down513 as possible.514 """515 label = chooser.choose(516 self.distinct_labels, lambda l: len(self.examples_by_label[l]) >= 2517 )518 ls = self.examples_by_label[label]519 i = chooser.choose(range(len(ls) - 1))520 ancestor = ls[i]521 if i + 1 == len(ls) or ls[i + 1].start >= ancestor.end:522 return523 @self.cached(label, i)524 def descendants():525 lo = i + 1526 hi = len(ls)527 while lo + 1 < hi:528 mid = (lo + hi) // 2529 if ls[mid].start >= ancestor.end:530 hi = mid531 else:532 lo = mid533 return [t for t in ls[i + 1 : hi] if t.length < ancestor.length]534 descendant = chooser.choose(descendants, lambda ex: ex.length > 0)535 assert ancestor.start <= descendant.start536 assert ancestor.end >= descendant.end537 assert descendant.length < ancestor.length538 self.incorporate_new_buffer(539 self.buffer[: ancestor.start]540 + self.buffer[descendant.start : descendant.end]541 + self.buffer[ancestor.end :]542 )543 def lower_common_block_offset(self):544 """Sometimes we find ourselves in a situation where changes to one part545 of the byte stream unlock changes to other parts. Sometimes this is546 good, but sometimes this can cause us to exhibit exponential slow547 downs!548 e.g. suppose we had the following:549 m = draw(integers(min_value=0))550 n = draw(integers(min_value=0))551 assert abs(m - n) > 1552 If this fails then we'll end up with a loop where on each iteration we553 reduce each of m and n by 2 - m can't go lower because of n, then n554 can't go lower because of m.555 This will take us O(m) iterations to complete, which is exponential in556 the data size, as we gradually zig zag our way towards zero.557 This can only happen if we're failing to reduce the size of the byte558 stream: The number of iterations that reduce the length of the byte559 stream is bounded by that length.560 So what we do is this: We keep track of which blocks are changing, and561 then if there's some non-zero common offset to them we try and minimize562 them all at once by lowering that offset.563 This may not work, and it definitely won't get us out of all possible564 exponential slow downs (an example of where it doesn't is where the565 shape of the blocks changes as a result of this bouncing behaviour),566 but it fails fast when it doesn't work and gets us out of a really567 nastily slow case when it does.568 """569 if len(self.__changed_blocks) <= 1:570 return571 current = self.shrink_target572 blocked = [current.buffer[u:v] for u, v in self.all_block_bounds()]573 changed = [574 i575 for i in sorted(self.__changed_blocks)576 if not self.shrink_target.blocks[i].trivial577 ]578 if not changed:579 return580 ints = [int_from_bytes(blocked[i]) for i in changed]581 offset = min(ints)582 assert offset > 0583 for i in range(len(ints)):584 ints[i] -= offset585 def reoffset(o):586 new_blocks = list(blocked)587 for i, v in zip(changed, ints):588 new_blocks[i] = int_to_bytes(v + o, len(blocked[i]))589 return self.incorporate_new_buffer(b"".join(new_blocks))590 Integer.shrink(offset, reoffset, random=self.random)591 self.clear_change_tracking()592 def clear_change_tracking(self):593 self.__last_checked_changed_at = self.shrink_target594 self.__all_changed_blocks = set()595 def mark_changed(self, i):596 self.__changed_blocks.add(i)597 @property598 def __changed_blocks(self):599 if self.__last_checked_changed_at is not self.shrink_target:600 prev_target = self.__last_checked_changed_at601 new_target = self.shrink_target602 assert prev_target is not new_target603 prev = prev_target.buffer604 new = new_target.buffer605 assert sort_key(new) < sort_key(prev)606 if (607 len(new_target.blocks) != len(prev_target.blocks)608 or new_target.blocks.endpoints != prev_target.blocks.endpoints609 ):610 self.__all_changed_blocks = set()611 else:612 blocks = new_target.blocks613 # Index of last block whose contents have been modified, found614 # by checking if the tail past this point has been modified.615 last_changed = binary_search(616 0,617 len(blocks),618 lambda i: prev[blocks.start(i) :] != new[blocks.start(i) :],619 )620 # Index of the first block whose contents have been changed,621 # because we know that this predicate is true for zero (because622 # the prefix from the start is empty), so the result must be True623 # for the bytes from the start of this block and False for the624 # bytes from the end, hence the change is in this block.625 first_changed = binary_search(626 0,627 len(blocks),628 lambda i: prev[: blocks.start(i)] == new[: blocks.start(i)],629 )630 # Between these two changed regions we now do a linear scan to631 # check if any specific block values have changed.632 for i in range(first_changed, last_changed + 1):633 u, v = blocks.bounds(i)634 if i not in self.__all_changed_blocks and prev[u:v] != new[u:v]:635 self.__all_changed_blocks.add(i)636 self.__last_checked_changed_at = new_target637 assert self.__last_checked_changed_at is self.shrink_target638 return self.__all_changed_blocks639 def update_shrink_target(self, new_target):640 assert isinstance(new_target, ConjectureResult)641 if self.shrink_target is not None:642 self.shrinks += 1643 else:644 self.__all_changed_blocks = set()645 self.__last_checked_changed_at = new_target646 self.shrink_target = new_target647 self.__derived_values = {}648 def try_shrinking_blocks(self, blocks, b):649 """Attempts to replace each block in the blocks list with b. Returns650 True if it succeeded (which may include some additional modifications651 to shrink_target).652 In current usage it is expected that each of the blocks currently have653 the same value, although this is not essential. Note that b must be654 < the block at min(blocks) or this is not a valid shrink.655 This method will attempt to do some small amount of work to delete data656 that occurs after the end of the blocks. This is useful for cases where657 there is some size dependency on the value of a block.658 """659 initial_attempt = bytearray(self.shrink_target.buffer)660 for i, block in enumerate(blocks):661 if block >= len(self.blocks):662 blocks = blocks[:i]663 break664 u, v = self.blocks[block].bounds665 n = min(self.blocks[block].length, len(b))666 initial_attempt[v - n : v] = b[-n:]667 if not blocks:668 return False669 start = self.shrink_target.blocks[blocks[0]].start670 end = self.shrink_target.blocks[blocks[-1]].end671 initial_data = self.cached_test_function(initial_attempt)672 if initial_data is self.shrink_target:673 self.lower_common_block_offset()674 return True675 # If this produced something completely invalid we ditch it676 # here rather than trying to persevere.677 if initial_data.status < Status.VALID:678 return False679 # We've shrunk inside our group of blocks, so we have no way to680 # continue. (This only happens when shrinking more than one block at681 # a time).682 if len(initial_data.buffer) < v:683 return False684 lost_data = len(self.shrink_target.buffer) - len(initial_data.buffer)685 # If this did not in fact cause the data size to shrink we686 # bail here because it's not worth trying to delete stuff from687 # the remainder.688 if lost_data <= 0:689 return False690 # We now look for contiguous regions to delete that might help fix up691 # this failed shrink. We only look for contiguous regions of the right692 # lengths because doing anything more than that starts to get very693 # expensive. See minimize_individual_blocks for where we694 # try to be more aggressive.695 regions_to_delete = {(end, end + lost_data)}696 for j in (blocks[-1] + 1, blocks[-1] + 2):697 if j >= min(len(initial_data.blocks), len(self.blocks)):698 continue699 # We look for a block very shortly after the last one that has700 # lost some of its size, and try to delete from the beginning so701 # that it retains the same integer value. This is a bit of a hyper702 # specific trick designed to make our integers() strategy shrink703 # well.704 r1, s1 = self.shrink_target.blocks[j].bounds705 r2, s2 = initial_data.blocks[j].bounds706 lost = (s1 - r1) - (s2 - r2)707 # Apparently a coverage bug? An assert False in the body of this708 # will reliably fail, but it shows up as uncovered.709 if lost <= 0 or r1 != r2: # pragma: no cover710 continue711 regions_to_delete.add((r1, r1 + lost))712 for ex in self.shrink_target.examples:713 if ex.start > start:714 continue715 if ex.end <= end:716 continue717 replacement = initial_data.examples[ex.index]718 in_original = [c for c in ex.children if c.start >= end]719 in_replaced = [c for c in replacement.children if c.start >= end]720 if len(in_replaced) >= len(in_original) or not in_replaced:721 continue722 # We've found an example where some of the children went missing723 # as a result of this change, and just replacing it with the data724 # it would have had and removing the spillover didn't work. This725 # means that some of its children towards the right must be726 # important, so we try to arrange it so that it retains its727 # rightmost children instead of its leftmost.728 regions_to_delete.add(729 (in_original[0].start, in_original[-len(in_replaced)].start)730 )731 for u, v in sorted(regions_to_delete, key=lambda x: x[1] - x[0], reverse=True):732 try_with_deleted = bytearray(initial_attempt)733 del try_with_deleted[u:v]734 if self.incorporate_new_buffer(try_with_deleted):735 return True736 return False737 def remove_discarded(self):738 """Try removing all bytes marked as discarded.739 This is primarily to deal with data that has been ignored while740 doing rejection sampling - e.g. as a result of an integer range, or a741 filtered strategy.742 Such data will also be handled by the adaptive_example_deletion pass,743 but that pass is necessarily more conservative and will try deleting744 each interval individually. The common case is that all data drawn and745 rejected can just be thrown away immediately in one block, so this pass746 will be much faster than trying each one individually when it works.747 returns False if there is discarded data and removing it does not work,748 otherwise returns True.749 """750 while self.shrink_target.has_discards:751 discarded = []752 for ex in self.shrink_target.examples:753 if (754 ex.length > 0755 and ex.discarded756 and (not discarded or ex.start >= discarded[-1][-1])757 ):758 discarded.append((ex.start, ex.end))759 # This can happen if we have discards but they are all of760 # zero length. This shouldn't happen very often so it's761 # faster to check for it here than at the point of example762 # generation.763 if not discarded:764 break765 attempt = bytearray(self.shrink_target.buffer)766 for u, v in reversed(discarded):767 del attempt[u:v]768 if not self.incorporate_new_buffer(attempt):769 return False770 return True771 @defines_shrink_pass()772 def adaptive_example_deletion(self, chooser):773 """Attempts to delete every example from the test case.774 That is, it is logically equivalent to trying ``self.buffer[:ex.start] +775 self.buffer[ex.end:]`` for every example ``ex``. The order in which776 examples are tried is randomized, and when deletion is successful it777 will attempt to adapt to delete more than one example at a time.778 """779 example = chooser.choose(self.examples)780 if not self.incorporate_new_buffer(781 self.buffer[: example.start] + self.buffer[example.end :]782 ):783 return784 # If we successfully deleted one example there may be a useful785 # deletable region around here.786 original = self.shrink_target787 endpoints = set()788 for ex in original.examples:789 if ex.depth <= example.depth:790 endpoints.add(ex.start)791 endpoints.add(ex.end)792 partition = sorted(endpoints)793 j = partition.index(example.start)794 def delete_region(a, b):795 assert a <= j <= b796 if a < 0 or b >= len(partition) - 1:797 return False798 return self.consider_new_buffer(799 original.buffer[: partition[a]] + original.buffer[partition[b] :]800 )801 to_right = find_integer(lambda n: delete_region(j, j + n))802 find_integer(lambda n: delete_region(j - n, j + to_right))803 def try_zero_example(self, ex):804 u = ex.start805 v = ex.end806 attempt = self.cached_test_function(807 self.buffer[:u] + bytes(v - u) + self.buffer[v:]808 )809 if attempt is Overrun:810 return False811 in_replacement = attempt.examples[ex.index]812 used = in_replacement.length813 if attempt is not self.shrink_target:814 if in_replacement.end < len(attempt.buffer) and used < ex.length:815 self.incorporate_new_buffer(816 self.buffer[:u] + bytes(used) + self.buffer[v:]817 )818 return self.examples[ex.index].trivial819 @defines_shrink_pass()820 def zero_examples(self, chooser):821 """Attempt to replace each example with a minimal version of itself."""822 ex = chooser.choose(self.examples, lambda ex: not ex.trivial)823 # If the example is already trivial, assume there's nothing to do here.824 # We could attempt to use it as an adaptive replacement for other825 # similar examples, but that seems to be ineffective, resulting mostly826 # in redundant work rather than helping.827 if not self.try_zero_example(ex):828 return829 # If we zeroed the example we need to get the new one that replaced it.830 ex = self.examples[ex.index]831 original = self.shrink_target832 group = self.examples_by_label[ex.label]833 i = group.index(ex)834 replacement = self.buffer[ex.start : ex.end]835 # We first expand to cover the trivial region surrounding this group.836 # This avoids a situation where the adaptive phase "succeeds" a lot by837 # virtue of not doing anything and then goes into a galloping phase838 # where it does a bunch of useless work.839 def all_trivial(a, b):840 if a < 0 or b > len(group):841 return False842 return all(e.trivial for e in group[a:b])843 start, end = expand_region(all_trivial, i, i + 1)844 # If we've got multiple trivial examples of different lengths then845 # this isn't going to work as a replacement for all of them and so we846 # skip out early.847 if any(e.length != len(replacement) for e in group[start:end]):848 return849 def can_zero(a, b):850 if a < 0 or b > len(group):851 return False852 regions = []853 for e in group[a:b]:854 t = (e.start, e.end, replacement)855 if not regions or t[0] >= regions[-1][1]:856 regions.append(t)857 return self.consider_new_buffer(replace_all(original.buffer, regions))858 expand_region(can_zero, start, end)859 @derived_value860 def blocks_by_non_zero_suffix(self):861 """Returns a list of blocks grouped by their non-zero suffix,862 as a list of (suffix, indices) pairs, skipping all groupings863 where there is only one index.864 This is only used for the arguments of minimize_duplicated_blocks.865 """866 duplicates = defaultdict(list)867 for block in self.blocks:868 duplicates[non_zero_suffix(self.buffer[block.start : block.end])].append(869 block.index870 )871 return duplicates872 @derived_value873 def duplicated_block_suffixes(self):874 return sorted(self.blocks_by_non_zero_suffix)875 @defines_shrink_pass()876 def minimize_duplicated_blocks(self, chooser):877 """Find blocks that have been duplicated in multiple places and attempt878 to minimize all of the duplicates simultaneously.879 This lets us handle cases where two values can't be shrunk880 independently of each other but can easily be shrunk together.881 For example if we had something like:882 ls = data.draw(lists(integers()))883 y = data.draw(integers())884 assert y not in ls885 Suppose we drew y = 3 and after shrinking we have ls = [3]. If we were886 to replace both 3s with 0, this would be a valid shrink, but if we were887 to replace either 3 with 0 on its own the test would start passing.888 It is also useful for when that duplication is accidental and the value889 of the blocks doesn't matter very much because it allows us to replace890 more values at once.891 """892 block = chooser.choose(self.duplicated_block_suffixes)893 targets = self.blocks_by_non_zero_suffix[block]894 if len(targets) <= 1:895 return896 Lexical.shrink(897 block,898 lambda b: self.try_shrinking_blocks(targets, b),899 random=self.random,900 full=False,901 )902 @defines_shrink_pass()903 def minimize_floats(self, chooser):904 """Some shrinks that we employ that only really make sense for our905 specific floating point encoding that are hard to discover from any906 sort of reasonable general principle. This allows us to make907 transformations like replacing a NaN with an Infinity or replacing908 a float with its nearest integers that we would otherwise not be909 able to due to them requiring very specific transformations of910 the bit sequence.911 We only apply these transformations to blocks that "look like" our912 standard float encodings because they are only really meaningful913 there. The logic for detecting this is reasonably precise, but914 it doesn't matter if it's wrong. These are always valid915 transformations to make, they just don't necessarily correspond to916 anything particularly meaningful for non-float values.917 """918 ex = chooser.choose(919 self.examples,920 lambda ex: (921 ex.label == DRAW_FLOAT_LABEL922 and len(ex.children) == 2923 and ex.children[0].length == 8924 ),925 )926 u = ex.children[0].start927 v = ex.children[0].end928 buf = self.shrink_target.buffer929 b = buf[u:v]930 f = lex_to_float(int_from_bytes(b))931 b2 = int_to_bytes(float_to_lex(f), 8)932 if b == b2 or self.consider_new_buffer(buf[:u] + b2 + buf[v:]):933 Float.shrink(934 f,935 lambda x: self.consider_new_buffer(936 self.shrink_target.buffer[:u]937 + int_to_bytes(float_to_lex(x), 8)938 + self.shrink_target.buffer[v:]939 ),940 random=self.random,941 )942 @defines_shrink_pass()943 def minimize_individual_blocks(self, chooser):944 """Attempt to minimize each block in sequence.945 This is the pass that ensures that e.g. each integer we draw is a946 minimum value. So it's the part that guarantees that if we e.g. do947 x = data.draw(integers())948 assert x < 10949 then in our shrunk example, x = 10 rather than say 97.950 If we are unsuccessful at minimizing a block of interest we then951 check if that's because it's changing the size of the test case and,952 if so, we also make an attempt to delete parts of the test case to953 see if that fixes it.954 We handle most of the common cases in try_shrinking_blocks which is955 pretty good at clearing out large contiguous blocks of dead space,956 but it fails when there is data that has to stay in particular places957 in the list.958 """959 block = chooser.choose(self.blocks, lambda b: not b.trivial)960 initial = self.shrink_target961 u, v = block.bounds962 i = block.index963 Lexical.shrink(964 self.shrink_target.buffer[u:v],965 lambda b: self.try_shrinking_blocks((i,), b),966 random=self.random,967 full=False,968 )969 if self.shrink_target is not initial:970 return971 lowered = (972 self.buffer[: block.start]973 + int_to_bytes(974 int_from_bytes(self.buffer[block.start : block.end]) - 1, block.length975 )976 + self.buffer[block.end :]977 )978 attempt = self.cached_test_function(lowered)979 if (980 attempt.status < Status.VALID981 or len(attempt.buffer) == len(self.buffer)982 or len(attempt.buffer) == block.end983 ):984 return985 # If it were then the lexical shrink should have worked and we could986 # never have got here.987 assert attempt is not self.shrink_target988 @self.cached(block.index)989 def first_example_after_block():990 lo = 0991 hi = len(self.examples)992 while lo + 1 < hi:993 mid = (lo + hi) // 2994 ex = self.examples[mid]995 if ex.start >= block.end:996 hi = mid997 else:998 lo = mid999 return hi1000 ex = self.examples[1001 chooser.choose(1002 range(first_example_after_block, len(self.examples)),1003 lambda i: self.examples[i].length > 0,1004 )1005 ]1006 u, v = block.bounds1007 buf = bytearray(lowered)1008 del buf[ex.start : ex.end]1009 self.incorporate_new_buffer(buf)1010 @defines_shrink_pass()1011 def reorder_examples(self, chooser):1012 """This pass allows us to reorder the children of each example.1013 For example, consider the following:1014 .. code-block:: python1015 import hypothesis.strategies as st1016 from hypothesis import given1017 @given(st.text(), st.text())1018 def test_not_equal(x, y):1019 assert x != y1020 Without the ability to reorder x and y this could fail either with1021 ``x=""``, ``y="0"``, or the other way around. With reordering it will1022 reliably fail with ``x=""``, ``y="0"``.1023 """1024 ex = chooser.choose(self.examples)1025 label = chooser.choose(ex.children).label1026 group = [c for c in ex.children if c.label == label]1027 if len(group) <= 1:1028 return1029 st = self.shrink_target1030 pieces = [st.buffer[ex.start : ex.end] for ex in group]1031 endpoints = [(ex.start, ex.end) for ex in group]1032 Ordering.shrink(1033 pieces,1034 lambda ls: self.consider_new_buffer(1035 replace_all(st.buffer, [(u, v, r) for (u, v), r in zip(endpoints, ls)])1036 ),1037 random=self.random,1038 )1039 @derived_value1040 def alphabet(self):1041 return sorted(set(self.buffer))1042 @defines_shrink_pass()1043 def alphabet_minimize(self, chooser):1044 """Attempts to minimize the "alphabet" - the set of bytes that1045 are used in the representation of the current buffer. The main1046 benefit of this is that it significantly increases our cache hit rate1047 by making things that are equivalent more likely to have the same1048 representation, but it's also generally a rather effective "fuzzing"1049 step that gives us a lot of good opportunities to slip to a smaller1050 representation of the same bug.1051 """1052 c = chooser.choose(self.alphabet)1053 buf = self.buffer1054 def can_replace_with(d):1055 if d < 0:1056 return False1057 if self.consider_new_buffer(bytes([d if b == c else b for b in buf])):1058 if d <= 1:1059 # For small values of d if this succeeds we take this1060 # as evidence that it is worth doing a a bulk replacement1061 # where we replace all values which are close1062 # to c but smaller with d as well. This helps us substantially1063 # in cases where we have a lot of "dead" bytes that don't really do1064 # much, as it allows us to replace many of them in one go rather1065 # than one at a time. An example of where this matters is1066 # test_minimize_multiple_elements_in_silly_large_int_range_min_is_not_dupe1067 # in test_shrink_quality.py1068 def replace_range(k):1069 if k > c:1070 return False1071 def should_replace_byte(b):1072 return c - k <= b <= c and d < b1073 return self.consider_new_buffer(1074 bytes([d if should_replace_byte(b) else b for b in buf])1075 )1076 find_integer(replace_range)1077 return True1078 if (1079 # If we cannot replace the current byte with its predecessor,1080 # assume it is already minimal and continue on. This ensures1081 # we make no more than one call per distinct byte value in the1082 # event that no shrinks are possible here.1083 not can_replace_with(c - 1)1084 # We next try replacing with 0 or 1. If this works then1085 # there is nothing else to do here.1086 or can_replace_with(0)1087 or can_replace_with(1)...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!