loader.py
Source: loader.py
import re
import json
import logging

from django.utils import timezone
from dateutil.parser import parse as dt_parse
from names_translator.name_utils import parse_fullname

from abstract.loaders import FileLoader
from abstract.exc import ParsingError

logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger("importer:vkks")


class VKKSLoader(FileLoader):
    filetype = "jsonlines"

    @property
    def model(self):
        from .models import VKKSModel

        return VKKSModel

    def process_e_declaration(self, record):
        # Family ties of judges for the years 2013-2017
        year_from = record["values"].get("11")
        year_to = record["values"].get("12")
        title = record.get("date_and_type") or ""

        # TODO: Sane default?
        declaration_type = ""
        parsed_title = re.match(r"(.*)\s(\d{4})[\-–](\d{4})", title)
        if parsed_title:
            declaration_type = re.sub(r"\sза$", "", parsed_title.group(1))
            year_from = year_from or parsed_title.group(2)
            year_to = year_to or parsed_title.group(3)
        else:
            declaration_type = title

        if not year_from or not year_to:
            logger.warning("Cannot parse title '{}/{}'".format(year_from, year_to))

        l, f, p, _ = parse_fullname(record["fields"]["name"])
        submit_date = dt_parse(record["submitDate"]).astimezone(
            timezone.get_current_timezone()
        )

        res = {
            "source": "electronic",
            "ID": record["ID"],
            "intro": {
                "declaration_year_from": year_from,
                "declaration_year_to": year_to,
                "declaration_type": declaration_type,
            },
            "general": {
                "post": {
                    "office": "{}, {}".format(
                        record["values"].get("114", {}).get("label", ""),
                        record["values"]["104"],
                    ).strip(" ,"),
                    "office_id": record["values"].get("114", {}).get("value"),
                    "post": record["values"]["105"],
                },
                "family": [],
                "family_conflicts": [],
                "last_name": l,
                "name": f,
                "patronymic": p,
                "family_comment": record["values"]["292"],
                "has_information": record["values"]["211"],
                "family_conflicts_comment": record["values"]["392"],
            },
            "declaration": {
                "date_day": str(submit_date.day),
                "date_month": str(submit_date.month),
                "date_year": str(submit_date.year),
                "date_time": str(submit_date.time()),
            },
        }

        for pos in range(0, int(record["values"]["220"]) + 1):
            # TODO: process cases where some records are reusing the name above.
            if record["values"]["220_{}-221".format(pos)]:
                fam = {}
                l, f, p, _ = parse_fullname(record["values"]["220_{}-221".format(pos)])
                fam["last_name"] = l
                fam["name"] = f
                fam["patronymic"] = p
                fam["relation"] = record["values"]["220_{}-222".format(pos)]
                fam["career"] = []

                # Trying to parse weird raw text records for the career of the
                # family member.
                # The general idea is that the records in the four columns are
                # visually separated into rows, though the separation does not
                # always look ideal even to the human eye. So we use plenty of
                # heuristics below to find the rows in every column and dance
                # around some well-known corner cases (for example, a missing
                # end date usually means that the person still holds the office).
                separator = "\n"
                if "\n" not in record["values"]["220_{}-224".format(pos)]:
                    if "__" in record["values"]["220_{}-224".format(pos)]:
                        separator = "__"
                    else:
                        separator = ","

                years_from = list(
                    filter(None, record["values"]["220_{}-225".format(pos)].split("\n"))
                )
                years_to = list(
                    filter(None, record["values"]["220_{}-226".format(pos)].split("\n"))
                )

                for x in range(4):
                    offices = list(
                        map(
                            lambda x: x.strip(" \n,_"),
                            filter(
                                None,
                                record["values"]["220_{}-223".format(pos)].split(
                                    separator * (4 - x)
                                ),
                            ),
                        )
                    )
                    positions = list(
                        map(
                            lambda x: x.strip(" \n,_"),
                            filter(
                                None,
                                record["values"]["220_{}-224".format(pos)].split(
                                    separator * (4 - x)
                                ),
                            ),
                        )
                    )
                    if len(positions) == len(years_from):
                        break

                if len(positions) != len(years_from):
                    raise ParsingError(
                        "Number of positions doesn't correspond to number of 'from' dates for {}".format(
                            res["ID"]
                        )
                    )
                if (len(years_from) - len(years_to)) > 1:
                    raise ParsingError(
                        "Number of 'from' dates doesn't correspond to number of 'to' dates for {}".format(
                            res["ID"]
                        )
                    )
                if len(positions) != len(offices) and len(offices) > 1:
                    raise ParsingError(
                        "Number of 'positions' doesn't correspond to number of 'offices' for {}".format(
                            res["ID"]
                        )
                    )
                if len(years_to) - len(years_from) > 1:
                    raise ParsingError(
                        "Number of 'to' dates has many more lines than number of 'from' dates for {}".format(
                            res["ID"]
                        )
                    )

                _parsing_quality = []
                if len(years_from) == len(years_to) == len(offices):
                    _parsing_quality.append("ideal")
                else:
                    # Here we analyze corner cases and apply fixes to known
                    # problems, carefully recording what we are doing and why
                    # for further visual examination.
                    if len(years_from) - len(years_to) == 1:
                        _parsing_quality.append("no_end_date")
                        years_to.append("По теперішній час")  # "to the present time"
                    if len(positions) != len(offices) and len(offices) == 1:
                        _parsing_quality.append("one_office")
                        offices = [offices[0]] * len(positions)
                    if len(offices) == 0:
                        _parsing_quality.append("no_public_office")
                        offices = [""] * len(positions)
                    if len(years_to) - len(years_from) == 1:
                        _parsing_quality.append("last_date_has_two_lines")
                        years_to = years_to[:-2] + [years_to[-2] + " " + years_to[-1]]

                if not (len(years_from) == len(years_to) == len(offices)):
                    raise ParsingError(
                        "Something went very wrong when normalizing record {}".format(
                            res["ID"]
                        )
                    )

                for position, office, year_from, year_to in zip(
                    positions, offices, years_from, years_to
                ):
                    fam["_parsing_quality"] = _parsing_quality
                    fam["career"].append({
                        "position": position,
                        "workplace": office,
                        "from": year_from,
                        "to": year_to,
                    })

                res["general"]["family"].append(fam)

        for pos in range(0, int(record["values"]["300"]) + 1):
            if record["values"]["300_{}-311".format(pos)]:
                fam = {}
                l, f, p, _ = parse_fullname(record["values"]["300_{}-311".format(pos)])
                fam["last_name"] = l
                fam["name"] = f
                fam["patronymic"] = p

                # The values below are stored verbatim, in Ukrainian, as they
                # appear in the declaration form.
                if record["values"]["300_{}-321".format(pos)]:
                    fam["coliving"] = "Спільно не проживаємо"  # not living together
                elif record["values"]["300_{}-322".format(pos)]:
                    fam["coliving"] = "Спільно проживаємо"  # living together
                elif record["values"]["300_{}-323".format(pos)]:
                    fam["coliving"] = "Тимчасово спільно не проживаємо"  # temporarily not living together
                elif record["values"]["300_{}-324".format(pos)]:
                    fam["coliving"] = "Тимчасово спільно проживаємо"  # temporarily living together

                if record["values"]["300_{}-331".format(pos)]:
                    fam["cohabiting"] = "Спільним побутом не пов’язані"  # not sharing a household
                elif record["values"]["300_{}-332".format(pos)]:
                    fam["cohabiting"] = "Пов’язані спільним побутом"  # sharing a household
                elif record["values"]["300_{}-333".format(pos)]:
                    fam["cohabiting"] = "Тимчасово не пов’язані спільним побутом"  # temporarily not sharing a household
                elif record["values"]["300_{}-334".format(pos)]:
                    fam["cohabiting"] = "Тимчасово пов’язані спільним побутом"  # temporarily sharing a household

                if record["values"]["300_{}-341".format(pos)]:
                    fam["mutual_liabilities"] = "Існують взаємні права та/чи обов’язки"  # mutual rights and/or obligations exist
                elif record["values"]["300_{}-342".format(pos)]:
                    fam["mutual_liabilities"] = "Відсутні взаємні права та обов’язки"  # no mutual rights and obligations

                res["general"]["family_conflicts"].append(fam)

        return res

    def process_paper_declaration(self, record):
        if record:
            rec = record[0]["answer"]
            rec["source"] = "paper"
            if "has_information" in rec["general"]:
                rec["general"]["has_information"] = rec["general"]["has_information"] == "1"
            else:
                rec["general"]["has_information"] = bool(rec["general"]["family"])
            rec["ID"] = record[0]["task"]["id"]
            rec["url"] = record[0]["task"]["data"]["file"]
            return rec

    def preprocess(self, record, options):
        if options["source"] == "electronic":
            return self.process_e_declaration(record)
        if options["source"] == "paper":
            return self.process_paper_declaration(record)
        return record

    def inject_params(self, parser):
        super().inject_params(parser)
        parser.add_argument(
            "--source",
            choices=("paper", "electronic"),
            required=True,
            help="Source of the data (affects the conversion/parsing)",
        )

    def get_dedup_fields(self):...
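The trickiest part of the loader above is the row-splitting heuristic for the four career columns: the same column text may use newlines, double underscores, or commas as row separators, and the loop retries the split with progressively shorter runs of the separator (separator * (4 - x) for x = 0..3) until the number of rows matches the number of 'from' dates. Below is a minimal, self-contained sketch of that idea; the helper name and the sample column value are hypothetical and are not part of loader.py.

def split_career_column(raw, expected_rows, separator="\n"):
    """Split one raw career column into rows, retrying with shorter runs of
    the separator (4, 3, 2, then 1 repetitions) until the row count matches."""
    rows = []
    for x in range(4):
        rows = [chunk.strip(" \n,_") for chunk in raw.split(separator * (4 - x)) if chunk]
        if len(rows) == expected_rows:
            break
    return rows

# Hypothetical column where two rows are delimited by a double underscore:
print(split_career_column("Local court__Court of Appeal", expected_rows=2, separator="_"))
# ['Local court', 'Court of Appeal']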
arnetwork.py
Source: arnetwork.py
# Copyright (c) 2011 Bastian Venthur
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import logging

"""
This module provides access to the data provided by the AR.Drone.
"""

import threading
import select
import socket
import multiprocessing

import libardrone


class ARDroneNetworkProcess(threading.Thread):
    """ARDrone Network Process.

    This process collects data from the video and navdata port, converts the
    data and sends it to the IPCThread.
    """

    def __init__(self, com_pipe, is_ar_drone_2, drone):
        threading.Thread.__init__(self)
        self._drone = drone
        self.com_pipe = com_pipe
        self.is_ar_drone_2 = is_ar_drone_2

    def run(self):
        def _connect():
            logging.warn('Connecting to the AR.Drone ...')
            nav_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
            nav_socket.setblocking(0)
            nav_socket.bind(('', libardrone.ARDRONE_NAVDATA_PORT))
            nav_socket.sendto("\x01\x00\x00\x00", ('192.168.1.1', libardrone.ARDRONE_NAVDATA_PORT))
            control_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            control_socket.connect(('192.168.1.1', libardrone.ARDRONE_CONTROL_PORT))
            control_socket.setblocking(0)
            logging.warn('Connection established')
            return nav_socket, control_socket

        def _disconnect(nav_socket, control_socket):
            logging.warn('Disconnecting from the AR.Drone')
            nav_socket.close()
            control_socket.close()

        nav_socket, control_socket = _connect()
        stopping = False
        connection_lost = 1
        reconnection_needed = False
        while not stopping:
            if reconnection_needed:
                _disconnect(nav_socket, control_socket)
                nav_socket, control_socket = _connect()
                reconnection_needed = False
            inputready, outputready, exceptready = select.select(
                [nav_socket, self.com_pipe, control_socket], [], [], 1.)
            if len(inputready) == 0:
                connection_lost += 1
                reconnection_needed = True
            for i in inputready:
                if i == nav_socket:
                    while 1:
                        try:
                            data = nav_socket.recv(500)
                        except IOError:
                            # we consumed every packet from the socket and
                            # continue with the last one
                            break
                        navdata, has_information = libardrone.decode_navdata(data)
                        if has_information:
                            self._drone.set_navdata(navdata)
                elif i == self.com_pipe:
                    _ = self.com_pipe.recv()
                    stopping = True
                    break
                elif i == control_socket:
                    reconnection_needed = False
                    while not reconnection_needed:
                        try:
                            data = control_socket.recv(65536)
                            if len(data) == 0:
                                logging.warning('Received an empty packet on control socket')
                                reconnection_needed = True
                            else:
                                logging.warning("Control socket says: %s", data)
                        except IOError:
                            break...