Best Python code snippet using localstack_python
test_verifier.py
Source:test_verifier.py
...26 "premium": Limit(10, 1, 2),27 },28 }29)30def _create_key(key, tier):31 u = User.objects.create(username=key)32 u.profile.api_key = key33 u.profile.api_tier = tier34 u.profile.save()35@pytest.fixture36def cb():37 # ensure we have a fresh cache backend each time38 c = CacheBackend()39 c.cache.clear()40 return c41def test_get_tokens_and_timestamp_initial(cb):42 assert cb.get_tokens_and_timestamp("key", "zone") == (0, None)43def test_token_set_and_retrieve(cb):44 with freeze_time("2017-04-17"):45 cb.set_token_count("key", "zone", 100)46 tokens, timestamp = cb.get_tokens_and_timestamp("key", "zone")47 assert tokens == 10048 assert int(timestamp) == 1492387200 # frozen time49def test_key_and_zone_independence(cb):50 cb.set_token_count("key", "zone", 100)51 assert cb.get_tokens_and_timestamp("key2", "zone") == (0, None)52 assert cb.get_tokens_and_timestamp("key", "zone2") == (0, None)53def test_get_and_inc_quota(cb):54 assert cb.get_and_inc_quota_value("key", "zone", "20170411") == 155 assert cb.get_and_inc_quota_value("key", "zone", "20170411") == 256 assert cb.get_and_inc_quota_value("key", "zone", "20170412") == 157def test_get_and_inc_quota_key_and_zone_independence(cb):58 assert cb.get_and_inc_quota_value("key", "zone", "20170411") == 159 assert cb.get_and_inc_quota_value("key2", "zone", "20170411") == 160 assert cb.get_and_inc_quota_value("key", "zone2", "20170411") == 161@pytest.mark.django_db62def test_verifier_bad_key():63 pytest.raises(VerificationError, verify, "badkey", "bronze")64@pytest.mark.django_db65def test_verifier_inactive_key():66 _create_key("newkey", "inactive")67 pytest.raises(VerificationError, verify, "newkey", "bronze")68@pytest.mark.django_db69def test_verifier_suspended_key():70 _create_key("newkey", "suspended")71 pytest.raises(VerificationError, verify, "newkey", "bronze")72@pytest.mark.django_db73def test_verifier_zone_access():74 _create_key("goldkey", "gold")75 _create_key("bronzekey", "bronze")76 # bronze can get default77 assert verify("bronzekey", "default") is True78 # gold has access, bronze doesn't79 assert verify("goldkey", "premium") is True80 pytest.raises(VerificationError, verify, "bronzekey", "premium")81@pytest.mark.django_db82def test_verifier_rate_limit():83 _create_key("vrl", "bronze")84 with freeze_time() as frozen_dt:85 # to start - we should have full capacity for a burst of 1086 for x in range(10):87 verify("vrl", "default")88 # this next one should raise an exception89 pytest.raises(RateLimitError, verify, "vrl", "default")90 # let's go forward 1sec, this will let the bucket get 2 more tokens91 frozen_dt.tick()92 # two more, then limited93 verify("vrl", "default")94 verify("vrl", "default")95 pytest.raises(RateLimitError, verify, "vrl", "default")96@pytest.mark.django_db97def test_verifier_rate_limit_full_refill():98 _create_key("vrlfr", "gold")99 with freeze_time() as frozen_dt:100 # let's use the premium zone now - 1req/sec. & burst of 2101 verify("vrlfr", "premium")102 verify("vrlfr", "premium")103 pytest.raises(RateLimitError, verify, "vrlfr", "premium")104 # in 5 seconds - ensure we haven't let capacity surpass burst rate105 frozen_dt.tick(delta=datetime.timedelta(seconds=5))106 verify("vrlfr", "premium")107 verify("vrlfr", "premium")108 pytest.raises(RateLimitError, verify, "vrlfr", "premium")109@pytest.mark.django_db110def test_verifier_rate_limit_key_dependent():111 # ensure that the rate limit is unique per-key112 _create_key("b1", "gold")113 _create_key("b2", "gold")114 # each key is able to get both of its requests in, no waiting115 verify("b1", "premium")116 verify("b1", "premium")117 verify("b2", "premium")118 verify("b2", "premium")119 pytest.raises(RateLimitError, verify, "b1", "premium")120 pytest.raises(RateLimitError, verify, "b2", "premium")121@pytest.mark.django_db122def test_verifier_rate_limit_zone_dependent():123 # ensure that the rate limit is unique per-zone124 _create_key("zonedep", "gold")125 # key is able to get both of its requests in, no waiting126 verify("zonedep", "premium")127 verify("zonedep", "premium")128 # and can hit another zone no problem129 verify("zonedep", "default")130 # but premium is still exhausted131 pytest.raises(RateLimitError, verify, "zonedep", "premium")132@pytest.mark.django_db133def test_verifier_quota_day():134 _create_key("vqd", "silver")135 with freeze_time("2017-04-17") as frozen_dt:136 # silver can hit premium only 10x/day (burst is also 10)137 for x in range(10):138 verify("vqd", "premium")139 # after 1 second, should have another token140 frozen_dt.tick()141 # but still no good- we've hit our daily limit142 pytest.raises(QuotaError, verify, "vqd", "premium")143 # let's pretend a day has passed, we can call again!144 frozen_dt.tick(delta=datetime.timedelta(days=1))145 for x in range(10):146 verify("vqd", "premium")147@pytest.mark.django_db148def test_verifier_quota_key_dependent():149 _create_key("vqk1", "gold")150 _create_key("vqk2", "gold")151 with freeze_time("2017-04-17") as frozen_dt:152 # 1 req/sec from vqk1 and vqk1153 for x in range(10):154 verify("vqk1", "premium")155 verify("vqk2", "premium")156 frozen_dt.tick()157 # 11th in either should be a problem - day total is exhausted158 pytest.raises(QuotaError, verify, "vqk1", "premium")159 pytest.raises(QuotaError, verify, "vqk2", "premium")160@pytest.mark.django_db161def test_verifier_quota_zone_dependent():162 _create_key("qzd", "silver")163 with freeze_time("2017-04-17") as frozen_dt:164 # should be able to do 10 in premium & secret without issue165 for x in range(10):166 verify("qzd", "premium")167 verify("qzd", "default")168 frozen_dt.tick()169 # 11th in either should be a problem170 pytest.raises(QuotaError, verify, "qzd", "premium")...
database.py
Source:database.py
...5 def __init__(self, ip, port):6 self.database = dict()7 self.neighbors = dict()8 self.peers = dict()9 self.key = self._create_key(ip, port)10 self.database[self.key] = dict()11 self.db_lock = threading.Lock()12 def register_peer(self, peer_dict):13 with self.db_lock:14 username = peer_dict["username"]15 is_new_peer = True16 if username in self.peers and self.peers[username]:17 self.peers[username].cancel()18 is_new_peer = False19 if peer_dict["ipv4"] == "0.0.0.0" and peer_dict["port"] == 0:20 del self.peers[username]21 del self.database[self.key][username]22 return True23 self.database[self.key][username] = {24 "username": username,25 "ipv4": peer_dict["ipv4"],26 "port": peer_dict["port"]27 }28 self.peers[username] = self.daemonize_with_timer(30, self.remove_peer, args=[username])29 return is_new_peer30 def update_database(self, node_ip, node_port, db_dict):31 with self.db_lock:32 key = self._create_key(node_ip, node_port)33 if key == self.key:34 return []35 self.database[key] = dict()36 new_neighbors = []37 for neighbor in db_dict.keys():38 if neighbor == self.key:39 continue40 addr = self._get_addr_from_key(neighbor)41 if addr not in self.neighbors:42 new_neighbors.append(addr)43 if (node_ip, node_port) in self.neighbors and self.neighbors[(node_ip, node_port)]:44 self.neighbors[(node_ip, node_port)].cancel()45 self.neighbors[(node_ip, node_port)] = self.daemonize_with_timer(46 12, self.remove_neighbor, kwargs=dict(ip=node_ip, port=node_port)47 )48 if key in db_dict:49 for peer in db_dict[key].values():50 self.database[key][peer["username"]] = peer51 return new_neighbors52 def list(self):53 with self.db_lock:54 ret = dict()55 index = 056 for node in self.database.values():57 for peer in node.values():58 ret[str(index)] = peer59 index += 160 return ret61 def content(self):62 with self.db_lock:63 ret = dict()64 for key, peer_set in self.database.items():65 ret[key] = dict()66 index = 067 for peer in peer_set.values():68 ret[key][str(index)] = peer69 index += 170 return ret71 def remove_neighbor(self, ip, port):72 with self.db_lock:73 if (ip, port) in self.neighbors and self.neighbors[(ip, port)]:74 self.neighbors[(ip, port)].cancel()75 del self.neighbors[(ip, port)]76 key = self._create_key(ip, port)77 if key in self.database:78 del self.database[key]79 def print_neighbors(self):80 with self.db_lock:81 print("Neighbors:")82 for n in self.neighbors:83 print("\t" + self._create_key(n[0], n[1]))84 def print_database(self):85 with self.db_lock:86 print("Database:")87 for node, peers in self.database.items():88 print("\t" + node + ":")89 for username, addr in peers.items():90 print("\t\t" + username + ": " + self._create_key(addr["ipv4"], addr["port"]))91 def remove_peer(self, username):92 with self.db_lock:93 del self.peers[username]94 del self.database[self.key][username]95 def remove_all(self):96 with self.db_lock:97 for key in self.neighbors.keys():98 if self.neighbors[key]:99 self.neighbors[key].cancel()100 del self.database[self._create_key(key[0], key[1])]101 self.neighbors = dict()102 def is_registered(self, ip, port):103 with self.db_lock:104 for peer in self.database[self.key].values():105 if peer["ipv4"] == ip and peer["port"] == port:106 return True107 return False108 @staticmethod109 def _create_key(ip, port):110 return ",".join([ip, str(port)])111 @staticmethod112 def _get_addr_from_key(key):113 addr = key.split(",", 2)114 return addr[0], int(addr[1])115 @staticmethod116 def daemonize_with_timer(seconds, target, args=None, kwargs=None):117 t = threading.Timer(seconds, target, args=args, kwargs=kwargs)118 t.daemon = True119 t.start()...
global_lock.py
Source:global_lock.py
...7 self._lock_name = lock_name8 self._run_once = run_once9 self._lock_format = "{}.GlobalLock.{}.{}"10 self._check_sum = check_sum or 'global'11 def _create_key(self, value):12 return self._lock_format.format(g.ADDON_NAME, self._lock_name, value)13 def _run(self):14 while not g.abort_requested() and self._running():15 if g.wait_for_abort(.100):16 break17 g.set_runtime_setting(self._create_key("Running"), True)18 self._check_ran_once_already()19 def _running(self):20 return g.get_bool_runtime_setting(self._create_key("Running"))21 def _check_ran_once_already(self):22 if g.get_bool_runtime_setting(self._create_key("RunOnce")) and \23 g.get_runtime_setting(self._create_key("CheckSum")) == self._check_sum:24 g.clear_runtime_setting(self._create_key("Running"))25 raise RanOnceAlready("Lock name: {}, Checksum: {}".format(self._lock_name, self._check_sum))26 def __enter__(self):27 self._run()28 return self29 def __exit__(self, exc_type, exc_val, exc_tb):30 if self._run_once:31 g.set_runtime_setting(self._create_key("RunOnce"), True)32 g.set_runtime_setting(self._create_key("CheckSum"), self._check_sum)...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!