Best Python code snippet using autotest_python
preprocess_convlab_multiwoz.py
Source:preprocess_convlab_multiwoz.py
1#!/usr/bin/env python2import re3import sys4import json5import shutil6import types7import logging8import subprocess9import zipfile10from io import BytesIO11from itertools import chain12from tqdm import tqdm13from collections import OrderedDict, defaultdict, Counter14import requests15import os16from copy import deepcopy17import inspect18sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))19from utils import setup_logging # noqa: E40220setup_logging()21logger = logging.getLogger()22DATASETS_PATH = os.path.join(os.path.expanduser(os.environ.get('DATASETS_PATH', '~/datasets')), 'augpt')23MW_DOMAINS = ['restaurant', 'hotel', 'attraction', 'train', 'taxi', 'hospital', 'police']24DEFAULT_IGNORE_VALUES = ['not mentioned', 'dont care', 'don\'t care', 'dontcare', 'do n\'t care', 'none']25class Lexicalizer:26 def __init__(self, zipf):27 self.path = zipf.filename28 placeholder_re = re.compile(r'\[(\s*[\w_\s]+)\s*\]')29 number_re = re.compile(r'.*(\d+|one|two|three|four|five|six|seven|eight|nine|ten|eleven|twelve)\s$')30 time_re = re.compile(r'((?:\d{1,2}[:]\d{2,3})|(?:\d{1,2} (?:am|pm)))', re.IGNORECASE)31 @staticmethod32 def ends_with_number(s):33 return bool(Lexicalizer.number_re.match(s))34 @staticmethod35 def extend_database_results(database_results, belief):36 # Augment database results from the belief state37 database_results = OrderedDict(database_results)38 if belief is not None:39 for i, (domain, (num_results, results)) in enumerate(database_results.items()):40 if domain not in belief:41 continue42 elif "ruk" in belief[domain].keys():43 belief[domain].pop("ruk")44 elif "topic" in belief[domain].keys():45 belief[domain].pop("topic")46 if num_results == 0:47 database_results[domain] = (1, [belief[domain]])48 else:49 new_results = []50 for r in results:51 r = dict(**r)52 for k, val in belief[domain].items():53 if k not in r:54 r[k] = val55 new_results.append(r)56 database_results[domain] = (num_results, new_results)57 return database_results58 def 
__call__(self, text, database_results, belief=None, context=None):59 database_results = Lexicalizer.extend_database_results(database_results, belief)60 result_index = 061 last_assignment = defaultdict(set)62 def trans(label, span, force=False, loop=100):63 nonlocal result_index64 nonlocal last_assignment65 result_str = None66 for domain, (count, results) in database_results.items():67 if count == 0:68 continue69 result = results[result_index % len(results)]70 if label in result:71 result_str = result[label]72 if result_str == '?':73 result_str = 'unknown'74 if label == 'price range' and result_str == 'moderate' and \75 not text[span[1]:].startswith(' price range') and \76 not text[span[1]:].startswith(' in price'):77 result_str = 'moderately priced'78 if label == 'type':79 if text[:span[0]].endswith('no ') or text[:span[0]].endswith('any ') or \80 text[:span[0]].endswith('some ') or Lexicalizer.ends_with_number(text[:span[0]]):81 if not result_str.endswith('s'):82 result_str += 's'83 if label == 'time' and ('[leave at]' in text or '[arrive by]' in text) and \84 belief is not None and 'train' in belief and \85 any([k in belief['train'] for k in ('leave at', 'arrive by')]):86 # this is a specific case in which additional [time] slot needs to be lexicalised87 # directly from the belief state88 # "The earliest train after [time] leaves at ... 
and arrives by ..."89 if 'leave at' in belief['train']:90 result_str = belief['train']['leave at']91 else:92 result_str = belief['train']['arrive by']93 elif force:94 if label == 'time':95 if 'leave at' in result or 'arrive by' in result:96 if 'arrive' in text and 'arrive by' in result:97 result_str = result['arrive by'].lstrip('0')98 elif 'leave at' in result:99 result_str = result['leave at'].lstrip('0')100 elif context is not None and len(context) > 0:101 last_utt = context[-1]102 mtch = Lexicalizer.time_re.search(last_utt)103 if mtch is not None:104 result_str = mtch.group(1).lstrip('0')105 if result_str is not None:106 break107 if force and result_str is None:108 if label == 'reference':109 result_str = 'YF86GE4J'110 elif label == 'phone':111 result_str = '01223358966'112 elif label == 'postcode':113 result_str = 'CB11JG'114 elif label == 'agent':115 result_str = 'Cambridge Towninfo Centre'116 elif label == 'stars':117 result_str = '4'118 if result_str is not None and result_str.lower() in last_assignment[label] and loop > 0:119 result_index += 1120 return trans(label, force=force, loop=loop - 1, span=span)121 if result_str is not None:122 last_assignment[label].add(result_str.lower())123 return result_str or f'[{label}]'124 text = Lexicalizer.placeholder_re.sub(lambda m: trans(m.group(1), span=m.span()), text)125 text = Lexicalizer.placeholder_re.sub(lambda m: trans(m.group(1), force=True, span=m.span()), text)126 return text127 def save(self, path):128 shutil.copy(self.path, os.path.join(path, os.path.split(self.path)[-1]))129DB_ONTOLOGY = True130class Database:131 def __init__(self, zipf):132 self.path = zipf.filename133 module = types.ModuleType('convlab_dbquery')134 exec(zipf.read('convlab_dbquery.py').decode('utf-8'), module.__dict__)135 convlab_database = getattr(module, 'Database')136 self.ignore_values = DEFAULT_IGNORE_VALUES137 self.supported_domains = MW_DOMAINS138 self._name_map = None139 self._ontology = None140 self._regexp = None141 # Load 
database files142 def hacked_init(self):143 self.dbs = {}144 for domain in MW_DOMAINS:145 with zipf.open(f'db/{domain}_db.json') as f:146 self.dbs[domain] = json.load(f)147 setattr(convlab_database, '__init__', hacked_init)148 self.inner = getattr(module, 'Database')()149 # Load ontology150 if globals().get('DB_ONTOLOGY', True):151 with zipf.open('db_ontology.json') as f:152 self._ontology = {tuple(k.split('-')): set(v) for k, v in json.load(f).items()}153 self._build_replace_dict()154 price_re = re.compile(r'\d+\.\d+')155 @staticmethod156 def hack_query(belief):157 new_belief = OrderedDict()158 for domain, bs in belief.items():159 new_bs = OrderedDict()160 new_belief[domain] = new_bs161 for key, val in bs.items():162 val = bs[key]163 if domain == 'restaurant' and key == 'name' and val.lower() == 'charlie':164 val = 'charlie chan'165 if domain == 'restaurant' and key == 'name' and val.lower() == 'good luck':166 val = 'the good luck chinese food takeaway'167 # if domain == 'hotel' and key == 'name' and val.lower() == 'el shaddai guesthouse':168 # val = 'el shaddai'169 new_bs[key] = val170 return new_belief171 @staticmethod172 def capitalize(val):173 def _mk(v):174 i, v = v175 if i == 0 or v not in {'the', 'an', 'a', 'of', 'in', 'for', 'as', 'these', 'at', 'up', 'on', 'and', 'or'}:176 return v[:1].upper() + v[1:]177 else:178 return v179 return ' '.join(map(_mk, enumerate(val.split())))180 @staticmethod181 def map_database_key(key):182 if key == 'trainID':183 key = 'id'184 key = ''.join([' '+i.lower() if i.isupper()185 else i for i in key]).lstrip(' ')186 key = key.replace('_', ' ')187 if key == 'pricerange':188 key = 'price range'189 if key == 'taxi phone' or key == 'phone':190 key = 'phone'191 if key == 'taxi colors':192 key = 'color'193 if key == 'taxi types':194 key = 'brand'195 if key == 'ref':196 key = 'reference'197 if key == 'leaveAt':198 key = 'leave at'199 if key == 'arriveBy':200 key = 'arrive by'201 if key == 'entrance fee':202 key = 'fee'203 return key204 
@staticmethod205 def map_database_row(domain, row, query):206 results = dict()207 for k, val in row.items():208 k2 = Database.map_database_key(k)209 if k == 'location':210 continue211 elif k == 'post code' or k == 'postcode':212 val = val.upper()213 elif k == 'name':214 val = Database.capitalize(val)215 elif k == 'type' and val == 'concerthall':216 val = 'concert hall'217 elif k == 'price' and domain == 'hotel' and isinstance(val, dict):218 val = val.get('single', val.get('double', next(iter(val.values()))))219 val = f'{val} pounds'220 if k2 == 'people':221 # BUG in MW2.0222 val = val.lstrip('`')223 results[k2] = val224 if 'color' in results and 'brand' in results:225 results['car'] = f"{results['color']} {results['brand']}"226 if domain == 'train' and 'price' in row and 'people' in query:227 people = int(query['people'])228 def multiply_people(m):229 price = float(m.group(0))230 price *= people231 return format(price, '.2f')232 if people != 1:233 results['price'] = Database.price_re.sub(multiply_people, row['price'])234 return results235 @staticmethod236 def normalize_for_db(s):237 s = ','.join(s.split(' ,'))238 s = s.replace('swimming pool', 'swimmingpool')239 s = s.replace('night club', 'nightclub')240 s = s.replace('concert hall', 'concerthall')241 return s242 @staticmethod243 def translate_to_db_col(s):244 if s == 'leave at':245 return 'leaveAt'246 elif s == 'arrive by':247 return 'arriveBy'248 elif s == 'price range':249 return 'pricerange'250 else:251 return s252 def domain_not_empty(self, domain_bs):253 return any(len(val) > 0 and val not in self.ignore_values for val in domain_bs.values())254 def _build_replace_dict(self):255 if self._regexp is not None:256 return257 clear_values = {'the', 'a', 'an', 'food'}258 clear_values.update(self._ontology[('hotel', 'type')])259 clear_values.update(self._ontology[('hotel', 'price range')])260 clear_values.update(self._ontology[('hotel', 'area')])261 clear_values.update(self._ontology[('restaurant', 'price 
range')])262 clear_values.update(self._ontology[('restaurant', 'food')])263 clear_values.update(self._ontology[('restaurant', 'area')])264 clear_values.update(self._ontology[('attraction', 'type')])265 clear_values = (f' {x} ' for x in clear_values)266 self._regexp = re.compile('|'.join(map(re.escape, clear_values)))267 db_entities = chain(self.inner.dbs['attraction'], self.inner.dbs['hotel'], self.inner.dbs['restaurant'])268 self._name_map = {self._clear_name(r): r['name'].lower() for r in db_entities}269 def _clear_name(self, domain_bs):270 name = ' ' + domain_bs['name'].lower() + ' '271 name = self._regexp.sub(' ', name)272 name = re.sub(r'\s+', ' ', name)273 name = name.strip()274 return name275 @staticmethod276 def _to_minutes(time):277 hour, minutes = tuple(map(int, time.split(':')))278 return minutes + 60 * hour279 def __call__(self, belief, return_results=False):280 belief = Database.hack_query(belief)281 all_results = OrderedDict()282 for domain, domain_bs in belief.items():283 if domain not in self.supported_domains:284 continue # skip unsupported domains285 if self.domain_not_empty(domain_bs) or \286 domain in [d.lower() for d in {'Police', 'Hospital'}]:287 def query_single(domain_bs):288 blocked_slots = {'people', 'booked', 'stay', 'ruk', 'topic'}289 if domain != 'train' and domain != 'bus':290 blocked_slots.add('day')291 query_bs = [(Database.translate_to_db_col(slot), Database.normalize_for_db(val))292 for slot, val in domain_bs.items() if slot not in blocked_slots]293 result = self.inner.query(domain, query_bs)294 result = [Database.map_database_row(domain, k, domain_bs) for k in result]295 # Implement sorting missing in convlab296 if domain == 'train' and 'arrive by' in domain_bs:297 result.sort(key=lambda x: self._to_minutes(x['arrive by']), reverse=True)298 elif domain == 'train' and 'leave at' in domain_bs:299 result.sort(key=lambda x: self._to_minutes(x['leave at']))300 return result301 result = query_single(domain_bs)302 if len(result) == 0 and 
'name' in domain_bs and self._clear_name(domain_bs) in self._name_map:303 domain_bs = dict(**domain_bs)304 domain_bs['name'] = self._name_map[self._clear_name(domain_bs)]305 result = query_single(domain_bs)306 if return_results:307 all_results[domain] = (len(result), result)308 else:309 all_results[domain] = len(result)310 return all_results311 def save(self, path):312 shutil.copy(self.path, os.path.join(path, os.path.split(self.path)[-1]))313class BeliefStateTransformation:314 def __init__(self):315 self.ignore_values = DEFAULT_IGNORE_VALUES316 def _process_domain(self, domain_bs):317 return {self._map_slot(slot): self._clear_value(val) for slot, val in domain_bs.items()318 if (len(val) > 0 and val not in self.ignore_values)}319 def _map_slot(self, slot):320 if slot == 'arriveBy':321 return 'arrive by'322 if slot == 'leaveAt':323 return 'leave at'324 if slot == 'pricerange':325 slot = 'price range'326 return slot327 def _clear_value(self, value):328 value = value.replace('>', ' ')329 if value == 'el shaddia guesthouse':330 value = 'el shaddai'331 if value == 'concerthall':332 value = 'concert hall'333 if value == 'nightclub':334 value = 'night club'335 # BUG in MW2.0336 value = value.lstrip('`')337 return value338 @staticmethod339 def domain_not_empty(domain_bs, ignore_values):340 return any(len(val) > 0 and val not in ignore_values for val in domain_bs.values())341 def __call__(self, belief_state, dialogue_act, active_domain):342 clean_belief = dict()343 for domain, domain_bs in belief_state.items():344 new_domain_bs = {}345 if 'semi' in domain_bs:346 new_domain_bs.update(domain_bs['semi'])347 if 'book' in domain_bs:348 new_domain_bs.update({k: v for k, v in domain_bs['book'].items() if k != 'booked'})349 '''350 if 'ruk' in domain_bs: # extend belief state in TAHOE351 new_domain_bs.update({"ruk": domain_bs['ruk']})352 new_domain_bs.update({"topic": domain_bs['topic']})353 '''354 if not BeliefStateTransformation.domain_not_empty(domain_bs, self.ignore_values):355 
continue356 new_domain_bs = self._process_domain(new_domain_bs)357 # TODO: uncomment for new iteration358 # if len(new_domain_bs) == 0: # TODO: remove this condition in next iteration359 # continue360 if 'internet' in new_domain_bs and new_domain_bs['internet'] == 'no':361 del new_domain_bs['internet'] # no internet by default362 if 'parking' in new_domain_bs and new_domain_bs['parking'] == 'no':363 del new_domain_bs['parking'] # no parking by default364 # TODO: comment for new iteration365 if len(new_domain_bs) > 0: # TODO: remove this condition in next iteration366 clean_belief[domain] = new_domain_bs367 for domain in {'Police', 'Hospital'}:368 if any([da[1] == domain for da in dialogue_act]):369 clean_belief[domain.lower()] = {}370 # Sort belief371 clean_belief = {k: OrderedDict(sorted(v.items(), key=lambda x: x[0])) for k, v in clean_belief.items()}372 active_bs = None373 if active_domain is not None:374 active_domain = active_domain.lower()375 active_bs = clean_belief.pop(active_domain, None)376 items = [(active_domain, active_bs)] if active_bs is not None else [] # put active domain at first377 items += [(k, v) for k, v in sorted(clean_belief.items(), key=lambda x: x[0])]378 result = OrderedDict(items)379 return result380def is_booked(raw_belief, domain):381 return domain in raw_belief and 'book' in raw_belief[domain] and \382 'booked' in raw_belief[domain]['book'] and \383 any('reference' in x for x in raw_belief[domain]['book']['booked'])384def get_booked_domains(raw_belief):385 for domain in raw_belief.keys():386 if is_booked(raw_belief, domain):387 yield domain388def parse_goal(dialog_goal):389 belief_transformation = BeliefStateTransformation()390 """Parses user goal into dictionary format."""391 goal = {}392 for domain in MW_DOMAINS:393 if not dialog_goal[domain]:394 continue395 goal[domain] = {}396 goal[domain] = {'informable': [], 'requestable': [], 'booking': {}}397 if 'info' in dialog_goal[domain]:398 # if d['goal'][domain].has_key('info'):399 if 
domain == 'train':400 # we consider dialogues only where train had to be booked!401 if 'book' in dialog_goal[domain]:402 # if d['goal'][domain].has_key('book'):403 goal[domain]['requestable'].append('reference')404 if 'reqt' in dialog_goal[domain]:405 # if d['goal'][domain].has_key('reqt'):406 if 'trainID' in dialog_goal[domain]['reqt']:407 goal[domain]['requestable'].append('id')408 else:409 if 'reqt' in dialog_goal[domain]:410 # if d['goal'][domain].has_key('reqt'):411 for s in dialog_goal[domain]['reqt']: # addtional requests:412 if s in ['phone', 'address', 'postcode', 'reference', 'id']:413 # ones that can be easily delexicalised414 goal[domain]['requestable'].append(s)415 if 'book' in dialog_goal[domain]:416 # if d['goal'][domain].has_key('book'):417 goal[domain]['requestable'].append("reference")418 goal[domain]["informable"] = dialog_goal[domain]['info']419 if 'book' in dialog_goal[domain]:420 # if d['goal'][domain].has_key('book'):421 goal[domain]["booking"] = dialog_goal[domain]['book']422 if 'invalid' in goal[domain]['booking']:423 del goal[domain]['booking']['invalid']424 if 'pre_invalid' in goal[domain]['booking']:425 del goal[domain]['booking']['pre_invalid']426 belief = {domain: {'semi': goal[domain]['informable'], 'book': goal[domain]['booking']}}427 belief = belief_transformation(belief, [], domain).get(domain, dict())428 goal[domain]['informable'] = belief429 del goal[domain]['booking']430 return goal431def clear_whitespaces(text):432 text = re.sub(r'[\s\n\r]+', ' ', text)433 text = ' ' + text + ' '434 text = re.sub(r'\s([,\.:\?\!\']+)', lambda m: m.group(1), text)435 return text.strip()436def build_fix_belief_from_database(database_engine):437 _clear_dict = None438 def clear_dict():439 nonlocal _clear_dict440 if _clear_dict is None:441 # Build clear dict442 _clear_dict = dict()443 db_values = set()444 for x in database_engine.inner.dbs['attraction']:445 db_values.add(x['name'])446 _clear_dict = OrderedDict((x.replace("'", ''), x) for x in 
db_values)447 return _clear_dict448 def call(belief):449 # fix belief state, put back apostrophs450 for domain, bs in belief.items():451 for key, value in bs.items():452 bs[key] = clear_dict().get(value, value)453 return belief454 return call455def normalize(text):456 text = text.replace('swimmingpool', 'swimming pool')457 text = text.replace('nigthclub', 'night club')458 text = text.replace('Shanghi', 'Shanghai')459 return text460DELEX_LABEL_MAP = {461 'Price': 'price range',462 'Fee': None, # 'fee',463 'Addr': 'address',464 'Area': 'area',465 'Stars': 'stars',466 'Department': None, # 'department',467 'Stay': None, # 'stay',468 'Ref': 'reference',469 'Food': 'food',470 'Type': 'type',471 'Choice': None, # ignore472 'Phone': 'phone',473 'Ticket': 'price',474 'Day': None, # 'day',475 'Name': 'name',476 'Car': 'car',477 'Leave': 'leave at',478 'Time': 'time',479 'Arrive': 'arrive by',480 'Post': 'postcode',481 'Depart': None, # 'departure',482 'People': None, # 'people',483 'Dest': None, # 'destination',484 'Open': None, # ignore485 'Id': 'id',486}487def delexicalise_spans(response, spans, allowed_slots=None):488 allowed_slots = set(allowed_slots or [])489 # First, we clear the spans490 new_spans = []491 for i, span in enumerate(spans):492 if span[1] == 'Fee' and 'vary' in span[-3]:493 pass494 elif DELEX_LABEL_MAP[span[1]] not in allowed_slots:495 pass496 else:497 new_spans.append(span)498 spans = new_spans499 delex = []500 assignment = []501 textlen = 0502 for i, original in enumerate(response.split()):503 for span in spans:504 label = DELEX_LABEL_MAP[span[1]]505 textlen += 1 + len(original)506 if label is None:507 continue # causes this token to be added508 if label == 'time' and ('minute' in span[-3] or 'hour' in span[-3] or 'day' in span[-3]):509 label = 'duration'510 if original in {',', '.', ':'}:511 if i == span[3]:512 delex.append(original)513 delex.append(f'[{label}]')514 assignment.append((label, None, textlen))515 else:516 continue517 if i == span[3]:518 
if label == 'stars' and '-star' in original:519 number, ext = original.split('-')520 delex.append(f'[{label}]-{ext}')521 original = number522 assignment.append((label, original, textlen - len(original)))523 elif label == 'area' and original == 'the':524 delex.append('the')525 delex.append(f'[{label}]')526 original = None527 assignment.append((label, original, textlen))528 elif label == 'area' and original == 'in' and span[-3].startswith('in the '):529 delex.extend(['in', 'the'])530 delex.append(f'[{label}]')531 original = None532 assignment.append((label, original, textlen))533 elif label == 'time' and original == 'a':534 delex.append('a')535 delex.append(f'[{label}]')536 original = None537 assignment.append((label, original, textlen))538 elif label == 'stay' and 'day' in original:539 delex.append(f'[{label}]')540 delex.append('days' if 'days' in original else 'day')541 assignment.append((label, original, textlen - len(original)))542 elif label == 'address' and len(delex) >= 2 and delex[-1] == ',' and delex[-2] == '[address]':543 delex.pop()544 label, text, index = assignment[-1]545 assignment[-1] = (label, f'{text} , {original}', index)546 else:547 delex.append(f'[{label}]')548 assignment.append((label, original, textlen - len(original)))549 break550 elif span[3] < i <= span[4]:551 # already added the label552 label, text, index = assignment[-1]553 if text is None:554 text = original555 else:556 text = f'{text} {original}'557 if i == span[4] and label == 'area' and text.endswith(' of town'):558 delex.extend(['of', 'town'])559 text = text[:-len(' of town')]560 if i == span[4] and label == 'time' and text.endswith(' ride'):561 delex.append('ride')562 text = text[:-len(' ride')]563 assignment[-1] = (label, text, index)564 break565 else:566 delex.append(original)567 return ' '.join(delex), assignment568def delexicalise(utt, return_replacements=False, database_results=None, belief=None, spans=None):569 database_results = 
Lexicalizer.extend_database_results(database_results, belief)570 # Delexicalise only the stuff that we can put back571 allowed_keys = {'reference'} # always delex reference572 for domain, (count, results) in database_results.items():573 if count > 0:574 allowed_keys.update(results[0].keys())575 if 'arrive by' in allowed_keys or 'leave at' in allowed_keys:576 allowed_keys.add('time')577 # First we use the span_info annotations578 spans = sorted(spans, key=lambda x: x[-2])579 utt, replacements = delexicalise_spans(utt, spans, allowed_keys)580 if return_replacements:581 replacements = [x[:2] for x in replacements]582 # replacements.sort(key=lambda k: k[0])583 return utt, replacements584 return utt585def export_data(path, zipf):586 global DB_ONTOLOGY587 def da2tuples(dialog_act):588 tuples = []589 for domain_intent, svs in dialog_act.items():590 for slot, value in sorted(svs, key=lambda x: x[0]):591 domain, intent = domain_intent.split('-')592 tuples.append([intent, domain, slot, value])593 return tuples594 transform_belief = BeliefStateTransformation()595 DB_ONTOLOGY = False596 with zipfile.ZipFile(os.path.join(path, 'database.zip')) as dbzipf:597 db = Database(dbzipf)598 DB_ONTOLOGY = True599 fix_belief_from_database = build_fix_belief_from_database(db)600 ontology = defaultdict(lambda: set())601 splits = []602 for split in ['train', 'val', 'test']:603 ignored_dialogues = 0604 dialogues = []605 splits.append((split, dialogues))606 with zipfile.ZipFile(BytesIO(zipf.read(f'{split}.json.zip'))) as zsplitf:607 data = json.load(zsplitf.open(f'{split}.json'))608 logger.info('loaded {}, size {}'.format(split, len(data)))609 for sess_id, sess in data.items():610 goal = parse_goal(sess['goal'])611 dialogue = dict(name=sess_id, items=[], goal=goal)612 active_domain = None613 ignore_dialogue = False614 for i, turn in enumerate(sess['log']):615 text = turn['text']616 da = da2tuples(turn['dialog_act'])617 item = dict(618 speaker='user' if i % 2 == 0 else 'system',619 
text=text,620 dialogue_act=da621 )622 if item['speaker'] == 'system':623 belief = turn['metadata']624 item['span_info'] = turn['span_info']625 # Detect active domain and Judge turn type626 ruk = False627 for domain, meta in belief.items():628 if "ruk" in meta.keys():629 item['active_domain'] = domain630 ruk = True631 item['document'] = turn['knowledge']['snippet']['body']632 belief[domain]['topic'] = " ".join(turn['knowledge']['keywords'])633 break634 item['uk_based'] = ruk635 if not ruk:636 item['document'] = ""637 domain_counter = Counter({x[1].lower() for x in da}.intersection(MW_DOMAINS))638 if domain_counter:639 active_domain = domain_counter.most_common(1)[0][0]640 item['active_domain'] = active_domain641 '''In TAHOE, this needs modification to get extended belief'''642 belief = transform_belief(belief, da, active_domain)643 belief = fix_belief_from_database(belief) # normalize value (entity name)644 item['belief'] = belief645 if 'bus' in belief:646 # We need to ignore this dialogue647 # There is no data for the bus domain648 ignore_dialogue = True649 break650 for k, bs in belief.items():651 for k2, val in bs.items():652 ontology[(k, k2)].add(val)653 # Add booked property654 item['booked_domains'] = sorted(get_booked_domains(turn['metadata']))655 dialogue['items'].append(item)656 if not ignore_dialogue:657 dialogues.append(dialogue)658 else:659 ignored_dialogues += 1660 if ignored_dialogues > 0:661 logger.warning(f'dialogues were ignored {ignored_dialogues * 100 / (ignored_dialogues + len(dialogues)):.2f}% due to a missing domain "bus"') # noqa: E501662 with zipfile.ZipFile(os.path.join(path, 'database.zip'), 'a') as dbzipf:663 # with dbzipf.open('db_ontology.json', 'w') as f:664 # f.write(json.dumps({'-'.join(k): list(v) for k, v in ontology.items()}).encode('utf-8'))665 db = Database(dbzipf)666 # Delexicalize loaded data667 for split, dialogues in splits:668 for dialogue in tqdm(dialogues, desc=f'delexicalising {split}'):669 for item in 
dialogue['items']:670 text = item['text']671 if item['speaker'] == 'system':672 belief = item['belief']673 span_info = item['span_info']674 del item['span_info']675 database_results = db(belief, return_results=True)676 '''677 if item['uk_based']: # do not delex newly inserted turns678 delexicalised_text = normalize(text)679 '''680 delexicalised_text = delexicalise(text, return_replacements=False,681 database_results=database_results,682 belief=deepcopy(belief), spans=span_info)683 item['delexicalised_text'] = clear_whitespaces(delexicalised_text)684 database_results = OrderedDict((domain, count) for domain, (count, results)685 in database_results.items())686 item['database'] = database_results687 text = normalize(text)688 item['text'] = clear_whitespaces(text)689 with open(os.path.join(path, f'{split}.json'), 'w+') as f:690 json.dump(dict(dialogues=dialogues, domains=MW_DOMAINS), f)691def preprocess():692 path = os.path.join(DATASETS_PATH, 'multiwoz-2.1')693 os.makedirs(path, exist_ok=True)694 # Download the dataset695 # commit_sha = 'e368deeb3d405caf19236fb768360a6517a24fcd'696 with zipfile.ZipFile(os.path.join(path, 'data_aug.zip')) as zipf:697 export_data(path, zipf)698 # Generating blacklist699 logger.info('Generating blacklist')700 cwd = os.path.dirname(os.path.abspath(__file__))701 subprocess.run(['python', os.path.join(cwd, 'build_multiwoz_blacklist.py'), '--dataset', 'multiwoz-2.1'], cwd=cwd)702if __name__ == "__main__":...
object_detection.py
Source:object_detection.py
...94 return resp195 '''96 Clear the dictionary that keep tracks of the detected objects97 '''98 def _clear_dict(self):99 self.dict_obj[HeadMovement.CENTRO].clear()100 self.dict_obj[HeadMovement.SINISTRA].clear()101 self.dict_obj[HeadMovement.DESTRA].clear()102 '''103 Callback called after receiving an update on the topic in which I'm subscribed104 105 @param: data Message containing head position as integer and data stream of image.106 '''107 def callback(self, data: ImagePos):108 #Retrieve postion from the image109 pos = data.pos110 # convert Image into numpy array111 img = ros_numpy.numpify(data.image)112 # image preprocessing113 img = img[:, :, ::-1]114 input_tensor = tf.convert_to_tensor(img)115 input_tensor = input_tensor[tf.newaxis, ...]116 # detect classes into the image117 start = time.time()118 detections = detect_fn(input_tensor)119 end = time.time()120 elapse = str(round(end - start, 2))121 print("Detection completed in", elapse, "seconds")122 num_above_thresh = np.sum(detections['detection_scores'] > 0.5)123 detections.pop('num_detections')124 detections = {key: value[0, :num_above_thresh].numpy() for key, value in detections.items()}125 detections['detection_classes'] = detections['detection_classes'].astype(np.int64)126 #For every detected object update or initialize the counting of that object in that direction127 for c, s in zip(detections['detection_classes'], detections['detection_scores']):128 if self.dict_obj[pos].get(classmap[c]) is None:129 self.dict_obj[pos][classmap[c]] = 1130 else:131 self.dict_obj[pos][classmap[c]] = self.dict_obj[pos][classmap[c]] + 1132 print(pos, self.dict_obj[pos])133 #Update the counter variable "count"134 self.sum_count()135 '''136 Callback is called when the dictionary is ready to be sent.137 138 @param: req Client request 139 '''140 def handleService(self, req):141 #First case, Pepper didn't do all 3 movements, so wait to be notified after every detection142 if(self.count != 3):143 scheduler.acquire()144 
scheduler.wait()145 scheduler.release()146 147 self.count = 0148 try:149 #Call the method talk to make Pepper perform the speech150 resp = self.talk()151 #Clear the dictionary with found and detected objects152 self._clear_dict()153 #Return the result of 'capture_ended' service154 return capture_endedResponse(resp.result)155 except rospy.ServiceException as e:156 rospy.logwarn("Service call failed: %s" % e)157 self._clear_dict()158 return capture_endedResponse(False)159def init_model():160 img = np.full((512, 512, 3), 0, dtype=np.uint8)161 img = img[:, :, ::-1]162 input_tensor = tf.convert_to_tensor(img)163 input_tensor = input_tensor[tf.newaxis, ...]164 detect_fn(input_tensor)165if __name__ == '__main__':166 #Wait the service animatedSay167 rospy.wait_for_service('animatedSay')168 #Call the service Say169 call = rospy.ServiceProxy('animatedSay', Say)170 #call("One moment please. I'm loading the model into my brain")171 print('Loading model...', end='')...
diction.py
Source:diction.py
...10 capital = False11 12 def __init__(self, letter_dict = None):13 if letter_dict is None:14 self._clear_dict()15 else:16 self.diction = letter_dict17 self._clear_buff()18 19 def _clear_buff(self):20 self.syllables = {}21 self.buffer = ''22 self.index = 023 24 def _clear_dict(self):25 self.diction = {**self._vowelGrab(cm.cDict), **cm.cDictVow}26 if self.capital:27 self._set_capital()28 29 @staticmethod30 def _UpperDict(cdict):31 return {k.capitalize():v for (k,v) in cdict.items()}32 33 @staticmethod34 def _doubleDict(cdict):35 return {k*2:v for (k,v) in cdict.items()}36 37 @staticmethod38 def _extDict(cdict):...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub. From setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios — the LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation testing FREE!