Best Python code snippet using hypothesis
merge_task.py
Source:merge_task.py
1from domain_monitor.models import Zone, Country, Domain, Registration, HostedCountry, ResourceRecord, Search2from domain_monitor.domainsdb_client import get_domains3from domain_monitor import app, db4from datetime import datetime, timedelta5from pprint import pprint6import logging7logger = logging.getLogger("domain_monitor.merge_task")8class InMemoryDimension(object):9 """Dependency for zone_dim and country_dim"""10 11 def __init__(self, model, create_func, key_func):12 self.model = model13 self.create_func = create_func14 self.key_func = key_func15 self.dict = None16 17 def load(self):18 self.dict = {self.key_func(m):m for m in self.model.query.all()}19 20 def ensure_contains(self, key):21 if key not in self.dict and key is not None:22 model = self.create_func(key)23 self.dict[key] = model24 db.session.add(model)25 return self.dict.get(key)26class ChangeSet(object):27 """ dependency use for change_set in merge_all_search"""28 def __init__(self):29 self.added = []30 self.removed = []31def merge_all_searches():32 """Load searches from the database and execute each"""33 34 searches = Search.query.all()35 change_set = ChangeSet()36 for search in searches:37 merge_search(domain_search=search.search_string, is_dead=False, change_set=change_set)38 merge_search(domain_search=search.search_string, is_dead=True, change_set=change_set)39 remove_unseen_domains(change_set=change_set)40 if len(change_set.added) > 0:41 logger.info("Added %d domains", len(change_set.added))42 else:43 logger.info("No Domains Added")44 45 if len(change_set.removed) > 0:46 logger.info("Removed %d domains", len(change_set.removed))47 else:48 logger.info("No Domains Removed")49 50def merge_search(domain_search, is_dead, change_set=None):51 """Seach domains and load into the dabase. 52 Execute search for each known country as well as coutnry omitted. 53 For each result set, if it is truncated, also try searching with all known zones. 54 To find as many as matching domains as possible. """55 countries = {c_model.country_name for c_model in Country.query.all()} 56 57 countries.add(None)58 59 zones = {z_model.zone for z_model in Zone.query.all()}60 for country in countries:61 62 country_result = get_domains(domain_search, country=country, is_dead=is_dead)63 load_domain_results(country_result, change_set)64 if country_result.is_truncated:65 for zone in zones:66 zone_result = get_domains(domain_search, zone, country, is_dead=is_dead)67 load_domain_results(zone_result, change_set)68 if zone_result.is_truncated:69 logger.warn(70 "truncated data search(domain=%r, zone=%r, country=%r, isDead=%r) len = %r", 71 domain_search, zone, country, is_dead , zone_result.match_count72 )73 74def remove_unseen_domains(stale_threshold=timedelta(hours=12), change_set=None):75 """categorize domain to be unseen domain if they have a new create date for merge_all_searches"""76 q = (Registration.query77 .filter(Registration.last_seen_date < datetime.utcnow() - stale_threshold)78 .filter(Registration.removed_date.is_(None)))79 stale = q.all()80 for reg in stale:81 logger.info("Stale registration found: %r", reg)82 reg.removed_date = datetime.utcnow()83 if change_set is not None:84 change_set.removed.append(reg)85 db.session.commit()86 87def load_domain_results(results, change_set=None):88 """call InMemoryDimension function and build zone and country in memory for merge_search"""89 # Build Zone dimension in memory90 zone_dim = InMemoryDimension(91 Zone, 92 lambda key: Zone(zone=key), 93 lambda zone: zone.zone94 )95 96 zone_dim.load()97 98 # Build Country dimension in memory99 country_dim = InMemoryDimension(100 Country,101 lambda key: Country(country_name=key),102 lambda country: country.country_name103 )104 country_dim.load()105 106 # Merge in new domains107 for domain in results.domains:108 zone = zone_dim.ensure_contains(domain.zone)109 country = country_dim.ensure_contains(domain.country)110 111 domain_model = Domain.query.filter(Domain.domain_name == domain.domain).one_or_none()112 if domain_model is None:113 domain_model = Domain(114 domain_name=domain.domain, 115 zone=zone116 )117 db.session.add(domain_model)118 119 logger.debug("%r", domain_model.registrations)120 matching_registrations = [121 reg 122 for reg in domain_model.registrations123 if reg.create_date == domain.create_date 124 and (reg.removed_date is None or domain.is_dead)125 ]126 non_matching_registrations = [127 reg 128 for reg in domain_model.registrations129 if reg.create_date != domain.create_date130 ]131 if len(matching_registrations) > 0:132 logger.debug("Found existing registration")133 registration = matching_registrations[0]134 else:135 logger.info("New registration found: %r", domain.json_object)136 registration = Registration(137 domain=domain_model,138 create_date=domain.create_date,139 is_dead=domain.is_dead,140 added_date=datetime.utcnow()141 )142 db.session.add(registration)143 if change_set is not None:144 change_set.added.append(registration)145 registration.last_seen_date = datetime.utcnow()146 # Record as dead if domain is_dead147 if domain.is_dead:148 if registration.removed_date is None:149 logger.info("Dead registration found: %r", domain.json_object)150 registration.removed_date = datetime.utcnow()151 if change_set is not None:152 change_set.removed.append(registration)153 154 registration.is_dead = domain.is_dead155 registration.update_date = domain.update_date156 157 158 # If non-matching registrations exist, mark them as removed159 for non_matching_registration in non_matching_registrations:160 if non_matching_registration.removed_date is None:161 logger.info("Old registration found: %r", non_matching_registration)162 non_matching_registration.removed_date = datetime.utcnow()163 if change_set is not None:164 change_set.removed.append(non_matching_registration)165 166 if country is not None:167 matching_hcs = [168 hc169 for hc in registration.hosted_countries170 if hc.country == country171 ]172 if len(matching_hcs) > 0:173 logger.debug("Found matching hc")174 else:175 hc = HostedCountry(country=country, registration=registration)176 db.session.add(hc)...
server.py
Source:server.py
1#!/usr/bin/env python32# pedalboard server3import socket4from threading import Thread5from gpiozero import Button6from signal import pause7from time import sleep, time8from radio import Radio9TCP_IP = '0.0.0.0'10TCP_PORT = 3141511ENC = 'UTF-8'12NEW_PRESS_DELAY = 0.3 # in seconds13CONNECTIONS_LIMIT = 514buttons_map = [15 (1, 2),16 (2, 3),17 (3, 4),18 (4, 17),19 (5, 27),20 (6, 22),21 (7, 10),22 (8, 9),23 (9, 11)24]25class BtnsThread(Thread):26 is_dead = True27 buttons = None28 def __init__(self, radio):29 self.is_dead = False30 self.radio = radio31 self.last_press_time = 032 self.is_released = True33 super().__init__()34 def __del__(self):35 if self.is_dead: return36 print('Stopping listening for buttonsâ¦')37 if self.buttons is not None:38 for btn in self.buttons:39 btn[1].when_pressed = None40 btn[1].when_released = None41 del self.buttons42 del self.radio43 del self.last_press_time44 del self.is_released45 del self.is_dead46 def pressed(self, n):47 def f():48 if time() - (self.last_press_time + NEW_PRESS_DELAY) <= 0: return49 print('Pressed button #%d' % n)50 self.last_press_time = time()51 self.is_released = False52 self.radio.trigger('button pressed', n=n)53 return f54 def released(self, n):55 def f():56 if self.is_released: return57 print('Released button #%d' % n)58 self.is_released = True59 self.radio.trigger('button released', n=n)60 return f61 def run(self):62 self.buttons = [(x[0], Button(x[1])) for x in buttons_map]63 for btn in self.buttons:64 btn[1].when_pressed = self.pressed(btn[0])65 btn[1].when_released = self.released(btn[0])66 print('Started buttons listening')67class SocketThread(Thread):68 is_dead = True69 def __init__(self, radio, conn, addr):70 self.is_dead = False71 self.radio = radio72 self.conn = conn73 self.addr = addr74 self.radio.trigger('add connection', connection=self)75 self.radio.on('close connections', self.__del__)76 super().__init__()77 def __del__(self):78 if self.is_dead: return79 self.radio.off('close connections', self.__del__)80 self.radio.off('button pressed', self.send_pressed, soft=True)81 self.radio.off('button released', self.send_released, soft=True)82 self.conn.close()83 self.radio.trigger('remove connection', connection=self)84 print('Connection lost for:', self.addr)85 del self.radio86 del self.conn87 del self.addr88 del self.is_dead89 def send_pressed(self, n):90 try:91 self.conn.send(bytes('button pressed|%d' % n, ENC))92 print('Sent about button pressed to', self.addr)93 except BrokenPipeError:94 self.__del__()95 def send_released(self, n):96 try:97 self.conn.send(bytes('button released|%d' % n, ENC))98 print('Sent about button released to', self.addr)99 except BrokenPipeError:100 self.__del__()101 def run(self):102 print('Address connected:', self.addr)103 self.radio.on('button pressed', self.send_pressed)104 self.radio.on('button released', self.send_released)105class ConnectionsHandler:106 is_dead = True107 def __init__(self, radio):108 self.is_dead = False109 self.connections = []110 self.radio = radio111 self.radio.reply('opened connections count', self.get_connections_count)112 self.radio.on('add connection', self.register_connection)113 self.radio.on('remove connection', self.unregister_connection)114 print('Started connections handling')115 def __del__(self):116 if self.is_dead: return117 self.radio.stopReplying(118 'opened connections count',119 self.get_connections_count120 )121 self.radio.off('add connection', self.register_connection)122 self.radio.off('remove connection', self.unregister_connection)123 for conn in self.connections:124 conn.__del__()125 del conn126 print('Stopped connections handling')127 del self.connections128 del self.radio129 del self.is_dead130 def register_connection(self, connection):131 for conn in self.connections:132 if conn == connection:133 raise Exception('Connection already registered')134 self.connections.append(connection)135 def unregister_connection(self, connection):136 new_connections = []137 for conn in self.connections:138 if conn != connection:139 new_connections.append(conn)140 if len(new_connections) == len(self.connections):141 raise Exception('Connection not found to unregister')142 elif len(new_connections) != len(self.connections) - 1:143 raise Exception('More than one connection to unregister')144 else:145 self.connections = new_connections146 def get_connections_count(self):147 return len(self.connections)148radio = Radio()149btns = BtnsThread(radio)150btns.start()151conn_handler = ConnectionsHandler(radio)152s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)153s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)154s.bind((TCP_IP, TCP_PORT))155s.listen(CONNECTIONS_LIMIT)156try:157 print('Starting listening for socket connectionsâ¦')158 while True:159 conn, addr = s.accept()160 SocketThread(radio, conn, addr).start()161except (KeyboardInterrupt, SystemExit):162 print('Exiting⦠Closing all connectionsâ¦')163 radio.trigger('close connections')164 while True:165 conns_count = radio.request('opened connections count')166 if conns_count == 0: break167 sleep(0.1)168 conn_handler.__del__()169 del conn_handler170 btns.__del__()171 del btns172 radio.__del__()173 del radio174 s.shutdown(socket.SHUT_RDWR)...
battle_field.py
Source:battle_field.py
1class BattleField:2 def fight(self, attacker, enemy):3 if attacker.is_dead or enemy.is_dead:4 raise ValueError("Player is dead!")5 if attacker.__class__.__name__ == "Beginner":6 attacker.health += 407 for c in attacker.card_repository.cards:8 c.damage_points += 309 if enemy.__class__.__name__ == "Beginner":10 enemy.health += 4011 for c in enemy.card_repository.cards:12 c.damage_points += 3013 attacker.health += sum([c.health_points for c in attacker.card_repository.cards])14 enemy.health += sum([c.health_points for c in enemy.card_repository.cards])15 for c in attacker.card_repository.cards:16 if enemy.is_dead or attacker.is_dead:17 return18 enemy.take_damage(c.damage_points)19 for c in enemy.card_repository.cards:20 if attacker.is_dead or enemy.is_dead:21 return...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!