Best Python code snippet using hypothesis
cache.py
Source:cache.py
...29 the smallest score, then adds the new key to the map.30 A key has the following lifecycle:31 1. key is written for the first time, the key is given the score32 self.new_entry(key, value)33 2. whenever an existing key is read or written, self.on_access(key, value,34 score) is called. This returns a new score for the key.35 3. When a key is evicted, self.on_evict(key, value, score) is called.36 The cache will be in a valid state in all of these cases.37 Implementations are expected to implement new_entry and optionally38 on_access and on_evict to implement a specific scoring strategy.39 """40 __slots__ = ('keys_to_indices', 'data', 'max_size')41 def __init__(self, max_size):42 self.max_size = max_size43 # Implementation: We store a binary heap of Entry objects in self.data,44 # with the heap property requirnig that a parent's score is <= that of45 # its children. keys_to_index then maps keys to their index in the46 # heap. We keep these two in sync automatically - the heap is never47 # reordered without updating the index.48 self.keys_to_indices = {}49 self.data = []50 def __len__(self):51 assert len(self.keys_to_indices) == len(self.data)52 return len(self.data)53 def __getitem__(self, key):54 i = self.keys_to_indices[key]55 result = self.data[i]56 self.on_access(result.key, result.value, result.score)57 self.__balance(i)58 return result.value59 def __setitem__(self, key, value):60 if self.max_size == 0:61 return62 evicted = None63 try:64 i = self.keys_to_indices[key]65 except KeyError:66 entry = Entry(key, value, self.new_entry(key, value))67 if len(self.data) >= self.max_size:68 evicted = self.data[0]69 del self.keys_to_indices[evicted.key]70 i = 071 self.data[0] = entry72 else:73 i = len(self.data)74 self.data.append(entry)75 self.keys_to_indices[key] = i76 else:77 entry = self.data[i]78 assert entry.key == key79 entry.value = value80 entry.score = self.on_access(entry.key, entry.value, entry.score)81 self.__balance(i)82 if evicted is not None:83 if self.data[0] is not entry:84 assert evicted.score <= self.data[0].score85 self.on_evict(evicted.key, evicted.value, evicted.score)86 def clear(self):87 del self.data[:]88 self.keys_to_indices.clear()89 def __repr__(self):90 return '{%s}' % (', '.join(91 '%r: %r' % (e.key, e.value) for e in self.data),)92 def new_entry(self, key, value):93 """Called when a key is written that does not currently appear in the94 map.95 Returns the score to associate with the key.96 """97 raise NotImplementedError()98 def on_access(self, key, value, score):99 """Called every time a key that is already in the map is read or100 written.101 Returns the new score for the key.102 """103 return score104 def on_evict(self, key, value, score):105 """Called after a key has been evicted, with the score it had had at106 the point of eviction."""107 pass108 def check_valid(self):109 """Debugging method for use in tests.110 Asserts that all of the cache's invariants hold. When everything111 is working correctly this should be an expensive no-op.112 """113 for i, e in enumerate(self.data):114 assert self.keys_to_indices[e.key] == i115 for j in [i * 2 + 1, i * 2 + 2]:116 if j < len(self.data):117 assert e.score <= self.data[j].score, self.data118 def __swap(self, i, j):119 assert i < j120 assert self.data[j].score < self.data[i].score121 self.data[i], self.data[j] = self.data[j], self.data[i]122 self.keys_to_indices[self.data[i].key] = i123 self.keys_to_indices[self.data[j].key] = j124 def __balance(self, i):125 """When we have made a modification to the heap such that means that126 the heap property has been violated locally around i but previously127 held for all other indexes (and no other values have been modified),128 this fixes the heap so that the heap property holds everywhere."""129 while i > 0:130 parent = (i - 1) // 2131 if self.__out_of_order(parent, i):132 self.__swap(parent, i)133 i = parent134 else:135 break136 while True:137 children = [138 j for j in (2 * i + 1, 2 * i + 2)139 if j < len(self.data)140 ]141 if len(children) == 2:142 children.sort(key=lambda j: self.data[j].score)143 for j in children:144 if self.__out_of_order(i, j):145 self.__swap(i, j)146 i = j147 break148 else:149 break150 def __out_of_order(self, i, j):151 """Returns True if the indices i, j are in the wrong order.152 i must be the parent of j.153 """154 assert i == (j - 1) // 2155 return self.data[j].score < self.data[i].score156class LRUReusedCache(GenericCache):157 """The only concrete implementation of GenericCache we use outside of tests158 currently.159 Adopts a modified least-frequently used eviction policy: It evicts the key160 that has been used least recently, but it will always preferentially evict161 keys that have only ever been accessed once. Among keys that have been162 accessed more than once, it ignores the number of accesses.163 This retains most of the benefits of an LRU cache, but adds an element of164 scan-resistance to the process: If we end up scanning through a large165 number of keys without reusing them, this does not evict the existing166 entries in preference for the new ones.167 """168 __slots__ = ('__tick',)169 def __init__(self, max_size, ):170 super(LRUReusedCache, self).__init__(max_size)171 self.__tick = 0172 def tick(self):173 self.__tick += 1174 return self.__tick175 def new_entry(self, key, value):176 return [1, self.tick()]177 def on_access(self, key, value, score):178 score[0] = 2179 score[1] = self.tick()...
assertion.py
Source:assertion.py
...33 if name == opname:34 return create(op)35 if '_' in opname and name == '_'.join(opname.split('_')[1:]):36 return create(op)37 def _on_access(self):38 # Define on_access operator function39 def on_access(subject):40 try:41 self._op.on_access(subject)42 finally:43 return True44 # Add assertion function45 self._test._engine.add_assertion(on_access)46 # Flag proxy state as accessed operator47 self._accessed = True48 def is_on_access(self):49 # Trigger operator on_access operator event method, if available50 return hasattr(self._op, 'on_access') and not self._accessed51 def __getattr__(self, name):52 """53 Overloads attribute accessor in order to load the assertion54 operator dynamically.55 """56 # Match operator aliases for chaining57 if self._match_alias(name):58 self._test._engine.add_keyword(name)59 return self60 # Trigger operator on_access operator event method, if available61 if self.is_on_access():62 self._on_access()63 # Match suboperator, if needed64 suboperator = self._match_suboperator(name)65 if suboperator:66 # Register keyword for suboperator67 self._test._engine.add_keyword(name)68 # Trigger suboperator on_access event method, if available69 if suboperator.is_on_access():70 suboperator._on_access()71 # Return suboperator proxy72 return suboperator73 # Delegate access to global assertion instance74 return getattr(self._test, name)75 def __ror__(self, value):76 """77 Overloads ``|`` operator.78 """79 return self._fn(value)._trigger()80 def __gt__(self, value):81 """82 Overloads ``>`` operator.83 """84 return self.__ror_(value)
test__dictmodel.py
Source:test__dictmodel.py
1import fire2def test__basic_model__init_with_params():3 from dictmodel import DictModel, DictField4 class MyModel(DictModel):5 int_field: int = DictField()6 str_field: str = DictField()7 dict_field: dict = DictField()8 9 mm = MyModel(10 int_field=3,11 str_field="sdf",12 dict_field={}13 )14 assert mm.int_field == 315 assert mm.str_field == "sdf"16 assert mm.dict_field == {}17def test__basic_model__init_no_params():18 from dictmodel import DictModel, DictField19 class MyModel(DictModel):20 int_field: int = DictField()21 str_field: str = DictField()22 dict_field: dict = DictField()23 24 mm = MyModel()25 assert mm.int_field is None26 assert mm.str_field is None27 assert mm.dict_field is None28def test__basic_model__init_on_access_flag():29 """ Example:30 on_access flag is used to autocreate value for field using its type. 31 For custom constructors use default param with provided factory method32 """33 from dictmodel import DictModel, DictField34 class MyModel(DictModel):35 int_field: int = DictField(on_access=True)36 str_field: str = DictField(on_access=True)37 dict_field: dict = DictField(on_access=True)38 39 mm = MyModel()40 assert not mm.to_struct()41 mm.int_field += 142 assert mm.int_field == 143 assert mm.str_field == ""44 assert mm.dict_field == {}45 assert mm.to_struct()46def test__basic_model__init_default():47 """ Example:48 on_access flag is used to autocreate value for field using its type. 49 For custom constructors use default param with provided factory method50 """51 from dictmodel import DictModel, DictField52 from uuid import uuid453 class MyModel(DictModel):54 int_field: int = DictField(default=lambda: 3)55 str_field: str = DictField(default=lambda: uuid4)56 dict_field: dict = DictField(default=dict)57 58 mm = MyModel()59 d1 = mm.to_struct()60 assert mm.int_field == 361 assert mm.str_field62 assert mm.dict_field == {}63def test__to_yaml_str():64 from dictmodel import DictModel, DictField65 class MyModel(DictModel):66 f1 = DictField()67 68 model = MyModel(f1=dict(69 name='somename',70 items=[1,2,3]))71 yaml_str = model.to_yaml_str()72 lines = yaml_str.split('\n')73 print(model.to_yaml_str())74 assert any('items: [1, 2, 3]' in line for line in lines)75 assert any('name: somename' in line for line in lines)76if __name__ == "__main__":77 # test_typing()78 # test__basic_model()79 # test__dict_types()...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!