Best Python code snippet using hypothesis
engine.py
Source: engine.py
...213 # This part of the tree has already been marked as dead, so214 # there's no need to traverse any deeper.215 break216 # At each node that begins a block, record the size of that block.217 for u, v in data.all_block_bounds():218 # This can happen if we hit a dead node when walking the buffer.219 # In that case we already have this section of the tree mapped.220 if u >= len(indices):221 break222 self.block_sizes[indices[u]] = v - u223 # Forcibly mark all nodes beyond the zero-bound point as dead,224 # because we don't intend to try any other values there.225 self.dead.update(indices[self.cap:])226 # Now store this result in the tree (if appropriate), and check if227 # any nodes need to be marked as dead.228 if data.status != Status.OVERRUN and node_index not in self.dead:229 # Mark this node as dead, because it produced a result.230 # Trying to explore suffixes of it would not be helpful.231 self.dead.add(node_index)232 # Store the result in the tree as a leaf. This will overwrite the233 # branch node that was created during traversal.234 self.tree[node_index] = data235 # Review the traversed nodes, to see if any should be marked236 # as dead. We check them in reverse order, because as soon as we237 # find a live node, all nodes before it must still be live too.238 for j in reversed(indices):239 mask = self.masks.get(j, 0xff)240 assert _is_simple_mask(mask)241 max_size = mask + 1242 if (243 len(self.tree[j]) < max_size and244 j not in self.forced245 ):246 # There are still byte values to explore at this node,247 # so it isn't dead yet.248 break249 if set(self.tree[j].values()).issubset(self.dead):250 # Everything beyond this node is known to be dead,251 # and there are no more values to explore here (see above),252 # so this node must be dead too.253 self.dead.add(j)254 else:255 # Even though all of this node's possible values have been256 # tried, there are still some deeper nodes that remain257 # alive, so this node isn't dead yet.258 break259 if data.status == Status.INTERESTING:260 key = data.interesting_origin261 changed = False262 try:263 existing = self.interesting_examples[key]264 except KeyError:265 changed = True266 else:267 if sort_key(data.buffer) < sort_key(existing.buffer):268 self.shrinks += 1269 self.downgrade_buffer(existing.buffer)270 changed = True271 if changed:272 self.save_buffer(data.buffer)273 self.interesting_examples[key] = data274 self.shrunk_examples.discard(key)275 if self.shrinks >= MAX_SHRINKS:276 self.exit_with(ExitReason.max_shrinks)277 if (278 self.settings.timeout > 0 and279 benchmark_time() >= self.start_time + self.settings.timeout280 ):281 note_deprecation((282 'Your tests are hitting the settings timeout (%.2fs). '283 'This functionality will go away in a future release '284 'and you should not rely on it. Instead, try setting '285 'max_examples to be some value lower than %d (the number '286 'of examples your test successfully ran here). 
Or, if you '287 'would prefer your tests to run to completion, regardless '288 'of how long they take, you can set the timeout value to '289 'hypothesis.unlimited.'290 ) % (291 self.settings.timeout, self.valid_examples),292 self.settings)293 self.exit_with(ExitReason.timeout)294 if not self.interesting_examples:295 if self.valid_examples >= self.settings.max_examples:296 self.exit_with(ExitReason.max_examples)297 if self.call_count >= max(298 self.settings.max_examples * 10,299 # We have a high-ish default max iterations, so that tests300 # don't become flaky when max_examples is too low.301 1000302 ):303 self.exit_with(ExitReason.max_iterations)304 if self.__tree_is_exhausted():305 self.exit_with(ExitReason.finished)306 self.record_for_health_check(data)307 def generate_novel_prefix(self):308 """Uses the tree to proactively generate a starting sequence of bytes309 that we haven't explored yet for this test.310 When this method is called, we assume that there must be at311 least one novel prefix left to find. If there were not, then the312 test run should have already stopped due to tree exhaustion.313 """314 prefix = bytearray()315 node = 0316 while True:317 assert len(prefix) < self.cap318 assert node not in self.dead319 # Figure out the range of byte values we should be trying.320 # Normally this will be 0-255, unless the current position has a321 # mask.322 mask = self.masks.get(node, 0xff)323 assert _is_simple_mask(mask)324 upper_bound = mask + 1325 try:326 c = self.forced[node]327 # This position has a forced byte value, so trying a different328 # value wouldn't be helpful. Just add the forced byte, and329 # move on to the next position.330 prefix.append(c)331 node = self.tree[node][c]332 continue333 except KeyError:334 pass335 # Provisionally choose the next byte value.336 # This will change later if we find that it was a bad choice.337 c = self.random.randrange(0, upper_bound)338 try:339 next_node = self.tree[node][c]340 if next_node in self.dead:341 # Whoops, the byte value we chose for this position has342 # already been fully explored. 
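# --------------------------------------------------------------------------
# Illustrative sketch (not part of engine.py, names are hypothetical): the
# heart of generate_novel_prefix is "at this tree node, pick a byte value
# whose child has not been exhausted". With the tree node simplified to a
# dict mapping byte values to child indices and `dead` a set of exhausted
# child indices, that choice looks roughly like this:
import random

def choose_live_byte(children, dead, upper_bound=256, rnd=random):
    c = rnd.randrange(upper_bound)
    if children.get(c) not in dead:
        return c  # either unexplored (novel) or explored but still alive
    live = [b for b in range(upper_bound) if children.get(b) not in dead]
    assert live, "caller guarantees this node itself is not dead"
    return rnd.choice(live)
# --------------------------------------------------------------------------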
Let's pick a new value, and343 # this time choose a value that's definitely still alive.344 choices = [345 b for b in hrange(upper_bound)346 if self.tree[node].get(b) not in self.dead347 ]348 assert choices349 c = self.random.choice(choices)350 node = self.tree[node][c]351 else:352 # The byte value we chose is in the tree, but it still has353 # some unexplored descendants, so it's a valid choice.354 node = next_node355 prefix.append(c)356 except KeyError:357 # The byte value we chose isn't in the tree at this position,358 # which means we've successfully found a novel prefix.359 prefix.append(c)360 break361 assert node not in self.dead362 return hbytes(prefix)363 @property364 def cap(self):365 return self.settings.buffer_size // 2366 def record_for_health_check(self, data):367 # Once we've actually found a bug, there's no point in trying to run368 # health checks - they'll just mask the actually important information.369 if data.status == Status.INTERESTING:370 self.health_check_state = None371 state = self.health_check_state372 if state is None:373 return374 state.draw_times.extend(data.draw_times)375 if data.status == Status.VALID:376 state.valid_examples += 1377 elif data.status == Status.INVALID:378 state.invalid_examples += 1379 else:380 assert data.status == Status.OVERRUN381 state.overrun_examples += 1382 max_valid_draws = 10383 max_invalid_draws = 50384 max_overrun_draws = 20385 assert state.valid_examples <= max_valid_draws386 if state.valid_examples == max_valid_draws:387 self.health_check_state = None388 return389 if state.overrun_examples == max_overrun_draws:390 fail_health_check(self.settings, (391 'Examples routinely exceeded the max allowable size. '392 '(%d examples overran while generating %d valid ones)'393 '. Generating examples this large will usually lead to'394 ' bad results. You could try setting max_size parameters '395 'on your collections and turning '396 'max_leaves down on recursive() calls.') % (397 state.overrun_examples, state.valid_examples398 ), HealthCheck.data_too_large)399 if state.invalid_examples == max_invalid_draws:400 fail_health_check(self.settings, (401 'It looks like your strategy is filtering out a lot '402 'of data. Health check found %d filtered examples but '403 'only %d good ones. This will make your tests much '404 'slower, and also will probably distort the data '405 'generation quite a lot. You should adapt your '406 'strategy to filter less. This can also be caused by '407 'a low max_leaves parameter in recursive() calls') % (408 state.invalid_examples, state.valid_examples409 ), HealthCheck.filter_too_much)410 draw_time = sum(state.draw_times)411 if draw_time > 1.0:412 fail_health_check(self.settings, (413 'Data generation is extremely slow: Only produced '414 '%d valid examples in %.2f seconds (%d invalid ones '415 'and %d exceeded maximum size). 
Try decreasing '416 "size of the data you're generating (with e.g."417 'max_size or max_leaves parameters).'418 ) % (419 state.valid_examples, draw_time, state.invalid_examples,420 state.overrun_examples), HealthCheck.too_slow,)421 def save_buffer(self, buffer):422 if self.settings.database is not None:423 key = self.database_key424 if key is None:425 return426 self.settings.database.save(key, hbytes(buffer))427 def downgrade_buffer(self, buffer):428 if (429 self.settings.database is not None and430 self.database_key is not None431 ):432 self.settings.database.move(433 self.database_key, self.secondary_key, buffer)434 @property435 def secondary_key(self):436 return b'.'.join((self.database_key, b'secondary'))437 @property438 def covering_key(self):439 return b'.'.join((self.database_key, b'coverage'))440 def note_details(self, data):441 runtime = max(data.finish_time - data.start_time, 0.0)442 self.all_runtimes.append(runtime)443 self.all_drawtimes.extend(data.draw_times)444 self.status_runtimes.setdefault(data.status, []).append(runtime)445 for event in set(map(self.event_to_string, data.events)):446 self.event_call_counts[event] += 1447 def debug(self, message):448 with local_settings(self.settings):449 debug_report(message)450 @property451 def report_debug_info(self):452 return self.settings.verbosity >= Verbosity.debug453 def debug_data(self, data):454 if not self.report_debug_info:455 return456 stack = [[]]457 def go(ex):458 if ex.length == 0:459 return460 if len(ex.children) == 0:461 stack[-1].append(int_from_bytes(462 data.buffer[ex.start:ex.end]463 ))464 else:465 node = []466 stack.append(node)467 for v in ex.children:468 go(v)469 stack.pop()470 if len(node) == 1:471 stack[-1].extend(node)472 else:473 stack[-1].append(node)474 go(data.examples[0])475 assert len(stack) == 1476 status = repr(data.status)477 if data.status == Status.INTERESTING:478 status = '%s (%r)' % (status, data.interesting_origin,)479 self.debug('%d bytes %r -> %s, %s' % (480 data.index, stack[0], status, data.output,481 ))482 def run(self):483 with local_settings(self.settings):484 try:485 self._run()486 except RunIsComplete:487 pass488 for v in self.interesting_examples.values():489 self.debug_data(v)490 self.debug(491 u'Run complete after %d examples (%d valid) and %d shrinks'492 % (self.call_count, self.valid_examples, self.shrinks))493 def _new_mutator(self):494 target_data = [None]495 def draw_new(data, n):496 return uniform(self.random, n)497 def draw_existing(data, n):498 return target_data[0].buffer[data.index:data.index + n]499 def draw_smaller(data, n):500 existing = target_data[0].buffer[data.index:data.index + n]501 r = uniform(self.random, n)502 if r <= existing:503 return r504 return _draw_predecessor(self.random, existing)505 def draw_larger(data, n):506 existing = target_data[0].buffer[data.index:data.index + n]507 r = uniform(self.random, n)508 if r >= existing:509 return r510 return _draw_successor(self.random, existing)511 def reuse_existing(data, n):512 choices = data.block_starts.get(n, [])513 if choices:514 i = self.random.choice(choices)515 return hbytes(data.buffer[i:i + n])516 else:517 result = uniform(self.random, n)518 assert isinstance(result, hbytes)519 return result520 def flip_bit(data, n):521 buf = bytearray(522 target_data[0].buffer[data.index:data.index + n])523 i = self.random.randint(0, n - 1)524 k = self.random.randint(0, 7)525 buf[i] ^= (1 << k)526 return hbytes(buf)527 def draw_zero(data, n):528 return hbytes(b'\0' * n)529 def draw_max(data, n):530 return hbytes([255]) * 
n531 def draw_constant(data, n):532 return hbytes([self.random.randint(0, 255)]) * n533 def redraw_last(data, n):534 u = target_data[0].blocks[-1].start535 if data.index + n <= u:536 return target_data[0].buffer[data.index:data.index + n]537 else:538 return uniform(self.random, n)539 options = [540 draw_new,541 redraw_last, redraw_last,542 reuse_existing, reuse_existing,543 draw_existing, draw_smaller, draw_larger,544 flip_bit,545 draw_zero, draw_max, draw_zero, draw_max,546 draw_constant,547 ]548 bits = [549 self.random.choice(options) for _ in hrange(3)550 ]551 prefix = [None]552 def mutate_from(origin):553 target_data[0] = origin554 prefix[0] = self.generate_novel_prefix()555 return draw_mutated556 def draw_mutated(data, n):557 if data.index + n > len(target_data[0].buffer):558 result = uniform(self.random, n)559 else:560 result = self.random.choice(bits)(data, n)561 p = prefix[0]562 if data.index < len(p):563 start = p[data.index:data.index + n]564 result = start + result[len(start):]565 return self.__zero_bound(data, result)566 return mutate_from567 def __rewrite(self, data, result):568 return self.__zero_bound(data, result)569 def __zero_bound(self, data, result):570 """This tries to get the size of the generated data under control by571 replacing the result with zero if we are too deep or have already572 generated too much data.573 This causes us to enter "shrinking mode" there and thus reduce574 the size of the generated data.575 """576 initial = len(result)577 if data.depth * 2 >= MAX_DEPTH or data.index >= self.cap:578 data.forced_indices.update(579 hrange(data.index, data.index + initial))580 data.hit_zero_bound = True581 result = hbytes(initial)582 elif data.index + initial >= self.cap:583 data.hit_zero_bound = True584 n = self.cap - data.index585 data.forced_indices.update(586 hrange(self.cap, data.index + initial))587 result = result[:n] + hbytes(initial - n)588 assert len(result) == initial589 return result590 @property591 def database(self):592 if self.database_key is None:593 return None594 return self.settings.database595 def has_existing_examples(self):596 return (597 self.database is not None and598 Phase.reuse in self.settings.phases599 )600 def reuse_existing_examples(self):601 """If appropriate (we have a database and have been told to use it),602 try to reload existing examples from the database.603 If there are a lot we don't try all of them. We always try the604 smallest example in the database (which is guaranteed to be the605 last failure) and the largest (which is usually the seed example606 which the last failure came from but we don't enforce that). We607 then take a random sampling of the remainder and try those. Any608 examples that are no longer interesting are cleared out.609 """610 if self.has_existing_examples():611 self.debug('Reusing examples from database')612 # We have to do some careful juggling here. We have two database613 # corpora: The primary and secondary. The primary corpus is a614 # small set of minimized examples each of which has at one point615 # demonstrated a distinct bug. We want to retry all of these.616 # We also have a secondary corpus of examples that have at some617 # point demonstrated interestingness (currently only ones that618 # were previously non-minimal examples of a bug, but this will619 # likely expand in future). 
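# --------------------------------------------------------------------------
# Illustrative sketch (not part of engine.py): __zero_bound above keeps
# generated data from growing without limit by zero-filling any draw that
# crosses a cap. Ignoring the depth check and forced-index bookkeeping, the
# capping rule is roughly:
def cap_draw(result, index, cap):
    """Zero every byte of this draw that would land at or beyond `cap`."""
    n = len(result)
    if index >= cap:
        return bytes(n)                          # entirely past the cap
    if index + n > cap:
        keep = cap - index
        return result[:keep] + bytes(n - keep)   # zero only the overhang
    return result

assert cap_draw(b"\xff\xff\xff", index=3, cap=4) == b"\xff\x00\x00"
# --------------------------------------------------------------------------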
These are a good source of potentially620 # interesting examples, but there are a lot of them, so we down621 # sample the secondary corpus to a more manageable size.622 corpus = sorted(623 self.settings.database.fetch(self.database_key),624 key=sort_key625 )626 desired_size = max(2, ceil(0.1 * self.settings.max_examples))627 for extra_key in [self.secondary_key, self.covering_key]:628 if len(corpus) < desired_size:629 extra_corpus = list(630 self.settings.database.fetch(extra_key),631 )632 shortfall = desired_size - len(corpus)633 if len(extra_corpus) <= shortfall:634 extra = extra_corpus635 else:636 extra = self.random.sample(extra_corpus, shortfall)637 extra.sort(key=sort_key)638 corpus.extend(extra)639 self.used_examples_from_database = len(corpus) > 0640 for existing in corpus:641 last_data = ConjectureData.for_buffer(existing)642 try:643 self.test_function(last_data)644 finally:645 if last_data.status != Status.INTERESTING:646 self.settings.database.delete(647 self.database_key, existing)648 self.settings.database.delete(649 self.secondary_key, existing)650 def exit_with(self, reason):651 self.exit_reason = reason652 raise RunIsComplete()653 def generate_new_examples(self):654 if Phase.generate not in self.settings.phases:655 return656 zero_data = self.cached_test_function(657 hbytes(self.settings.buffer_size))658 if zero_data.status == Status.OVERRUN or (659 zero_data.status == Status.VALID and660 len(zero_data.buffer) * 2 > self.settings.buffer_size661 ):662 fail_health_check(663 self.settings,664 'The smallest natural example for your test is extremely '665 'large. This makes it difficult for Hypothesis to generate '666 'good examples, especially when trying to reduce failing ones '667 'at the end. Consider reducing the size of your data if it is '668 'of a fixed size. You could also fix this by improving how '669 'your data shrinks (see https://hypothesis.readthedocs.io/en/'670 'latest/data.html#shrinking for details), or by introducing '671 'default values inside your strategy. e.g. could you replace '672 'some arguments with their defaults by using '673 'one_of(none(), some_complex_strategy)?',674 HealthCheck.large_base_example675 )676 # If the language starts with writes of length >= cap then there is677 # only one string in it: Everything after cap is forced to be zero (or678 # to be whatever value is written there). 
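# --------------------------------------------------------------------------
# Illustrative sketch (not part of engine.py): the corpus selection performed
# by reuse_existing_examples above - replay the whole primary corpus, then top
# it up with a random sample of the secondary corpus, keeping shortlex order.
# The helper below is a hypothetical standalone version of that step; the
# 10%-of-max_examples sizing mirrors the code above.
import random
from math import ceil

def select_corpus(primary, secondary, max_examples, rnd=random):
    def shortlex(b):
        return (len(b), b)
    corpus = sorted(primary, key=shortlex)
    desired_size = max(2, ceil(0.1 * max_examples))
    if len(corpus) < desired_size:
        shortfall = desired_size - len(corpus)
        extra = list(secondary)
        if len(extra) > shortfall:
            extra = rnd.sample(extra, shortfall)
        corpus.extend(sorted(extra, key=shortlex))
    return corpus
# --------------------------------------------------------------------------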
That means that once we've679 # tried the zero value, there's nothing left for us to do, so we680 # exit early here.681 for i in hrange(self.cap):682 if i not in zero_data.forced_indices:683 break684 else:685 self.exit_with(ExitReason.finished)686 self.health_check_state = HealthCheckState()687 count = 0688 while not self.interesting_examples and (689 count < 10 or self.health_check_state is not None690 ):691 prefix = self.generate_novel_prefix()692 def draw_bytes(data, n):693 if data.index < len(prefix):694 result = prefix[data.index:data.index + n]695 if len(result) < n:696 result += uniform(self.random, n - len(result))697 else:698 result = uniform(self.random, n)699 return self.__zero_bound(data, result)700 targets_found = len(self.covering_examples)701 last_data = ConjectureData(702 max_length=self.settings.buffer_size,703 draw_bytes=draw_bytes704 )705 self.test_function(last_data)706 last_data.freeze()707 count += 1708 mutations = 0709 mutator = self._new_mutator()710 zero_bound_queue = []711 while not self.interesting_examples:712 if zero_bound_queue:713 # Whenever we generated an example and it hits a bound714 # which forces zero blocks into it, this creates a weird715 # distortion effect by making certain parts of the data716 # stream (especially ones to the right) much more likely717 # to be zero. We fix this by redistributing the generated718 # data by shuffling it randomly. This results in the719 # zero data being spread evenly throughout the buffer.720 # Hopefully the shrinking this causes will cause us to721 # naturally fail to hit the bound.722 # If it doesn't then we will queue the new version up again723 # (now with more zeros) and try again.724 overdrawn = zero_bound_queue.pop()725 buffer = bytearray(overdrawn.buffer)726 # These will have values written to them that are different727 # from what's in them anyway, so the value there doesn't728 # really "count" for distributional purposes, and if we729 # leave them in then they can cause the fraction of non730 # zero bytes to increase on redraw instead of decrease.731 for i in overdrawn.forced_indices:732 buffer[i] = 0733 self.random.shuffle(buffer)734 buffer = hbytes(buffer)735 def draw_bytes(data, n):736 result = buffer[data.index:data.index + n]737 if len(result) < n:738 result += hbytes(n - len(result))739 return self.__rewrite(data, result)740 data = ConjectureData(741 draw_bytes=draw_bytes,742 max_length=self.settings.buffer_size,743 )744 self.test_function(data)745 data.freeze()746 else:747 origin = self.target_selector.select()748 mutations += 1749 targets_found = len(self.covering_examples)750 data = ConjectureData(751 draw_bytes=mutator(origin),752 max_length=self.settings.buffer_size753 )754 self.test_function(data)755 data.freeze()756 if (757 data.status > origin.status or758 len(self.covering_examples) > targets_found759 ):760 mutations = 0761 elif (762 data.status < origin.status or763 mutations >= 10764 ):765 # Cap the variations of a single example and move on to766 # an entirely fresh start. 
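# --------------------------------------------------------------------------
# Illustrative sketch (not part of engine.py): the zero_bound_queue handling
# above blanks the forced positions of an overdrawn buffer and shuffles it,
# so the zero padding is spread evenly instead of clumping at the end.
import random

def redistribute(buffer, forced_indices, rnd=random):
    buf = bytearray(buffer)
    for i in forced_indices:
        buf[i] = 0        # forced bytes get overwritten anyway; blank them
    rnd.shuffle(buf)      # spread the zero bytes throughout the buffer
    return bytes(buf)
# --------------------------------------------------------------------------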
Ten is an entirely arbitrary767 # constant, but it's been working well for years.768 mutations = 0769 mutator = self._new_mutator()770 if getattr(data, 'hit_zero_bound', False):771 zero_bound_queue.append(data)772 mutations += 1773 def _run(self):774 self.start_time = benchmark_time()775 self.reuse_existing_examples()776 self.generate_new_examples()777 self.shrink_interesting_examples()778 self.exit_with(ExitReason.finished)779 def shrink_interesting_examples(self):780 """If we've found interesting examples, try to replace each of them781 with a minimal interesting example with the same interesting_origin.782 We may find one or more examples with a new interesting_origin783 during the shrink process. If so we shrink these too.784 """785 if (786 Phase.shrink not in self.settings.phases or787 not self.interesting_examples788 ):789 return790 for prev_data in sorted(791 self.interesting_examples.values(),792 key=lambda d: sort_key(d.buffer)793 ):794 assert prev_data.status == Status.INTERESTING795 data = ConjectureData.for_buffer(prev_data.buffer)796 self.test_function(data)797 if data.status != Status.INTERESTING:798 self.exit_with(ExitReason.flaky)799 self.clear_secondary_key()800 while len(self.shrunk_examples) < len(self.interesting_examples):801 target, example = min([802 (k, v) for k, v in self.interesting_examples.items()803 if k not in self.shrunk_examples],804 key=lambda kv: (sort_key(kv[1].buffer), sort_key(repr(kv[0]))),805 )806 self.debug('Shrinking %r' % (target,))807 def predicate(d):808 if d.status < Status.INTERESTING:809 return False810 return d.interesting_origin == target811 self.shrink(example, predicate)812 self.shrunk_examples.add(target)813 def clear_secondary_key(self):814 if self.has_existing_examples():815 # If we have any smaller examples in the secondary corpus, now is816 # a good time to try them to see if they work as shrinks. They817 # probably won't, but it's worth a shot and gives us a good818 # opportunity to clear out the database.819 # It's not worth trying the primary corpus because we already820 # tried all of those in the initial phase.821 corpus = sorted(822 self.settings.database.fetch(self.secondary_key),823 key=sort_key824 )825 for c in corpus:826 primary = {827 v.buffer for v in self.interesting_examples.values()828 }829 cap = max(map(sort_key, primary))830 if sort_key(c) > cap:831 break832 else:833 self.cached_test_function(c)834 # We unconditionally remove c from the secondary key as it835 # is either now primary or worse than our primary example836 # of this reason for interestingness.837 self.settings.database.delete(self.secondary_key, c)838 def shrink(self, example, predicate):839 s = self.new_shrinker(example, predicate)840 s.shrink()841 return s.shrink_target842 def new_shrinker(self, example, predicate):843 return Shrinker(self, example, predicate)844 def prescreen_buffer(self, buffer):845 """Attempt to rule out buffer as a possible interesting candidate.846 Returns False if we know for sure that running this buffer will not847 produce an interesting result. Returns True if it might (because it848 explores territory we have not previously tried).849 This is purely an optimisation to try to reduce the number of tests we850 run. 
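# --------------------------------------------------------------------------
# Illustrative sketch (not part of engine.py): "smaller" in this file means
# shortlex order - fewer bytes first, ties broken lexicographically - which
# is exactly what sort_key (defined further down) returns as
# (len(buffer), buffer). shrink_interesting_examples above uses it to always
# work on the smallest not-yet-shrunk failure first:
def shortlex_key(buffer):
    return (len(buffer), buffer)

interesting = {"origin_a": b"\x01\x02\x03", "origin_b": b"\x05"}
target, buf = min(interesting.items(), key=lambda kv: shortlex_key(kv[1]))
assert (target, buf) == ("origin_b", b"\x05")   # the shorter buffer wins
# --------------------------------------------------------------------------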
"return True" would be a valid but inefficient implementation.851 """852 # Traverse the tree, to see if we have already tried this buffer853 # (or a prefix of it).854 node_index = 0855 n = len(buffer)856 for k, b in enumerate(buffer):857 if node_index in self.dead:858 # This buffer (or a prefix of it) has already been tested,859 # or has already had its descendants fully explored.860 # Testing it again would not be helpful.861 return False862 try:863 # The block size at that point provides a lower bound on how864 # many more bytes are required. If the buffer does not have865 # enough bytes to fulfill that block size then we can rule out866 # this buffer.867 if k + self.block_sizes[node_index] > n:868 return False869 except KeyError:870 pass871 # If there's a forced value or a mask at this position, then872 # pretend that the buffer already contains a matching value,873 # because the test function is going to do the same.874 try:875 b = self.forced[node_index]876 except KeyError:877 pass878 try:879 b = b & self.masks[node_index]880 except KeyError:881 pass882 try:883 node_index = self.tree[node_index][b]884 except KeyError:885 # The buffer wasn't in the tree, which means we haven't tried886 # it. That makes it a possible candidate.887 return True888 else:889 # We ran out of buffer before reaching a leaf or a missing node.890 # That means the test function is going to draw beyond the end891 # of this buffer, which makes it a bad candidate.892 return False893 def cached_test_function(self, buffer):894 """Checks the tree to see if we've tested this buffer, and returns the895 previous result if we have.896 Otherwise we call through to ``test_function``, and return a897 fresh result.898 """899 node_index = 0900 for c in buffer:901 # If there's a forced value or a mask at this position, then902 # pretend that the buffer already contains a matching value,903 # because the test function is going to do the same.904 try:905 c = self.forced[node_index]906 except KeyError:907 pass908 try:909 c = c & self.masks[node_index]910 except KeyError:911 pass912 try:913 node_index = self.tree[node_index][c]914 except KeyError:915 # The byte at this position isn't in the tree, which means916 # we haven't tested this buffer. Break out of the tree917 # traversal, and run the test function normally.918 break919 node = self.tree[node_index]920 if isinstance(node, ConjectureData):921 # This buffer (or a prefix of it) has already been tested.922 # Return the stored result instead of trying it again.923 return node924 else:925 # Falling off the end of this loop means that we're about to test926 # a prefix of a previously-tested byte stream. 
The test is going
            # to draw beyond the end of the buffer, and fail due to overrun.
            # Currently there is no special handling for this case.
            pass
        # We didn't find a match in the tree, so we need to run the test
        # function normally.
        result = ConjectureData.for_buffer(buffer)
        self.test_function(result)
        return result

    def event_to_string(self, event):
        if isinstance(event, str):
            return event
        try:
            return self.events_to_strings[event]
        except KeyError:
            pass
        result = str(event)
        self.events_to_strings[event] = result
        return result


def _is_simple_mask(mask):
    """A simple mask is ``(2 ** n - 1)`` for some ``n``, so it has the effect
    of keeping the lowest ``n`` bits and discarding the rest.
    A mask in this form can produce any integer between 0 and the mask itself
    (inclusive), and the total number of these values is ``(mask + 1)``.
    """
    return (mask & (mask + 1)) == 0


def _draw_predecessor(rnd, xs):
    r = bytearray()
    any_strict = False
    for x in to_bytes_sequence(xs):
        if not any_strict:
            c = rnd.randint(0, x)
            if c < x:
                any_strict = True
        else:
            c = rnd.randint(0, 255)
        r.append(c)
    return hbytes(r)


def _draw_successor(rnd, xs):
    r = bytearray()
    any_strict = False
    for x in to_bytes_sequence(xs):
        if not any_strict:
            c = rnd.randint(x, 255)
            if c > x:
                any_strict = True
        else:
            c = rnd.randint(0, 255)
        r.append(c)
    return hbytes(r)


def sort_key(buffer):
    return (len(buffer), buffer)


def uniform(random, n):
    return int_to_bytes(random.getrandbits(n * 8), n)


def pop_random(random, values):
    """Remove a random element of values, possibly changing the ordering of its
    elements."""
    # We pick the element at a random index. Rather than removing that element
    # from the list (which would be an O(n) operation), we swap it to the end
    # and return the last element of the list. This changes the order of
    # the elements, but as long as these elements are only accessed through
    # random sampling that doesn't matter.
    i = random.randrange(0, len(values))
    values[i], values[-1] = values[-1], values[i]
    return values.pop()


class TargetSelector(object):
    """Data structure for selecting targets to use for mutation.
    The main purpose of the TargetSelector is to maintain a pool of "reasonably
    useful" examples, while keeping the pool of bounded size.
    In particular it ensures:
    1. We only retain examples of the best status we've seen so far (not
       counting INTERESTING, which is special).
    2. We preferentially return examples we've never returned before when
       select() is called.
    3. The number of retained examples is never more than self.pool_size, with
       past examples discarded automatically, preferring ones that we have
       already explored from.
    These invariants are fairly heavily prone to change - they're not
    especially well validated as being optimal, and are mostly just a decent
    compromise between diversity and keeping the pool size bounded.
    """

    def __init__(self, random, pool_size=MUTATION_POOL_SIZE):
        self.random = random
        self.best_status = Status.OVERRUN
        self.pool_size = pool_size
        self.reset()

    def __len__(self):
        return len(self.fresh_examples) + len(self.used_examples)

    def reset(self):
        self.fresh_examples = []
        self.used_examples = []

    def add(self, data):
        if data.status == Status.INTERESTING:
            return
        if data.status < self.best_status:
            return
        if data.status > self.best_status:
            self.best_status = data.status
            self.reset()
        # Note that technically data could be a duplicate. This rarely happens
        # (only if we've exceeded the CACHE_RESET_FREQUENCY number of test
        # function calls), but it's certainly possible. This could result in
        # us having the same example multiple times, possibly spread over both
        # lists. We could check for this, but it's not a major problem so we
        # don't bother.
        self.fresh_examples.append(data)
        if len(self) > self.pool_size:
            pop_random(self.random, self.used_examples or self.fresh_examples)
            assert self.pool_size == len(self)

    def select(self):
        if self.fresh_examples:
            result = pop_random(self.random, self.fresh_examples)
            self.used_examples.append(result)
            return result
        else:
            return self.random.choice(self.used_examples)


def block_program(description):
    """Mini-DSL for block rewriting. A sequence of commands that will be run
    over all contiguous sequences of blocks of the description length in order.
    Commands are:

        * ".", keep this block unchanged
        * "-", subtract one from this block.
        * "0", replace this block with zero
        * "X", delete this block

    If a command does not apply (currently only because it's - on a zero
    block) the block will be silently skipped over.
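# --------------------------------------------------------------------------
# Illustrative sketch (not part of engine.py): the mini-DSL described above,
# reduced to a list of small integers standing in for blocks. apply_at()
# applies one description at window position i and returns the rewritten
# list, or None when a command does not apply ('-' on a zero block). The
# helper name is hypothetical; the real pass works on byte spans and feeds
# each attempt to incorporate_new_buffer.
def apply_at(blocks, description, i):
    out = list(blocks)
    for k, command in reversed(list(enumerate(description))):
        j = i + k
        if command == '.':
            continue
        elif command == '0':
            out[j] = 0
        elif command == '-':
            if out[j] == 0:
                return None       # does not apply; this window is skipped
            out[j] -= 1
        elif command == 'X':
            del out[j]
    return out

assert apply_at([3, 7, 7, 9], '-X', 1) == [3, 6, 9]
assert apply_at([3, 0, 7, 9], '-X', 1) is None
# --------------------------------------------------------------------------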
As a side effect of1052 running a block program its score will be updated.1053 """1054 def run(self):1055 n = len(description)1056 i = 01057 while i + n <= len(self.shrink_target.blocks):1058 attempt = bytearray(self.shrink_target.buffer)1059 failed = False1060 for k, d in reversed(list(enumerate(description))):1061 j = i + k1062 u, v = self.blocks[j].bounds1063 if d == '-':1064 value = int_from_bytes(attempt[u:v])1065 if value == 0:1066 failed = True1067 break1068 else:1069 attempt[u:v] = int_to_bytes(value - 1, v - u)1070 elif d == 'X':1071 del attempt[u:v]1072 else: # pragma: no cover1073 assert False, 'Unrecognised command %r' % (d,)1074 if failed or not self.incorporate_new_buffer(attempt):1075 i += 11076 run.command = description1077 run.__name__ = 'block_program(%r)' % (description,)1078 return run1079class PassClassification(Enum):1080 CANDIDATE = 01081 HOPEFUL = 11082 DUBIOUS = 21083 AVOID = 31084 SPECIAL = 41085@total_ordering1086@attr.s(slots=True, cmp=False)1087class ShrinkPass(object):1088 pass_function = attr.ib()1089 index = attr.ib()1090 classification = attr.ib(default=PassClassification.CANDIDATE)1091 successes = attr.ib(default=0)1092 runs = attr.ib(default=0)1093 calls = attr.ib(default=0)1094 shrinks = attr.ib(default=0)1095 deletions = attr.ib(default=0)1096 @property1097 def failures(self):1098 return self.runs - self.successes1099 @property1100 def name(self):1101 return self.pass_function.__name__1102 def __eq__(self, other):1103 return self.index == other.index1104 def __hash__(self):1105 return hash(self.index)1106 def __lt__(self, other):1107 return self.key() < other.key()1108 def key(self):1109 # Smaller is better.1110 return (1111 self.runs,1112 self.failures,1113 self.calls,1114 self.index1115 )1116class Shrinker(object):1117 """A shrinker is a child object of a ConjectureRunner which is designed to1118 manage the associated state of a particular shrink problem.1119 Currently the only shrink problem we care about is "interesting and with a1120 particular interesting_origin", but this is abstracted into a general1121 purpose predicate for more flexibility later - e.g. we are likely to want1122 to shrink with respect to a particular coverage target later.1123 Data with a status < VALID may be assumed not to satisfy the predicate.1124 The expected usage pattern is that this is only ever called from within the1125 engine.1126 """1127 DEFAULT_PASSES = [1128 'pass_to_descendant',1129 'zero_examples',1130 'adaptive_example_deletion',1131 'reorder_examples',1132 'minimize_duplicated_blocks',1133 'minimize_individual_blocks',1134 ]1135 EMERGENCY_PASSES = [1136 block_program('-XX'),1137 block_program('XX'),1138 'example_deletion_with_block_lowering',1139 'shrink_offset_pairs',1140 'minimize_block_pairs_retaining_sum',1141 ]1142 def __init__(self, engine, initial, predicate):1143 """Create a shrinker for a particular engine, with a given starting1144 point and predicate. When shrink() is called it will attempt to find an1145 example for which predicate is True and which is strictly smaller than1146 initial.1147 Note that initial is a ConjectureData object, and predicate1148 takes ConjectureData objects.1149 """1150 self.__engine = engine1151 self.__predicate = predicate1152 self.discarding_failed = False1153 self.__shrinking_prefixes = set()1154 self.initial_size = len(initial.buffer)1155 # We add a second level of caching local to the shrinker. This is a bit1156 # of a hack. Ideally we'd be able to rely on the engine's functionality1157 # for this. 
Doing it this way has two benefits: Firstly, the engine1158 # does not currently cache overruns (and probably shouldn't, but could1159 # recreate them on demand if necessary), and secondly Python dicts are1160 # much faster than our pure Python tree-based lookups.1161 self.__test_function_cache = {}1162 # We keep track of the current best example on the shrink_target1163 # attribute.1164 self.shrink_target = None1165 self.update_shrink_target(initial)1166 self.shrinks = 01167 self.initial_calls = self.__engine.call_count1168 self.current_pass_depth = 01169 self.passes_by_name = {}1170 self.clear_passes()1171 for p in Shrinker.DEFAULT_PASSES:1172 self.add_new_pass(p)1173 for p in Shrinker.EMERGENCY_PASSES:1174 self.add_new_pass(p, classification=PassClassification.AVOID)1175 self.add_new_pass(1176 'lower_common_block_offset',1177 classification=PassClassification.SPECIAL1178 )1179 def clear_passes(self):1180 """Reset all passes on the shrinker, leaving it in a blank state.1181 This is mostly useful for testing.1182 """1183 # Note that we deliberately do not clear passes_by_name. This means1184 # that we can still look up and explicitly run the standard passes,1185 # they just won't be avaiable by default.1186 self.passes = []1187 self.passes_awaiting_requeue = []1188 self.pass_queues = {c: [] for c in PassClassification}1189 self.known_programs = set()1190 def add_new_pass(self, run, classification=PassClassification.CANDIDATE):1191 """Creates a shrink pass corresponding to calling ``run(self)``"""1192 if isinstance(run, str):1193 run = getattr(Shrinker, run)1194 p = ShrinkPass(1195 pass_function=run, index=len(self.passes),1196 classification=classification,1197 )1198 if hasattr(run, 'command'):1199 self.known_programs.add(run.command)1200 self.passes.append(p)1201 self.passes_awaiting_requeue.append(p)1202 self.passes_by_name[p.name] = p1203 return p1204 def shrink_pass(self, name):1205 if hasattr(Shrinker, name) and name not in self.passes_by_name:1206 self.add_new_pass(name, classification=PassClassification.SPECIAL)1207 return self.passes_by_name[name]1208 def requeue_passes(self):1209 """Move all passes from passes_awaiting_requeue to their relevant1210 queues."""1211 while self.passes_awaiting_requeue:1212 p = self.passes_awaiting_requeue.pop()1213 heapq.heappush(self.pass_queues[p.classification], p)1214 def has_queued_passes(self, classification):1215 """Checks if any shrink passes are currently enqued under this1216 classification (note that there may be passes with this classification1217 currently awaiting requeue)."""1218 return len(self.pass_queues[classification]) > 01219 def pop_queued_pass(self, classification):1220 """Pop and run a single queued pass with this classification."""1221 sp = heapq.heappop(self.pass_queues[classification])1222 self.passes_awaiting_requeue.append(sp)1223 self.run_shrink_pass(sp)1224 def run_queued_until_change(self, classification):1225 """Run passes with this classification until there are no more or one1226 of them succeeds in shrinking the target."""1227 initial = self.shrink_target1228 while (1229 self.has_queued_passes(classification) and1230 self.shrink_target is initial1231 ):1232 self.pop_queued_pass(classification)1233 return self.shrink_target is not initial1234 def run_one_queued_pass(self, classification):1235 """Run a single queud pass with this classification (if there are1236 any)."""1237 if self.has_queued_passes(classification):1238 self.pop_queued_pass(classification)1239 def run_queued_passes(self, classification):1240 """Run 
all queued passes with this classification."""1241 while self.has_queued_passes(classification):1242 self.pop_queued_pass(classification)1243 @property1244 def calls(self):1245 return self.__engine.call_count1246 def consider_new_buffer(self, buffer):1247 buffer = hbytes(buffer)1248 return buffer.startswith(self.buffer) or \1249 self.incorporate_new_buffer(buffer)1250 def incorporate_new_buffer(self, buffer):1251 buffer = hbytes(buffer[:self.shrink_target.index])1252 try:1253 existing = self.__test_function_cache[buffer]1254 except KeyError:1255 pass1256 else:1257 return self.incorporate_test_data(existing)1258 # Sometimes an attempt at lexicographic minimization will do the wrong1259 # thing because the buffer has changed under it (e.g. something has1260 # turned into a write, the bit size has changed). The result would be1261 # an invalid string, but it's better for us to just ignore it here as1262 # it turns out to involve quite a lot of tricky book-keeping to get1263 # this right and it's better to just handle it in one place.1264 if sort_key(buffer) >= sort_key(self.shrink_target.buffer):1265 return False1266 if self.shrink_target.buffer.startswith(buffer):1267 return False1268 if not self.__engine.prescreen_buffer(buffer):1269 return False1270 assert sort_key(buffer) <= sort_key(self.shrink_target.buffer)1271 data = ConjectureData.for_buffer(buffer)1272 self.__engine.test_function(data)1273 self.__test_function_cache[buffer] = data1274 return self.incorporate_test_data(data)1275 def incorporate_test_data(self, data):1276 self.__test_function_cache[data.buffer] = data1277 if (1278 self.__predicate(data) and1279 sort_key(data.buffer) < sort_key(self.shrink_target.buffer)1280 ):1281 self.update_shrink_target(data)1282 self.__shrinking_block_cache = {}1283 return True1284 return False1285 def cached_test_function(self, buffer):1286 buffer = hbytes(buffer)1287 try:1288 return self.__test_function_cache[buffer]1289 except KeyError:1290 pass1291 result = self.__engine.cached_test_function(buffer)1292 self.incorporate_test_data(result)1293 self.__test_function_cache[buffer] = result1294 return result1295 def debug(self, msg):1296 self.__engine.debug(msg)1297 @property1298 def random(self):1299 return self.__engine.random1300 def run_shrink_pass(self, sp):1301 """Runs the function associated with ShrinkPass sp and updates the1302 relevant metadata.1303 Note that sp may or may not be a pass currently associated with1304 this shrinker. This does not handle any requeing that is1305 required.1306 """1307 if isinstance(sp, str):1308 sp = self.shrink_pass(sp)1309 self.debug('Shrink Pass %s' % (sp.name,))1310 initial_shrinks = self.shrinks1311 initial_calls = self.calls1312 size = len(self.shrink_target.buffer)1313 try:1314 sp.pass_function(self)1315 finally:1316 calls = self.calls - initial_calls1317 shrinks = self.shrinks - initial_shrinks1318 deletions = size - len(self.shrink_target.buffer)1319 sp.calls += calls1320 sp.shrinks += shrinks1321 sp.deletions += deletions1322 sp.runs += 11323 self.debug('Shrink Pass %s completed.' % (sp.name,))1324 # Complex state machine alert! A pass run can either succeed (we made1325 # at least one shrink) or fail (we didn't). 
This changes the pass's1326 # current classification according to the following possible1327 # transitions:1328 #1329 # CANDIDATE -------> HOPEFUL1330 # | ^1331 # | |1332 # v v1333 # AVOID ---------> DUBIOUS1334 #1335 # From best to worst we want to run HOPEFUL, CANDIDATE, DUBIOUS, AVOID.1336 # We will try any one of them if we have to but we want to prioritise.1337 #1338 # When a run succeeds, a pass will follow an arrow to a better class.1339 # When it fails, it will follow an arrow to a worse one.1340 # If no such arrow is available, it stays where it is.1341 #1342 # We also have the classification SPECIAL for passes that do not get1343 # run as part of the normal process.1344 previous = sp.classification1345 # If the pass didn't actually do anything we don't reclassify it. This1346 # is for things like remove_discarded which often are inapplicable.1347 if calls > 0 and sp.classification != PassClassification.SPECIAL:1348 if shrinks == 0:1349 if sp.successes > 0:1350 sp.classification = PassClassification.DUBIOUS1351 else:1352 sp.classification = PassClassification.AVOID1353 else:1354 sp.successes += 11355 if sp.classification == PassClassification.AVOID:1356 sp.classification = PassClassification.DUBIOUS1357 else:1358 sp.classification = PassClassification.HOPEFUL1359 if previous != sp.classification:1360 self.debug('Reclassified %s from %s to %s' % (1361 sp.name, previous.name, sp.classification.name1362 ))1363 def shrink(self):1364 """Run the full set of shrinks and update shrink_target.1365 This method is "mostly idempotent" - calling it twice is unlikely to1366 have any effect, though it has a non-zero probability of doing so.1367 """1368 # We assume that if an all-zero block of bytes is an interesting1369 # example then we're not going to do better than that.1370 # This might not technically be true: e.g. for integers() | booleans()1371 # the simplest example is actually [1, 0]. Missing this case is fairly1372 # harmless and this allows us to make various simplifying assumptions1373 # about the structure of the data (principally that we're never1374 # operating on a block of all zero bytes so can use non-zeroness as a1375 # signpost of complexity).1376 if (1377 not any(self.shrink_target.buffer) or1378 self.incorporate_new_buffer(hbytes(len(self.shrink_target.buffer)))1379 ):1380 return1381 try:1382 self.greedy_shrink()1383 finally:1384 if self.__engine.report_debug_info:1385 def s(n):1386 return 's' if n != 1 else ''1387 total_deleted = self.initial_size - len(1388 self.shrink_target.buffer)1389 self.debug('---------------------')1390 self.debug('Shrink pass profiling')1391 self.debug('---------------------')1392 self.debug('')1393 calls = self.__engine.call_count - self.initial_calls1394 self.debug((1395 'Shrinking made a total of %d call%s '1396 'of which %d shrank. 
This deleted %d byte%s out of %d.'1397 ) % (1398 calls, s(calls),1399 self.shrinks,1400 total_deleted, s(total_deleted),1401 self.initial_size,1402 ))1403 for useful in [True, False]:1404 self.debug('')1405 if useful:1406 self.debug('Useful passes:')1407 else:1408 self.debug('Useless passes:')1409 self.debug('')1410 for p in sorted(1411 self.passes,1412 key=lambda t: (1413 -t.calls, -t.runs,1414 t.deletions, t.shrinks,1415 ),1416 ):1417 if p.calls == 0:1418 continue1419 if (p.shrinks != 0) != useful:1420 continue1421 self.debug((1422 ' * %s ran %d time%s, making %d call%s of which '1423 '%d shrank, deleting %d byte%s.'1424 ) % (1425 p.name,1426 p.runs, s(p.runs),1427 p.calls, s(p.calls),1428 p.shrinks,1429 p.deletions, s(p.deletions),1430 ))1431 self.debug('')1432 def greedy_shrink(self):1433 """Run a full set of greedy shrinks (that is, ones that will only ever1434 move to a better target) and update shrink_target appropriately.1435 This method iterates to a fixed point and so is idempontent - calling1436 it twice will have exactly the same effect as calling it once.1437 """1438 self.run_shrink_pass('alphabet_minimize')1439 while self.single_greedy_shrink_iteration():1440 self.run_shrink_pass('lower_common_block_offset')1441 def single_greedy_shrink_iteration(self):1442 """Performs a single run through each greedy shrink pass, but does not1443 loop to achieve a fixed point."""1444 initial = self.shrink_target1445 # What follows is a slightly delicate dance. What we want to do is try1446 # to ensure that:1447 #1448 # 1. If it is possible for us to be deleting data, we should be.1449 # 2. We do not end up repeating a lot of passes uselessly.1450 # 3. We do not want to run expensive or useless passes if we can1451 # possibly avoid doing so.1452 self.requeue_passes()1453 self.run_shrink_pass('remove_discarded')1454 # First run the entire set of solid passes (ones that have previously1455 # made changes). It's important that we run all of them, not just one,1456 # as typically each pass may unlock others.1457 self.run_queued_passes(PassClassification.HOPEFUL)1458 # While our solid passes are successfully shrinking the buffer, we can1459 # just keep doing that (note that this is a stronger condition than1460 # just making shrinks - it's a much better sense of progress. We can1461 # make only O(n) length reductions but we can make exponentially many1462 # shrinks).1463 if len(self.buffer) < len(initial.buffer):1464 return True1465 # If we're stuck on length reductions then we pull in one candiate pass1466 # (if there are any).1467 # This should hopefully help us unlock any local minima that were1468 # starting to reduce the utility of the previous solid passes.1469 self.run_one_queued_pass(PassClassification.CANDIDATE)1470 # We've pulled in a new candidate pass (or have no candidate passes1471 # left) and are making shrinks with the solid passes, so lets just1472 # keep on doing that.1473 if self.shrink_target is not initial:1474 return True1475 # We're a bit stuck, so it's time to try some new passes.1476 for classification in [1477 # First we try rerunning every pass we've previously seen succeed.1478 PassClassification.DUBIOUS,1479 # If that didn't work, we pull in some new candidate passes.1480 PassClassification.CANDIDATE,1481 # If that still didn't work, we now pull out all the stops and1482 # bring in the desperation passes. 
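# --------------------------------------------------------------------------
# Illustrative sketch (not part of engine.py): the reclassification rules
# applied after a pass run (see the state-machine comment in run_shrink_pass
# above), written as a standalone function over (classification, shrinks,
# ever_succeeded). SPECIAL passes and zero-call runs are left untouched
# there; this sketch only covers the four "normal" classes.
from enum import Enum

class C(Enum):
    CANDIDATE = 0
    HOPEFUL = 1
    DUBIOUS = 2
    AVOID = 3

def reclassify(current, made_a_shrink, ever_succeeded):
    if made_a_shrink:
        return C.DUBIOUS if current is C.AVOID else C.HOPEFUL
    return C.DUBIOUS if ever_succeeded else C.AVOID

assert reclassify(C.CANDIDATE, True, False) is C.HOPEFUL
assert reclassify(C.CANDIDATE, False, False) is C.AVOID
assert reclassify(C.AVOID, True, True) is C.DUBIOUS
# --------------------------------------------------------------------------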
These are either passes that1483 # started as CANDIDATE but we have never seen work, or ones that1484 # are so expensive that they begin life as AVOID.1485 PassClassification.AVOID1486 ]:1487 if self.run_queued_until_change(classification):1488 return True1489 assert self.shrink_target is initial1490 return False1491 @property1492 def buffer(self):1493 return self.shrink_target.buffer1494 @property1495 def blocks(self):1496 return self.shrink_target.blocks1497 def all_block_bounds(self):1498 return self.shrink_target.all_block_bounds()1499 def each_pair_of_blocks(self, accept_first, accept_second):1500 """Yield each pair of blocks ``(a, b)``, such that ``a.index <1501 b.index``, but only if ``accept_first(a)`` and ``accept_second(b)`` are1502 both true."""1503 i = 01504 while i < len(self.blocks):1505 j = i + 11506 while j < len(self.blocks):1507 block_i = self.blocks[i]1508 if not accept_first(block_i):1509 break1510 block_j = self.blocks[j]1511 if not accept_second(block_j):1512 j += 11513 continue1514 yield (block_i, block_j)1515 # After this point, the shrink target could have changed,1516 # so blocks need to be re-checked.1517 j += 11518 i += 11519 def pass_to_descendant(self):1520 """Attempt to replace each example with a descendant example.1521 This is designed to deal with strategies that call themselves1522 recursively. For example, suppose we had:1523 binary_tree = st.deferred(1524 lambda: st.one_of(1525 st.integers(), st.tuples(binary_tree, binary_tree)))1526 This pass guarantees that we can replace any binary tree with one of1527 its subtrees - each of those will create an interval that the parent1528 could validly be replaced with, and this pass will try doing that.1529 This is pretty expensive - it takes O(len(intervals)^2) - so we run it1530 late in the process when we've got the number of intervals as far down1531 as possible.1532 """1533 for ex in self.each_non_trivial_example():1534 st = self.shrink_target1535 descendants = sorted(set(1536 st.buffer[d.start:d.end] for d in self.shrink_target.examples1537 if d.start >= ex.start and d.end <= ex.end and1538 d.length < ex.length and d.label == ex.label1539 ), key=sort_key)1540 for d in descendants:1541 if self.incorporate_new_buffer(1542 self.buffer[:ex.start] + d + self.buffer[ex.end:]1543 ):1544 break1545 def is_shrinking_block(self, i):1546 """Checks whether block i has been previously marked as a shrinking1547 block.1548 If the shrink target has changed since i was last checked, will1549 attempt to calculate if an equivalent block in a previous shrink1550 target was marked as shrinking.1551 """1552 if not self.__shrinking_prefixes:1553 return False1554 try:1555 return self.__shrinking_block_cache[i]1556 except KeyError:1557 pass1558 t = self.shrink_target1559 return self.__shrinking_block_cache.setdefault(1560 i,1561 t.buffer[:t.blocks[i].start] in self.__shrinking_prefixes1562 )1563 def is_payload_block(self, i):1564 """A block is payload if it is entirely non-structural: We can tinker1565 with its value freely and this will not affect the shape of the input1566 language.1567 This is mostly a useful concept when we're doing lexicographic1568 minimimization on multiple blocks at once - by restricting ourself to1569 payload blocks, we expect the shape of the language to not change1570 under us (but must still guard against it doing so).1571 """1572 return not (1573 self.is_shrinking_block(i) or1574 self.shrink_target.blocks[i].forced1575 )1576 def lower_common_block_offset(self):1577 """Sometimes we find ourselves in a 
situation where changes to one part1578 of the byte stream unlock changes to other parts. Sometimes this is1579 good, but sometimes this can cause us to exhibit exponential slow1580 downs!1581 e.g. suppose we had the following:1582 m = draw(integers(min_value=0))1583 n = draw(integers(min_value=0))1584 assert abs(m - n) > 11585 If this fails then we'll end up with a loop where on each iteration we1586 reduce each of m and n by 2 - m can't go lower because of n, then n1587 can't go lower because of m.1588 This will take us O(m) iterations to complete, which is exponential in1589 the data size, as we gradually zig zag our way towards zero.1590 This can only happen if we're failing to reduce the size of the byte1591 stream: The number of iterations that reduce the length of the byte1592 stream is bounded by that length.1593 So what we do is this: We keep track of which blocks are changing, and1594 then if there's some non-zero common offset to them we try and minimize1595 them all at once by lowering that offset.1596 This may not work, and it definitely won't get us out of all possible1597 exponential slow downs (an example of where it doesn't is where the1598 shape of the blocks changes as a result of this bouncing behaviour),1599 but it fails fast when it doesn't work and gets us out of a really1600 nastily slow case when it does.1601 """1602 if len(self.__changed_blocks) <= 1:1603 return1604 current = self.shrink_target1605 blocked = [current.buffer[u:v] for u, v in current.all_block_bounds()]1606 changed = [1607 i for i in sorted(self.__changed_blocks)1608 if not self.shrink_target.blocks[i].trivial1609 ]1610 if not changed:1611 return1612 ints = [int_from_bytes(blocked[i]) for i in changed]1613 offset = min(ints)1614 assert offset > 01615 for i in hrange(len(ints)):1616 ints[i] -= offset1617 def reoffset(o):1618 new_blocks = list(blocked)1619 for i, v in zip(changed, ints):1620 new_blocks[i] = int_to_bytes(v + o, len(blocked[i]))1621 return self.incorporate_new_buffer(hbytes().join(new_blocks))1622 new_offset = Integer.shrink(offset, reoffset, random=self.random)1623 if new_offset == offset:1624 self.clear_change_tracking()1625 def shrink_offset_pairs(self):1626 """Lowers pairs of blocks that need to maintain a constant difference1627 between their respective values.1628 Before this shrink pass, two blocks explicitly offset from each1629 other would not get minimized properly:1630 >>> b = st.integers(0, 255)1631 >>> find(st.tuples(b, b), lambda x: x[0] == x[1] + 1)1632 (149,148)1633 This expensive (O(n^2)) pass goes through every pair of non-zero1634 blocks in the current shrink target and sees if the shrink1635 target can be improved by applying a negative offset to both of them.1636 """1637 def int_from_block(i):1638 u, v = self.blocks[i].bounds1639 block_bytes = self.shrink_target.buffer[u:v]1640 return int_from_bytes(block_bytes)1641 def block_len(i):1642 return self.blocks[i].length1643 # Try reoffseting every pair1644 def reoffset_pair(pair, o):1645 n = len(self.blocks)1646 # Number of blocks may have changed, need to validate1647 valid_pair = [1648 p for p in pair if p < n and int_from_block(p) > 0 and1649 self.is_payload_block(p)1650 ]1651 if len(valid_pair) < 2:1652 return1653 m = min([int_from_block(p) for p in valid_pair])1654 new_blocks = [self.shrink_target.buffer[u:v]1655 for u, v in self.shrink_target.all_block_bounds()]1656 for i in valid_pair:1657 new_blocks[i] = int_to_bytes(1658 int_from_block(i) + o - m, block_len(i))1659 buffer = hbytes().join(new_blocks)1660 return 
self.incorporate_new_buffer(buffer)1661 def is_non_zero_payload(block):1662 return not block.all_zero and self.is_payload_block(block.index)1663 for block_i, block_j in self.each_pair_of_blocks(1664 is_non_zero_payload,1665 is_non_zero_payload,1666 ):1667 i = block_i.index1668 j = block_j.index1669 value_i = int_from_block(i)1670 value_j = int_from_block(j)1671 offset = min(value_i, value_j)1672 Integer.shrink(1673 offset, lambda o: reoffset_pair((i, j), o),1674 random=self.random,1675 )1676 def mark_shrinking(self, blocks):1677 """Mark each of these blocks as a shrinking block: That is, lowering1678 its value lexicographically may cause less data to be drawn after."""1679 t = self.shrink_target1680 for i in blocks:1681 if self.__shrinking_block_cache.get(i) is True:1682 continue1683 self.__shrinking_block_cache[i] = True1684 prefix = t.buffer[:t.blocks[i].start]1685 self.__shrinking_prefixes.add(prefix)1686 def clear_change_tracking(self):1687 self.__changed_blocks.clear()1688 def mark_changed(self, i):1689 self.__changed_blocks.add(i)1690 def update_shrink_target(self, new_target):1691 assert new_target.frozen1692 if self.shrink_target is not None:1693 current = self.shrink_target.buffer1694 new = new_target.buffer1695 assert sort_key(new) < sort_key(current)1696 self.shrinks += 11697 if (1698 len(new_target.blocks) != len(self.shrink_target.blocks) or1699 new_target.all_block_bounds() !=1700 self.shrink_target.all_block_bounds()1701 ):1702 self.clear_change_tracking()1703 else:1704 for i, (u, v) in enumerate(1705 self.shrink_target.all_block_bounds()1706 ):1707 if (1708 i not in self.__changed_blocks and1709 current[u:v] != new[u:v]1710 ):1711 self.mark_changed(i)1712 else:1713 self.__changed_blocks = set()1714 self.shrink_target = new_target1715 self.__shrinking_block_cache = {}1716 def try_shrinking_blocks(self, blocks, b):1717 """Attempts to replace each block in the blocks list with b. Returns1718 True if it succeeded (which may include some additional modifications1719 to shrink_target).1720 May call mark_shrinking with b if this causes a reduction in size.1721 In current usage it is expected that each of the blocks currently have1722 the same value, although this is not essential. Note that b must be1723 < the block at min(blocks) or this is not a valid shrink.1724 This method will attempt to do some small amount of work to delete data1725 that occurs after the end of the blocks. This is useful for cases where1726 there is some size dependency on the value of a block.1727 """1728 initial_attempt = bytearray(self.shrink_target.buffer)1729 for i, block in enumerate(blocks):1730 if block >= len(self.blocks):1731 blocks = blocks[:i]1732 break1733 u, v = self.blocks[block].bounds1734 n = min(v - u, len(b))1735 initial_attempt[v - n:v] = b[-n:]1736 start = self.shrink_target.blocks[blocks[0]].start1737 end = self.shrink_target.blocks[blocks[-1]].end1738 initial_data = self.cached_test_function(initial_attempt)1739 if initial_data.status == Status.INTERESTING:1740 return initial_data is self.shrink_target1741 # If this produced something completely invalid we ditch it1742 # here rather than trying to persevere.1743 if initial_data.status < Status.VALID:1744 return False1745 # We've shrunk inside our group of blocks, so we have no way to1746 # continue. 
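# --------------------------------------------------------------------------
# Illustrative sketch (not part of engine.py): the "lower a common offset"
# idea used by lower_common_block_offset and shrink_offset_pairs above,
# reduced to plain integers. Instead of stepping each value down one at a
# time (which zig-zags for O(value) iterations on constraints like
# abs(m - n) > 1), split the values into (offset + residue) and shrink the
# shared offset directly. The binary-search descent here is only a stand-in
# for what the real code delegates to Integer.shrink.
def lower_common_offset(values, acceptable):
    offset = min(values)
    residues = [v - offset for v in values]
    best, lo, hi = offset, 0, offset
    while lo < hi:
        mid = (lo + hi) // 2
        if acceptable([r + mid for r in residues]):
            best, hi = mid, mid
        else:
            lo = mid + 1
    return [r + best for r in residues]

assert lower_common_offset([1000, 1002], lambda v: abs(v[0] - v[1]) > 1) == [0, 2]
# --------------------------------------------------------------------------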
    def mark_shrinking(self, blocks):
        """Mark each of these blocks as a shrinking block: That is, lowering
        its value lexicographically may cause less data to be drawn after."""
        t = self.shrink_target
        for i in blocks:
            if self.__shrinking_block_cache.get(i) is True:
                continue
            self.__shrinking_block_cache[i] = True
            prefix = t.buffer[:t.blocks[i].start]
            self.__shrinking_prefixes.add(prefix)

    def clear_change_tracking(self):
        self.__changed_blocks.clear()

    def mark_changed(self, i):
        self.__changed_blocks.add(i)

    def update_shrink_target(self, new_target):
        assert new_target.frozen
        if self.shrink_target is not None:
            current = self.shrink_target.buffer
            new = new_target.buffer
            assert sort_key(new) < sort_key(current)
            self.shrinks += 1
            if (
                len(new_target.blocks) != len(self.shrink_target.blocks) or
                new_target.all_block_bounds() !=
                self.shrink_target.all_block_bounds()
            ):
                self.clear_change_tracking()
            else:
                for i, (u, v) in enumerate(
                    self.shrink_target.all_block_bounds()
                ):
                    if (
                        i not in self.__changed_blocks and
                        current[u:v] != new[u:v]
                    ):
                        self.mark_changed(i)
        else:
            self.__changed_blocks = set()

        self.shrink_target = new_target
        self.__shrinking_block_cache = {}

    def try_shrinking_blocks(self, blocks, b):
        """Attempts to replace each block in the blocks list with b. Returns
        True if it succeeded (which may include some additional modifications
        to shrink_target).

        May call mark_shrinking with b if this causes a reduction in size.

        In current usage it is expected that each of the blocks currently has
        the same value, although this is not essential. Note that b must be
        < the block at min(blocks) or this is not a valid shrink.

        This method will attempt to do some small amount of work to delete data
        that occurs after the end of the blocks. This is useful for cases where
        there is some size dependency on the value of a block.
        """
        initial_attempt = bytearray(self.shrink_target.buffer)
        for i, block in enumerate(blocks):
            if block >= len(self.blocks):
                blocks = blocks[:i]
                break
            u, v = self.blocks[block].bounds
            n = min(v - u, len(b))
            initial_attempt[v - n:v] = b[-n:]

        start = self.shrink_target.blocks[blocks[0]].start
        end = self.shrink_target.blocks[blocks[-1]].end

        initial_data = self.cached_test_function(initial_attempt)

        if initial_data.status == Status.INTERESTING:
            return initial_data is self.shrink_target

        # If this produced something completely invalid we ditch it
        # here rather than trying to persevere.
        if initial_data.status < Status.VALID:
            return False

        # We've shrunk inside our group of blocks, so we have no way to
        # continue. (This only happens when shrinking more than one block at
        # a time).
        if len(initial_data.buffer) < v:
            return False

        lost_data = len(self.shrink_target.buffer) - len(initial_data.buffer)

        # If this did not in fact cause the data size to shrink we
        # bail here because it's not worth trying to delete stuff from
        # the remainder.
        if lost_data <= 0:
            return False

        self.mark_shrinking(blocks)

        # We now look for contiguous regions to delete that might help fix up
        # this failed shrink. We only look for contiguous regions of the right
        # lengths because doing anything more than that starts to get very
        # expensive. See example_deletion_with_block_lowering for where we
        # try to be more aggressive.
        regions_to_delete = {(end, end + lost_data)}

        for j in (blocks[-1] + 1, blocks[-1] + 2):
            if j >= min(len(initial_data.blocks), len(self.blocks)):
                continue
            # We look for a block very shortly after the last one that has
            # lost some of its size, and try to delete from the beginning so
            # that it retains the same integer value. This is a bit of a hyper
            # specific trick designed to make our integers() strategy shrink
            # well.
            r1, s1 = self.shrink_target.blocks[j].bounds
            r2, s2 = initial_data.blocks[j].bounds
            lost = (s1 - r1) - (s2 - r2)
            # Apparently a coverage bug? An assert False in the body of this
            # will reliably fail, but it shows up as uncovered.
            if lost <= 0 or r1 != r2:  # pragma: no cover
                continue
            regions_to_delete.add((r1, r1 + lost))

        for ex in self.shrink_target.examples:
            if ex.start > start:
                continue
            if ex.end <= end:
                continue

            replacement = initial_data.examples[ex.index]

            in_original = [
                c for c in ex.children if c.start >= end
            ]
            in_replaced = [
                c for c in replacement.children if c.start >= end
            ]

            if len(in_replaced) >= len(in_original) or not in_replaced:
                continue

            # We've found an example where some of the children went missing
            # as a result of this change, and just replacing it with the data
            # it would have had and removing the spillover didn't work. This
            # means that some of its children towards the right must be
            # important, so we try to arrange it so that it retains its
            # rightmost children instead of its leftmost.
            regions_to_delete.add((
                in_original[0].start, in_original[-len(in_replaced)].start
            ))

        for u, v in sorted(
            regions_to_delete, key=lambda x: x[1] - x[0], reverse=True
        ):
            try_with_deleted = bytearray(initial_attempt)
            del try_with_deleted[u:v]
            if self.incorporate_new_buffer(try_with_deleted):
                return True

        return False
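    # Illustrative sketch (not part of this module) of the size dependency
    # that try_shrinking_blocks compensates for. In a hypothetical test like
    # the one below, the block holding `n` controls how many further bytes
    # are drawn, so lowering it only yields a valid shrink if the trailing
    # bytes it no longer needs are deleted as well:
    #
    #     from hypothesis import given, strategies as st
    #
    #     @given(st.data())
    #     def test_size_dependency(data):
    #         n = data.draw(st.integers(0, 10))
    #         ls = [data.draw(st.booleans()) for _ in range(n)]
    #         assert len(ls) < 2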
    def remove_discarded(self):
        """Try removing all bytes marked as discarded.

        This pass is primarily to deal with data that has been ignored while
        doing rejection sampling - e.g. as a result of an integer range, or a
        filtered strategy.

        Such data will also be handled by the adaptive_example_deletion pass,
        but that pass is necessarily more conservative and will try deleting
        each interval individually. The common case is that all data drawn and
        rejected can just be thrown away immediately in one block, so this pass
        will be much faster than trying each one individually when it works.
        """
        while self.shrink_target.has_discards:
            discarded = []

            for ex in self.shrink_target.examples:
                if ex.discarded and (
                    not discarded or ex.start >= discarded[-1][-1]
                ):
                    discarded.append((ex.start, ex.end))

            assert discarded

            attempt = bytearray(self.shrink_target.buffer)
            for u, v in reversed(discarded):
                del attempt[u:v]

            if not self.incorporate_new_buffer(attempt):
                break

    def each_non_trivial_example(self):
        """Iterates over all non-trivial examples in the current shrink target,
        with care taken to ensure that every example yielded is current.

        Makes the assumption that no modifications will be made to the
        shrink target prior to the currently yielded example. If this
        assumption is violated this will probably raise an error, so
        don't do that.
        """
        stack = [0]
        while stack:
            target = stack.pop()
            if isinstance(target, tuple):
                parent, i = target
                parent = self.shrink_target.examples[parent]
                example_index = parent.children[i].index
            else:
                example_index = target
            ex = self.shrink_target.examples[example_index]
            if ex.trivial:
                continue
            yield ex
            ex = self.shrink_target.examples[example_index]
            if ex.trivial:
                continue
            for i in range(len(ex.children)):
                stack.append((example_index, i))

    def example_wise_shrink(self, shrinker, **kwargs):
        """Runs a sequence shrinker on the children of each example."""
        for ex in self.each_non_trivial_example():
            st = self.shrink_target
            pieces = [
                st.buffer[c.start:c.end]
                for c in ex.children
            ]
            if not pieces:
                pieces = [st.buffer[ex.start:ex.end]]
            prefix = st.buffer[:ex.start]
            suffix = st.buffer[ex.end:]
            shrinker.shrink(
                pieces, lambda ls: self.incorporate_new_buffer(
                    prefix + hbytes().join(ls) + suffix,
                ), random=self.random, **kwargs
            )

    def adaptive_example_deletion(self):
        """Recursive deletion pass that tries to make the example located at
        example_index as small as possible. This is the main point at which we
        try to lower the size of the data.

        First attempts to replace the example with its minimal possible version
        using zero_example. If the example is trivial (either because of that
        or because it was anyway) then we assume there's nothing we can
        usefully do here and return early. Otherwise, we attempt to minimize it
        by deleting its children.

        If we do not make any successful changes, we recurse to the example's
        children and attempt the same there.
        """
        self.example_wise_shrink(Length)
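    # Illustrative sketch (not part of this module): the sort of test that the
    # deletion passes above handle well. Whole list elements are examples, so
    # example_wise_shrink(Length) can drop them outright, and any draws
    # rejected by the filter become discarded data that remove_discarded
    # strips in one pass. `given` and `st` refer to the public hypothesis API.
    #
    #     from hypothesis import given, strategies as st
    #
    #     @given(st.lists(st.integers().filter(lambda x: x % 3 == 0)))
    #     def test_short_lists(ls):
    #         assert len(ls) < 3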
    def zero_examples(self):
        """Attempt to replace each example with a minimal version of itself."""
        for ex in self.each_non_trivial_example():
            u = ex.start
            v = ex.end
            attempt = self.cached_test_function(
                self.buffer[:u] + hbytes(v - u) + self.buffer[v:]
            )

            # FIXME: IOU one attempt to debug this - DRMacIver
            # This is a mysterious problem that should be impossible to trigger
            # but isn't. I don't know what's going on, and it defeated my
            # attempts to reproduce or debug it. I'd *guess* it's related to
            # nondeterminism in the test function. That should be impossible in
            # the cases where I'm seeing it, but I haven't been able to put
            # together a reliable reproduction of it.
            if ex.index >= len(attempt.examples):  # pragma: no cover
                continue

            in_replacement = attempt.examples[ex.index]
            used = in_replacement.length

            if (
                not self.__predicate(attempt) and
                in_replacement.end < len(attempt.buffer) and
                used < ex.length
            ):
                self.incorporate_new_buffer(
                    self.buffer[:u] + hbytes(used) + self.buffer[v:]
                )

    def minimize_duplicated_blocks(self):
        """Find blocks that have been duplicated in multiple places and attempt
        to minimize all of the duplicates simultaneously.

        This lets us handle cases where two values can't be shrunk
        independently of each other but can easily be shrunk together.
        For example if we had something like:

        ls = data.draw(lists(integers()))
        y = data.draw(integers())
        assert y not in ls

        Suppose we drew y = 3 and after shrinking we have ls = [3]. If we were
        to replace both 3s with 0, this would be a valid shrink, but if we were
        to replace either 3 with 0 on its own the test would start passing.

        It is also useful for when that duplication is accidental and the value
        of the blocks doesn't matter very much because it allows us to replace
        more values at once.
        """
        def canon(b):
            i = 0
            while i < len(b) and b[i] == 0:
                i += 1
            return b[i:]

        counts = Counter(
            canon(self.shrink_target.buffer[u:v])
            for u, v in self.all_block_bounds()
        )
        counts.pop(hbytes(), None)
        blocks = [buffer for buffer, count in counts.items() if count > 1]

        blocks.sort(reverse=True)
        blocks.sort(key=lambda b: counts[b] * len(b), reverse=True)

        for block in blocks:
            targets = [
                i for i, (u, v) in enumerate(self.all_block_bounds())
                if canon(self.shrink_target.buffer[u:v]) == block
            ]
            # This can happen if some blocks have been lost in the previous
            # shrinking.
            if len(targets) <= 1:
                continue

            Lexical.shrink(
                block,
                lambda b: self.try_shrinking_blocks(targets, b),
                random=self.random, full=False
            )

    def minimize_individual_blocks(self):
        """Attempt to minimize each block in sequence.

        This is the pass that ensures that e.g. each integer we draw is a...