Best Python code snippet using hypothesis
engine.py
Source: engine.py
...
        self.statistics["targets"] = dict(self.best_observed_targets)
        self.debug("exit_with(%s)" % (reason.name,))
        self.exit_reason = reason
        raise RunIsComplete()

    def should_generate_more(self):
        # End the generation phase where we would have ended it if no bugs had
        # been found. This reproduces the exit logic in `self.test_function`,
        # but with the important distinction that this clause will move on to
        # the shrinking phase having found one or more bugs, while the other
        # will exit having found zero bugs.
        if self.valid_examples >= self.settings.max_examples or self.call_count >= max(
            self.settings.max_examples * 10, 1000
        ):  # pragma: no cover
            return False

        # If we haven't found a bug, keep looking - if we hit any limits on
        # the number of tests to run that will raise an exception and stop
        # the run.
        if not self.interesting_examples:
            return True
        # If we've found a bug and won't report more than one, stop looking.
        elif not self.settings.report_multiple_bugs:
            return False
        assert self.first_bug_found_at <= self.last_bug_found_at <= self.call_count
        # Otherwise, keep searching for between ten and 'a heuristic' calls.
        # We cap 'calls after first bug' so errors are reported reasonably
        # soon even for tests that are allowed to run for a very long time,
        # or sooner if the latest half of our test effort has been fruitless.
        return self.call_count < MIN_TEST_CALLS or self.call_count < min(
            self.first_bug_found_at + 1000, self.last_bug_found_at * 2
        )

    def generate_new_examples(self):
        if Phase.generate not in self.settings.phases:
            return
        if self.interesting_examples:
            # The example database has failing examples from a previous run,
            # so we'd rather report that they're still failing ASAP than take
            # the time to look for additional failures.
            return

        self.debug("Generating new examples")

        assert self.should_generate_more()
        zero_data = self.cached_test_function(bytes(BUFFER_SIZE))
        if zero_data.status > Status.OVERRUN:
            self.__data_cache.pin(zero_data.buffer)

        if zero_data.status == Status.OVERRUN or (
            zero_data.status == Status.VALID and len(zero_data.buffer) * 2 > BUFFER_SIZE
        ):
            fail_health_check(
                self.settings,
                "The smallest natural example for your test is extremely "
                "large. This makes it difficult for Hypothesis to generate "
                "good examples, especially when trying to reduce failing ones "
                "at the end. Consider reducing the size of your data if it is "
                "of a fixed size. You could also fix this by improving how "
                "your data shrinks (see https://hypothesis.readthedocs.io/en/"
                "latest/data.html#shrinking for details), or by introducing "
                "default values inside your strategy. e.g. could you replace "
                "some arguments with their defaults by using "
                "one_of(none(), some_complex_strategy)?",
                HealthCheck.large_base_example,
            )

        self.health_check_state = HealthCheckState()

        # We attempt to use the size of the minimal generated test case starting
        # from a given novel prefix as a guideline to generate smaller test
        # cases for an initial period, by restricting ourselves to test cases
        # that are not much larger than it.
        #
        # Calculating the actual minimal generated test case is hard, so we
        # take a best guess that zero extending a prefix produces the minimal
        # test case starting with that prefix (this is true for our built in
        # strategies). This is only a reasonable thing to do if the resulting
        # test case is valid. If we regularly run into situations where it is
        # not valid then this strategy is a waste of time, so we want to
        # abandon it early. In order to do this we track how many times in a
        # row it has failed to work, and abort small test case generation when
        # it has failed too many times in a row.
        consecutive_zero_extend_is_invalid = 0

        # We control growth during initial example generation, for two
        # reasons:
        #
        # * It gives us an opportunity to find small examples early, which
        #   gives us a fast path for easy to find bugs.
        # * It avoids low probability events where we might end up
        #   generating very large examples during health checks, which
        #   on slower machines can trigger HealthCheck.too_slow.
        #
        # The heuristic we use is that we attempt to estimate the smallest
        # extension of this prefix, and limit the size to no more than
        # an order of magnitude larger than that. If we fail to estimate
        # the size accurately, we skip over this prefix and try again.
        #
        # We need to tune the example size based on the initial prefix,
        # because any fixed size might be too small, and any size based
        # on the strategy in general can fall afoul of strategies that
        # have very different sizes for different prefixes.
        small_example_cap = clamp(10, self.settings.max_examples // 10, 50)

        optimise_at = max(self.settings.max_examples // 2, small_example_cap + 1)
        ran_optimisations = False

        while self.should_generate_more():
            prefix = self.generate_novel_prefix()
            assert len(prefix) <= BUFFER_SIZE
            if (
                self.valid_examples <= small_example_cap
                and self.call_count <= 5 * small_example_cap
                and not self.interesting_examples
                and consecutive_zero_extend_is_invalid < 5
            ):
                minimal_example = self.cached_test_function(
                    prefix + bytes(BUFFER_SIZE - len(prefix))
                )

                if minimal_example.status < Status.VALID:
                    consecutive_zero_extend_is_invalid += 1
                    continue

                consecutive_zero_extend_is_invalid = 0

                minimal_extension = len(minimal_example.buffer) - len(prefix)

                max_length = min(len(prefix) + minimal_extension * 10, BUFFER_SIZE)

                # We could end up in a situation where even though the prefix was
                # novel when we generated it, because we've now tried zero extending
                # it not all possible continuations of it will be novel. In order to
                # avoid making redundant test calls, we rerun it in simulation mode
                # first. If this has a predictable result, then we don't bother
                # running the test function for real here. If however we encounter
                # some novel behaviour, we try again with the real test function,
                # starting from the new novel prefix that it has discovered.
                try:
                    trial_data = self.new_conjecture_data(
                        prefix=prefix, max_length=max_length
                    )
                    self.tree.simulate_test_function(trial_data)
                    continue
                except PreviouslyUnseenBehaviour:
                    pass

                # If the simulation entered part of the tree that has been killed,
                # we don't want to run this.
                if trial_data.observer.killed:
                    continue

                # We might have hit the cap on number of examples we should
                # run when calculating the minimal example.
                if not self.should_generate_more():
                    break

                prefix = trial_data.buffer
            else:
                max_length = BUFFER_SIZE

            data = self.new_conjecture_data(prefix=prefix, max_length=max_length)

            self.test_function(data)

            self.generate_mutations_from(data)

            # Although the optimisations are logically a distinct phase, we
            # actually normally run them as part of example generation. The
            # reason for this is that we cannot guarantee that optimisation
            # actually exhausts our budget: It might finish running and we
            # discover that actually we still could run a bunch more test cases
            # if we want.
            if (
                self.valid_examples >= max(small_example_cap, optimise_at)
                and not ran_optimisations
            ):
                ran_optimisations = True
                self.optimise_targets()

    def generate_mutations_from(self, data):
        # A thing that is often useful but rarely happens by accident is
        # to generate the same value at multiple different points in the
        # test case.
        #
        # Rather than make this the responsibility of individual strategies
        # we implement a small mutator that just takes parts of the test
        # case with the same label and tries replacing one of them with a
        # copy of the other and tries running it. If we've made a good
        # guess about what to put where, this will run a similar generated
        # test case with more duplication.
        if (
            # An OVERRUN doesn't have enough information about the test
            # case to mutate, so we just skip those.
            data.status >= Status.INVALID
            # This has a tendency to trigger some weird edge cases during
            # generation so we don't let it run until we're done with the
            # health checks.
            and self.health_check_state is None
        ):
            initial_calls = self.call_count
            failed_mutations = 0

            while (
                self.should_generate_more()
                # We implement fairly conservative checks for how long we
                # should run mutation for, as it's generally not obvious
                # how helpful it is for any given test case.
                and self.call_count <= initial_calls + 5
                and failed_mutations <= 5
            ):
                groups = data.examples.mutator_groups
                if not groups:
                    break

                group = self.random.choice(groups)

                ex1, ex2 = [
                    data.examples[i] for i in sorted(self.random.sample(group, 2))
                ]
                assert ex1.end <= ex2.start
...
erc721_pbt.py
Source: erc721_pbt.py
import os
import brownie
from brownie.test import strategy
from brownie.exceptions import VirtualMachineError
from hypothesis.strategies import sampled_from

class Options:
    VERIFY_EVENTS = os.getenv("PBT_VERIFY_EVENTS", "no") == "yes"
    VERIFY_RETURN_VALUES = os.getenv("PBT_VERIFY_RETURN_VALUES", "no") == "yes"
    DEBUG = os.getenv("PBT_DEBUG", "no") == "yes"
    STATEFUL_STEP_COUNT = int(os.getenv("PBT_STATEFUL_STEP_COUNT", 10))
    MAX_EXAMPLES = int(os.getenv("PBT_MAX_EXAMPLES", 100))
    SEED = int(os.getenv("PBT_SEED", 0))
    ACCOUNTS = int(os.getenv("PBT_ACCOUNTS", 4))
    TOKENS = int(os.getenv("PBT_TOKENS", 6))

class StateMachine:
    st_owner = strategy("uint256", min_value=0, max_value=Options.ACCOUNTS - 1)
    st_sender = strategy("uint256", min_value=0, max_value=Options.ACCOUNTS - 1)
    st_receiver = strategy(
        "uint256", min_value=0, max_value=Options.ACCOUNTS - 1
    )
    st_token = strategy("uint256", min_value=1, max_value=Options.TOKENS)
    st_bool = strategy("bool")

    def __init__(self, wallets, contract, DEBUG=None):
        self.wallets = wallets
        self.addr2idx = {addr: i for i, addr in enumerate(wallets)}
        self.addr2idx[0] = -1
        self.addr2idx[brownie.convert.to_address('0x0000000000000000000000000000000000000000')] = -1
        self.tokens = range(1, Options.TOKENS + 1)
        self.contract = contract

    def setup(self):
        # tokenId -> owner address - must match contract's ownerOf(tokenId)
        self.owner = {tokenId: self.contract.address for tokenId in self.tokens}
        # address -> number of tokens - must match contract's balanceOf(address)
        self.balance = {addr: 0 for addr in range(Options.ACCOUNTS)}
        # tokenId -> approved address - must match contract's getApproved(address)
        self.approved = {tokenId: -1 for tokenId in self.tokens}
        # address -> list of approved operators - for each address x in operators[address]
        # isApprovedForAll(address,x) must return true
        self.operators = {addr: set() for addr in range(Options.ACCOUNTS)}
        # Callback for initial setup (contract-dependent)
        self.onSetup()
        if Options.DEBUG:
            print("setup()")
            self.dumpState()

    def teardown(self):
        if Options.DEBUG:
            print("teardown()")
            self.dumpState()

    def canTransfer(self, sender, owner, tokenId):
        return owner == self.owner[tokenId] and (
            sender == owner
            or sender == self.approved[tokenId]
            or sender in self.operators[owner]
        )

    def rule_transferFrom(self, st_owner, st_receiver, st_token, st_sender):
        if Options.DEBUG:
            print(
                "transferFrom(owner {},receiver {},token {} [sender: {}])".format(
                    st_owner, st_receiver, st_token, st_sender
                )
            )
        if self.canTransfer(st_sender, st_owner, st_token):
            tx = self.contract.transferFrom(
                self.wallets[st_owner],
                self.wallets[st_receiver],
                st_token,
                {"from": self.wallets[st_sender]},
            )
            self.owner[st_token] = st_receiver
            self.approved[st_token] = -1
            self.balance[st_owner] = self.balance[st_owner] - 1
            self.balance[st_receiver] = self.balance[st_receiver] + 1
            self.verifyOwner(st_token)
            self.verifyApproved(st_token)
            self.verifyBalance(st_owner)
            self.verifyBalance(st_receiver)
            self.verifyEvent(
                tx,
                "Transfer",
                {
                    "_from": self.wallets[st_owner],
                    "_to": self.wallets[st_receiver],
                    "_tokenId": st_token
                },
            )
        else:
            with brownie.reverts():
                self.contract.transferFrom(
                    self.wallets[st_owner],
                    self.wallets[st_receiver],
                    st_token,
                    {"from": self.wallets[st_sender]},
                )

    def rule_safeTransferFrom(self, st_owner, st_receiver, st_token, st_sender):
        if Options.DEBUG:
            print(
                "safeTransferFrom({},{},{} [sender: {}])".format(
                    st_owner, st_receiver, st_token, st_sender
                )
            )
        if self.canTransfer(st_sender, st_owner, st_token):
            tx = self.contract.safeTransferFrom(
                self.wallets[st_owner],
                self.wallets[st_receiver],
                st_token,
                {"from": self.wallets[st_sender]},
            )
            self.owner[st_token] = st_receiver
            self.approved[st_token] = -1
            self.balance[st_owner] = self.balance[st_owner] - 1
            self.balance[st_receiver] = self.balance[st_receiver] + 1
            self.verifyOwner(st_token)
            self.verifyApproved(st_token)
            self.verifyBalance(st_owner)
            self.verifyBalance(st_receiver)
            self.verifyEvent(
                tx,
                "Transfer",
                {
                    "_from": self.wallets[st_owner],
                    "_to": self.wallets[st_receiver],
                    "_tokenId": st_token
                },
            )
        else:
            with brownie.reverts():
                self.contract.safeTransferFrom(
                    self.wallets[st_owner],
                    self.wallets[st_receiver],
                    st_token,
                    {"from": self.wallets[st_sender]},
                )

    def rule_approve(self, st_sender, st_token, st_receiver):
        if Options.DEBUG:
            print(
                "approve({},{}) [sender: {}])".format(
                    st_receiver, st_token, st_sender
                )
            )
        if st_receiver != self.owner[st_token] and \
                (self.owner[st_token] == st_sender
                 or st_sender in self.operators[self.owner[st_token]]):
            with normal():
                tx = self.contract.approve(
                    self.wallets[st_receiver], st_token, {"from": self.wallets[st_sender]}
                )
            self.approved[st_token] = st_receiver
            self.verifyApproved(st_token)
            self.verifyEvent(
                tx,
                "Approval",
                {
                    "_owner": self.wallets[self.owner[st_token]],
                    "_tokenId": st_token,
                    "_approved": self.wallets[st_receiver]
                }
            )
        else:
            with brownie.reverts():
                self.contract.approve(
                    self.wallets[st_receiver], st_token, {"from": self.wallets[st_sender]}
                )

    def rule_setApprovalForAll(self, st_sender, st_receiver, st_bool):
        if Options.DEBUG:
            print(
                "setApprovedForAll({}, {}) [sender: {}])".format(
                    st_receiver, st_bool, st_sender
                )
            )
        if st_receiver != st_sender:
            with normal():
                tx = self.contract.setApprovalForAll(
                    self.wallets[st_receiver],
                    st_bool, {"from": self.wallets[st_sender]})
            if st_bool:
                self.operators[st_sender].add(st_receiver)
            elif st_receiver in self.operators[st_sender]:
                self.operators[st_sender].remove(st_receiver)
            self.verifySetApprovedForAll(st_sender, st_receiver)
            self.verifyEvent(
                tx,
                "ApprovalForAll",
                {
                    "_owner": self.wallets[st_sender],
                    "_operator": self.wallets[st_receiver],
                    "_approved": st_bool
                }
            )
        else:
            with brownie.reverts():
                tx = self.contract.setApprovalForAll(
                    self.wallets[st_receiver],
                    st_bool, {"from": self.wallets[st_sender]})

    def verifyOwner(self, tokenId):
        self.verifyValue(
            "ownerOf({})".format(tokenId),
            self.owner[tokenId],
            self.addr2idx[self.contract.ownerOf(tokenId)]
        )

    def verifyBalance(self, wIdx):
        self.verifyValue(
            "balanceOf({})".format(wIdx),
            self.balance[wIdx],
            self.contract.balanceOf(self.wallets[wIdx]),
        )

    def verifyApproved(self, token):
        self.verifyValue(
            "getApproved({})".format(token),
            self.approved[token],
            self.addr2idx[self.contract.getApproved(token)]
        )

    def verifySetApprovedForAll(self, sender, receiver):
        self.verifyValue(
            "isApprovedForAll({}, {})".format(sender, receiver),
            receiver in self.operators[sender],
            self.contract.isApprovedForAll(self.wallets[sender], self.wallets[receiver])
        )

    def verifyEvent(self, tx, eventName, data):
        if Options.VERIFY_EVENTS:
            if eventName not in tx.events:
                raise AssertionError(
                    "{}: event was not fired".format(eventName)
                )
            ev = tx.events[eventName]
            for k in data:
                if k not in ev:
                    raise AssertionError(
                        "{}.{}: absent event data".format(eventName, k)
                    )
                self.verifyValue("{}.{}".format(eventName, k), data[k], ev[k])

    def verifyReturnValue(self, tx, expected):
        if Options.VERIFY_RETURN_VALUES:
            self.verifyValue("return value", expected, tx.return_value)

    def verifyValue(self, msg, expected, actual):
        if expected != actual:
            self.value_failure = True
            raise AssertionError(
                "{} : expected value {}, actual value was {}".format(
                    msg, expected, actual
                )
            )

    def dumpMap(self, desc, map):
        print(desc + " {")
        for k in map:
            print("{} ----> {}".format(k, map[k]))
        print("}")

    def dumpState(self):
        print("= STATE =")
        self.dumpMap("owner", self.owner)
        self.dumpMap("balance", self.balance)
        self.dumpMap("approved", self.approved)
        self.dumpMap("operators", self.operators)

def patch_hypothesis_for_seed_handling(seed):
    import hypothesis
    h_run_state_machine = hypothesis.stateful.run_state_machine_as_test

    def run_state_machine(state_machine_factory, settings=None):
        state_machine_factory._hypothesis_internal_use_seed = seed
        h_run_state_machine(state_machine_factory, settings)

    hypothesis.stateful.run_state_machine_as_test = run_state_machine

def patch_hypothesis_for_fuzz_behavior():
    import hypothesis

    # Replacement for method should_generate_more in
    # hypothesis.internal.conjecture.engine.ConjectureRunner
    def should_generate_more_patch(self):
        # End the generation phase where we would have ended it if no bugs had
        # been found. This reproduces the exit logic in `self.test_function`,
        # but with the important distinction that this clause will move on to
        # the shrinking phase having found one or more bugs, while the other
        # will exit having found zero bugs.
        if self.valid_examples >= self.settings.max_examples or self.call_count >= max(
            self.settings.max_examples * 10, 1000
        ):  # pragma: no cover
            return False
        # If we haven't found a bug, keep looking - if we hit any limits on
        # the number of tests to run that will raise an exception and stop
        # the run.
        if not self.interesting_examples:
            return True
        # If we've found a bug and won't report more than one, stop looking.
        elif not self.settings.report_multiple_bugs:
            return False
        assert self.first_bug_found_at <= self.last_bug_found_at <= self.call_count
        # PATCH IS HERE
        return self.valid_examples <= self.settings.max_examples \
            and self.call_count <= 2 * self.settings.max_examples

    hypothesis.internal.conjecture.engine.ConjectureRunner.should_generate_more = should_generate_more_patch

def patch_brownie_for_assertion_detection():
    from brownie.test.managers.runner import RevertContextManager
    from brownie.exceptions import VirtualMachineError
    f = RevertContextManager.__exit__

    def alt_exit(self, exc_type, exc_value, traceback):
        if exc_type is VirtualMachineError:
            exc_value.__traceback__.tb_next = None
            if exc_value.revert_type != "revert":
                return False
        return f(self, exc_type, exc_value, traceback)

    RevertContextManager.__exit__ = alt_exit

def register_hypothesis_profiles():
    import hypothesis
    from hypothesis import settings, Verbosity, Phase
    derandomize = True
    if Options.SEED != 0:
        patch_hypothesis_for_seed_handling(Options.SEED)
        derandomize = False
    patch_hypothesis_for_fuzz_behavior()
    patch_brownie_for_assertion_detection()
    settings.register_profile(
        "generate",
        stateful_step_count=Options.STATEFUL_STEP_COUNT,
        max_examples=Options.MAX_EXAMPLES,
        phases=[Phase.generate],
        report_multiple_bugs=True,
        derandomize=derandomize,
        print_blob=True,
    )
    settings.register_profile(
        "shrinking",
        stateful_step_count=Options.STATEFUL_STEP_COUNT,
        max_examples=Options.MAX_EXAMPLES,
        phases=[Phase.generate, Phase.shrink],
        report_multiple_bugs=True,
        derandomize=derandomize,
        print_blob=True,
    )

class NoRevertContextManager:
    def __init__(self):
        pass

    def __enter__(self):
        pass

    def __exit__(self, exc_type, exc_value, traceback):
        if exc_type is None:
            return True
        import traceback
        if exc_type is VirtualMachineError:
            exc_value.__traceback__.tb_next = None
        elif exc_type is AssertionError:
            exc_value.__traceback__.tb_next = None
        return False

def normal():
...
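The module above defines the abstract StateMachine (it relies on an onSetup callback and a normal() helper that is truncated here), the Hypothesis/Brownie patches, and the profile registration, but it does not show how a test drives the machine. The following sketch shows one possible harness under stated assumptions: it uses Brownie's documented state_machine and accounts pytest fixtures, a hypothetical ERC-721 contract container fixture named Token, and a hypothetical mint() method on the contract; none of these names appear in the snippet above.

# Hypothetical pytest harness for erc721_pbt.StateMachine. The Token fixture,
# the contract's mint() method, and the onSetup body are assumptions.
import hypothesis
import erc721_pbt

# Register the "generate"/"shrinking" profiles and apply the monkey patches
# described above, then make "generate" the active profile.
erc721_pbt.register_hypothesis_profiles()
hypothesis.settings.load_profile("generate")

class ERC721StateMachine(erc721_pbt.StateMachine):
    def onSetup(self):
        # Distribute the tokens round-robin over the test wallets and mirror
        # that in the model, so ownerOf/balanceOf agree from the first rule.
        for tokenId in self.tokens:
            idx = (tokenId - 1) % erc721_pbt.Options.ACCOUNTS
            self.contract.mint(self.wallets[idx], tokenId, {"from": self.wallets[0]})
            self.owner[tokenId] = idx
            self.balance[idx] += 1

def test_erc721_stateful(Token, accounts, state_machine):
    wallets = accounts[: erc721_pbt.Options.ACCOUNTS]
    contract = Token.deploy({"from": wallets[0]})  # constructor args depend on the contract
    state_machine(
        ERC721StateMachine,
        wallets,
        contract,
        settings={
            "stateful_step_count": erc721_pbt.Options.STATEFUL_STEP_COUNT,
            "max_examples": erc721_pbt.Options.MAX_EXAMPLES,
        },
    )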