Best Python code snippet using gherkin-python
citation_extraction.py
Source: citation_extraction.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import pandas as pd
import numpy as np
from lxml import etree
import regex
import nltk
import string
import csv
import re
from nltk.sentiment.vader import SentimentIntensityAnalyzer
import multiprocessing as mp
import tei_tools

data_dir = 'data/raw/'
ns = {'tei': '{http://www.tei-c.org/ns/1.0}', 'w3': '{http://www.w3.org/XML/1998/namespace}'}
sid = SentimentIntensityAnalyzer()

# Keywords: adapted version of Tams, S., & Grover, V. (2010). The Effect of an IS Article's Structure on Its Impact. CAIS, 27, 10.
introduction_keywords = ['introduction']
background_keywords = ['background',
                       'literature review',
                       'review of',
                       'critical review']
theory_frontend_keywords = ['conceptual development',
                            'hypothesis development',
                            'research hypotheses',
                            'research model',
                            'research questions',
                            'theory',
                            'theoretical background',
                            'theoretical development',
                            'theoretical model',
                            'theoretical']
method_keywords = ['data collection',
                   'methodology',
                   'methods',
                   'model testing',
                   'procedure',
                   'research methodology']
implications_keywords = ['contribution',
                         'discussion',
                         'future research',
                         'implications',
                         'implications for future research',
                         'implications for practice',
                         'limitations',
                         'practical implications',
                         'recommendations',
                         'theoretical implications']
# Extension of Tams and Grover (2010):
theory_frontend_keywords.extend(['theoretical foundation',
                                 'conceptual foundation',
                                 'conceptual basis',
                                 'model and hypotheses',
                                 'prior research',
                                 'related research',
                                 'theoretical framing',
                                 'theoretical framework',
                                 'framework',
                                 'hypotheses',
                                 'conceptualizing',
                                 'defining',
                                 'hypotheses development',
                                 'related literature',
                                 'model development'])
method_keywords.extend(['method',
                        'research design',
                        'research framework',
                        'research method',
                        'robustness',
                        'hypothesis testing',
                        'literature survey',
                        'scale validation',
                        'measur',
                        'control variable',
                        'coding'])
results_keywords = ['analysis',
                    'findings',
                    'results',
                    'robustness']
implications_keywords.extend(['conclusion',
                              'further research',
                              'concluding remarks',
                              'research agenda'])
appendix_keywords = ['appendi',
                     'electronic companion']

def parse_author(author_string):
    authors = author_string.split(' and ')
    last_names = []
    for author in authors:
        last_names.append(author[:author.index(',')])
    return last_names

def build_citation_regex(authors, year):
    if len(authors) == 1:
        return authors[0] + "'?s?,? (\(?" + str(year) + '\)?)?'
    elif len(authors) == 2:
        return authors[0] + ' (&|and|&) ' + authors[1] + "'?s?,? (\(?" + str(year) + '\)?)?'
    else:
        return authors[0] + ' et al.?,? (\(?' + str(year) + '\)?)?'

def get_position_in_sentence(sentence):
    return sentence.index('REFERENCE')/(len(sentence)-len('REFERENCE'))

def get_sentiment(document):
    return sid.polarity_scores(document)

def is_textual_citation(sentence):
    # textual (narrative) citation = REFERENCE not wrapped in parentheses
    return regex.search('\([^(\))|^(\()]*?REFERENCE[^(\()|^(\))]*?\)', sentence, regex.DOTALL) is None

def is_separate(sentence):
    before = regex.search('CITATION ?REFERENCE', sentence, regex.DOTALL)
    after = regex.search('REFERENCE ?CITATION', sentence, regex.DOTALL)
    return before is None and after is None

def get_popularity(sentence, marker='CITATION'):
    return sentence.count(marker)

def get_density(sentence):
    return get_popularity(sentence, marker='REFERENCE') / (get_popularity(sentence, marker='CITATION') + get_popularity(sentence, marker='REFERENCE'))

def get_pos_structure(sentence):
    tokenized = nltk.word_tokenize(sentence)
    pos_tags = nltk.pos_tag(tokenized)
    pos = []
    for pos_tag in pos_tags:
        if pos_tag[0] == 'REFERENCE':
            pos.append(pos_tag[0])
        else:
            pos.append(pos_tag[1])
    return ' '.join([tag for tag in pos if tag not in string.punctuation])

def find_pos_patterns(pos_sentence):
    pattern_0 = regex.compile('^.*REFERENCE VB[DPZN].*$').match(pos_sentence) is not None
    pattern_1 = regex.compile('^.*VB[DPZ] VB[GN].*$').match(pos_sentence) is not None
    pattern_2 = regex.compile('^.*VB[DGPZN]? (RB[RS]? )*VBN.*$').match(pos_sentence) is not None
    pattern_3 = regex.compile('^.*MD (RB[RS]? )*VB (RB[RS]? )*VBN.*$').match(pos_sentence) is not None
    pattern_4 = regex.compile('^(RB[RS]? )*PRP (RB[RS]? )*V.*$').match(pos_sentence) is not None
    pattern_5 = regex.compile('^.*VBG (NNP )*(CC )*(NNP ).*$').match(pos_sentence) is not None
    return [pattern_0, pattern_1, pattern_2, pattern_3, pattern_4, pattern_5]

def has_comp_sup(pos_sentence):
    # detect comparative/superlative adverb tags anywhere in the POS string
    return regex.search('RB[RS]', pos_sentence) is not None

def has_1st_3rd_prp(citation_sentence):
    tokenized = nltk.word_tokenize(citation_sentence)
    pos_tags = nltk.pos_tag(tokenized)
    for pos_tag in pos_tags:
        if pos_tag[1] == 'PRP':
            if pos_tag[0] in ['I', 'i', 'We', 'we']:
                return True
    return False

def get_position_in_document(whole_document_text, predecessor, sentence, successor):
    predecessor_position = whole_document_text.find(extract_sentence_part_without_REF_or_CIT(predecessor))
    sentence_position = whole_document_text.find(extract_sentence_part_without_REF_or_CIT(sentence))
    successor_position = whole_document_text.find(extract_sentence_part_without_REF_or_CIT(successor))
    positions = [x for x in [predecessor_position, sentence_position, successor_position] if x > 1]
    if len(positions) > 0:
        return round(np.mean(positions)/len(whole_document_text), 3)
    else:
        return ''

def get_full_headings(root):
    full_headings = []
    for head in root.iter(ns['tei'] + 'head'):
        if head.getparent() is not None:
            if head.getparent().tag == ns['tei'] + 'figure':
                continue
        if head.text is not None:
            full_headings.append(str.title(head.text).lower())
    return full_headings

def get_heading(p):
    heading = 'NA'
    div = p.getparent()
    try:
        heading = div.find(ns['tei'] + 'head').text
        return heading
    except:
        pass
    # sometimes, there might be no heading in the same div tag -> check the previous div.
    try:
        heading = div.xpath("preceding::div")[-1].find(ns['tei'] + 'head').text
        return heading
    except:
        pass
    return heading

def ref_in_tableDesc(el, heading_title):
    if 'figDesc' in el.getparent().tag and 'table' in heading_title.lower():
        return True
    if 'head' in el.getparent().tag and 'table' in el.getparent().text.lower():
        return True
    else:
        return False

def ref_in_figDesc(el, heading_title):
    if 'figDesc' in el.getparent().tag and 'figure' in heading_title.lower():
        return True
    if 'head' in el.getparent().tag and 'figure' in el.getparent().text.lower():
        return True
    else:
        return False

def ref_in_heading(el, heading_title):
    if 'head' in el.getparent().tag:
        return True
    else:
        return False

def match_headings(full_headings):
    matched_headings = ['-'] * len(full_headings)
    for i in range(0, len(full_headings)-1):
        if any(x in full_headings[i] for x in introduction_keywords):
            matched_headings[i] = 'introduction'
        if any(x in full_headings[i] for x in background_keywords):
            matched_headings[i] = 'background'
        if any(x in full_headings[i] for x in theory_frontend_keywords):
            matched_headings[i] = 'theory_frontend'
        if any(x in full_headings[i] for x in method_keywords):
            matched_headings[i] = 'method'
        if any(x in full_headings[i] for x in results_keywords):
            matched_headings[i] = 'results'
        if any(x in full_headings[i] for x in implications_keywords):
            matched_headings[i] = 'implications'
        if any(x in full_headings[i] for x in appendix_keywords):
            matched_headings[i] = 'appendix'
    # fill gaps between same-category headings
    last_category = '-'
    for i in range(0, len(matched_headings)-1):
        if matched_headings[i] == '-':
            continue
        # now, we have cases in which matched_headings[i] != '-'
        # replace last_category if it differs from the current heading
        if last_category != matched_headings[i]:
            last_category = matched_headings[i]
        # fill previous missing categories ('-') if the previous category is the same
        else:
            n = i
            while True:
                matched_headings[n] = last_category
                n -= 1
                if matched_headings[n] != '-':
                    break
    # continue with the same category if the next category corresponds to IMRAD
    # (intro, background, theory, methods, results, discussion)
    last_category = '-'
    for i in range(0, len(matched_headings)-1):
        if matched_headings[i] == '-':
            continue
        # now, we have cases in which matched_headings[i] != '-'
        if last_category == 'introduction' and matched_headings[i] in ['background', 'theory_frontend']:
            n = i-1
            while n >= 0:
                matched_headings[n] = last_category
                n -= 1
                if matched_headings[n] != '-':
                    break
        if last_category == 'background' and matched_headings[i] in ['theory_frontend']:
            n = i-1
            while n >= 0:
                matched_headings[n] = last_category
                n -= 1
                if matched_headings[n] != '-':
                    break
        if last_category == 'theory_frontend' and matched_headings[i] in ['method']:
            n = i-1
            while n >= 0:
                matched_headings[n] = last_category
                n -= 1
                if matched_headings[n] != '-':
                    break
        if last_category == 'method' and matched_headings[i] in ['results']:
            n = i-1
            while n >= 0:
                matched_headings[n] = last_category
                n -= 1
                if matched_headings[n] != '-':
                    break
        if last_category == 'results' and matched_headings[i] in ['implications']:
            n = i-1
            while n >= 0:
                matched_headings[n] = last_category
                n -= 1
                if matched_headings[n] != '-':
                    break
        last_category = matched_headings[i]
    # if the last heading is an appendix: the following ones are also appendices
    n = len(matched_headings)
    while n >= 1:
        n -= 1
        if matched_headings[n] == '-':
            continue
        if matched_headings[n] == 'appendix':
            if n != len(matched_headings)-1:
                while n < len(matched_headings):
                    matched_headings[n] = 'appendix'
                    n += 1
            break
        else:
            break
    return matched_headings

def get_heading_category(heading_title, position_in_document, full_headings, matched_headings):
    heading_category = 'NA'
    if heading_title is None:
        return heading_category
    for i in range(0, len(full_headings)):
        if heading_title.lower() == full_headings[i]:
            heading_category = matched_headings[i]
    if str(position_in_document).replace('.', '').isdigit():
        if heading_title == 'NA' and position_in_document < 0.3:
            heading_category = 'introduction'
    return heading_category

def parse_numeric_citation(row, CURRENT_LR, root):
    df = pd.DataFrame(columns=columnnames)
    whole_document_text = str(etree.tostring(root.find('.//' + ns['tei'] + 'body'), pretty_print=True).decode('utf-8'))
    full_headings = get_full_headings(root)
    matched_headings = match_headings(full_headings)
    BIBLIOGRAPHY = pd.DataFrame(columns=['reference_id', 'author', 'title', 'year', 'journal', 'similarity'])
    for reference in root.find('.//' + ns['tei'] + 'listBibl'):
        reference_id = tei_tools.get_reference_bibliography_id(reference)
        title_string = tei_tools.get_reference_title_string(reference)
        author_string = tei_tools.get_reference_author_string(reference)
        year_string = tei_tools.get_reference_year_string(reference)
        journal_string = tei_tools.get_reference_journal_string(reference)
        if title_string is None:
            continue
        ENTRY = pd.DataFrame.from_records([[reference_id, author_string, title_string, year_string, journal_string, 0]],
                                          columns=['reference_id', 'author', 'title', 'year', 'journal', 'similarity'])
        ENTRY.loc[0, 'similarity'] = tei_tools.get_similarity(ENTRY, CURRENT_LR)
        BIBLIOGRAPHY = BIBLIOGRAPHY.append(ENTRY)
    BIBLIOGRAPHY = BIBLIOGRAPHY.reset_index(drop=True)
    LR_ENTRY = BIBLIOGRAPHY.loc[BIBLIOGRAPHY['similarity'].idxmax()]
    if LR_ENTRY['similarity'] > 0.85:
        ref_id = LR_ENTRY['reference_id']
        for ref in root.iter(ns['tei'] + 'ref'):
            if ref.get('target') == '#' + ref_id:
                p = ref.getparent()
                temp_p = etree.fromstring(etree.tostring(p))
                for elem in temp_p.iter(ns['tei'] + 'ref'):
                    if elem.get('target') != '#' + ref_id:
                        temp_p.text += 'CITATION'
                        if elem.tail:
                            temp_p.text += elem.tail
                        temp_p.remove(elem)
                    else:
                        temp_p.text += 'REFERENCE'
                        if elem.tail:
                            temp_p.text += elem.tail
                        temp_p.remove(elem)
                replacements = {'c.f.': 'cf', 'e.g.': 'eg', 'pp.': '', 'etc.': 'etc', 'cf.': 'cf', '\n': '', '\r': ''}
                for i, j in replacements.items():
                    temp_p.text = temp_p.text.replace(i, j)
                sentences = nltk.sent_tokenize(temp_p.text)
                for index, sentence in enumerate(sentences):
                    if 'REFERENCE' in sentence:
                        if index-1 < 0:
                            predecessor = ''
                        else:
                            predecessor = sentences[index-1]
                        if index+1 >= len(sentences):
                            successor = ''
                        else:
                            successor = sentences[index+1]
                        sentence = sentence.strip()
                        predecessor = predecessor.strip()
                        successor = successor.strip()
                        context = ' '.join([predecessor, sentence, successor])
                        sentence_sent = get_sentiment(sentence)
                        context_sent = get_sentiment(context)
                        pos_structure = get_pos_structure(sentence)
                        pos_patterns = find_pos_patterns(pos_structure)
                        position_in_document = get_position_in_document(whole_document_text, predecessor, sentence, successor)
                        heading_title = get_heading(p)
                        df.loc[len(df)] = [row['citation_key_lr'],
                                           row['citation_key_cp'],
                                           sentence,
                                           predecessor,
                                           successor,
                                           False,  # alphanumeric citations cannot be textual
                                           is_separate(sentence),
                                           get_popularity(sentence),
                                           get_popularity(context),
                                           get_density(sentence),
                                           get_density(context),
                                           get_position_in_sentence(sentence),
                                           sentence_sent['neg'],
                                           sentence_sent['neu'],
                                           sentence_sent['pos'],
                                           sentence_sent['compound'],
                                           context_sent['neg'],
                                           context_sent['neu'],
                                           context_sent['pos'],
                                           context_sent['compound'],
                                           has_comp_sup(pos_structure),
                                           has_1st_3rd_prp(sentence),
                                           pos_structure,
                                           pos_patterns[0],
                                           pos_patterns[1],
                                           pos_patterns[2],
                                           pos_patterns[3],
                                           pos_patterns[4],
                                           pos_patterns[5],
                                           position_in_document,
                                           heading_title,
                                           get_heading_category(heading_title, position_in_document, full_headings, matched_headings),
                                           ref_in_figDesc(ref, heading_title),
                                           ref_in_tableDesc(ref, heading_title),
                                           ref_in_heading(ref, heading_title)]
    return df

def extract_sentence_part_without_REF_or_CIT(sentence):
    # always choose the shorter part since the longer one includes the other type of marker
    left_reference_part = sentence[:sentence.find('REFERENCE')]
    left_citation_part = sentence[:sentence.find('CITATION')]
    if len(left_reference_part) > len(left_citation_part):
        left_part = left_citation_part
    else:
        left_part = left_reference_part
    right_reference_part = sentence[sentence.rfind('REFERENCE'):]
    right_citation_part = sentence[sentence.rfind('CITATION'):]
    if len(right_reference_part) > len(right_citation_part):
        right_part = right_citation_part
    else:
        right_part = right_reference_part
    # return the longer part since no markers are included in left_part or right_part
    if len(left_part) > len(right_part):
        return left_part
    else:
        return right_part

def parse_standard_citation(row, CURRENT_LR, root):
    df = pd.DataFrame(columns=columnnames)
    citation_regex = build_citation_regex(parse_author(row['author_lr']), row['year_lr'])
    whole_document_text = str(etree.tostring(root.find('.//' + ns['tei'] + 'body'), pretty_print=True).decode('utf-8'))
    full_headings = get_full_headings(root)
    matched_headings = match_headings(full_headings)
    ref_id_lr = tei_tools.get_reference_id(root, CURRENT_LR)
    for ref in root.iter(ns['tei'] + 'ref'):
        if ref.text is not None:
            search_citation_regex = regex.search(citation_regex, ref.text, regex.DOTALL)
            search_grobid_id = False
            if ref_id_lr is not None and ref.get('target') is not None:
                search_grobid_id = ref.get('target').replace('#', '') == ref_id_lr
            if search_citation_regex or search_grobid_id:
                p = ref.getparent()
                if p.tag == ns['tei'] + 'div':
                    p.remove(ref)
                    p.find(ns['tei'] + 'p').insert(0, ref)
                    temp_p = etree.fromstring(etree.tostring(p.find(ns['tei'] + 'p')))
                else:
                    temp_p = etree.fromstring(etree.tostring(p))
                if temp_p.text is None:
                    continue
                for elem in temp_p.iter(ns['tei'] + 'ref'):
                    ref_search_citation_regex = regex.search(citation_regex, elem.text, regex.DOTALL)
                    ref_search_grobid_id = False
                    # mirror the outer check: compare grobid target ids when available
                    if ref_id_lr is not None and elem.get('target') is not None:
                        ref_search_grobid_id = elem.get('target').replace('#', '') == ref_id_lr
                    if ref_search_citation_regex or ref_search_grobid_id:
                        temp_p.text += 'REFERENCE'
                        if elem.tail:
                            temp_p.text += elem.tail
                        temp_p.remove(elem)
                    else:
                        temp_p.text += 'CITATION'
                        if elem.tail:
                            temp_p.text += elem.tail
                        temp_p.remove(elem)
                replacements = {'c.f.': 'cf', 'e.g.': 'eg', 'pp.': '', 'etc.': 'etc', 'cf.': 'cf', '\n': '', '\r': ''}
                for i, j in replacements.items():
                    temp_p.text = temp_p.text.replace(i, j)
                sentences = nltk.sent_tokenize(temp_p.text)
                for index, sentence in enumerate(sentences):
                    if 'REFERENCE' in sentence:
                        if index-1 < 0:
                            predecessor = ''
                        else:
                            predecessor = sentences[index-1]
                        if index+1 >= len(sentences):
                            successor = ''
                        else:
                            successor = sentences[index+1]
                        sentence = sentence.strip()
                        predecessor = predecessor.strip()
                        successor = successor.strip()
                        context = ' '.join([predecessor, sentence, successor])
                        sentence_sent = get_sentiment(sentence)
                        context_sent = get_sentiment(context)
                        pos_structure = get_pos_structure(sentence)
                        pos_patterns = find_pos_patterns(pos_structure)
                        position_in_document = get_position_in_document(whole_document_text, predecessor, sentence, successor)
                        heading_title = get_heading(p)
                        df.loc[len(df)] = [row['citation_key_lr'],
                                           row['citation_key_cp'],
                                           sentence,
                                           predecessor,
                                           successor,
                                           is_textual_citation(sentence),
                                           is_separate(sentence),
                                           get_popularity(sentence),
                                           get_popularity(context),
                                           get_density(sentence),
                                           get_density(context),
                                           get_position_in_sentence(sentence),
                                           sentence_sent['neg'],
                                           sentence_sent['neu'],
                                           sentence_sent['pos'],
                                           sentence_sent['compound'],
                                           context_sent['neg'],
                                           context_sent['neu'],
                                           context_sent['pos'],
                                           context_sent['compound'],
                                           has_comp_sup(pos_structure),
                                           has_1st_3rd_prp(sentence),
                                           pos_structure,
                                           pos_patterns[0],
                                           pos_patterns[1],
                                           pos_patterns[2],
                                           pos_patterns[3],
                                           pos_patterns[4],
                                           pos_patterns[5],
                                           position_in_document,
                                           heading_title,
                                           get_heading_category(heading_title, position_in_document, full_headings, matched_headings),
                                           ref_in_figDesc(ref, heading_title),
                                           ref_in_tableDesc(ref, heading_title),
                                           ref_in_heading(ref, heading_title)]
    return df

def parse_citation(row):
    CURRENT_LR = ARTICLE[ARTICLE.citation_key == row['citation_key_lr']].head(1)
    CURRENT_LR = CURRENT_LR[['citation_key', 'author', 'title', 'year', 'journal']]
    CURRENT_LR.rename(index=str, columns={"citation_key": "reference_id"}, inplace=True)
    CURRENT_LR['similarity'] = 0
    # before parsing in-text citations: add ref-tags for LRs that have not been annotated by grobid
    file = open(data_dir + 'xml/' + row['citation_key_cp'] + '.tei.xml', "r")
    xml_string = file.read()
    root = etree.fromstring(xml_string)
    reference_id = tei_tools.get_reference_id(root, CURRENT_LR)
    author_list = parse_author(CURRENT_LR.iloc[0]['author'])
    if len(author_list) > 1:
        in_text_citation = build_citation_regex(parse_author(CURRENT_LR.iloc[0]['author']), CURRENT_LR.iloc[0]['year'])
        pattern = re.compile('(?!<ref[^>]*?>)(' + in_text_citation + ')(?![^<]*?</ref>)', re.IGNORECASE)
        main_part = xml_string.split('<listBibl>', 1)[0]
        reference_part = xml_string.split('<listBibl>', 1)[1]
        xml_string = pattern.sub('<ref target="#' + reference_id + '">\\1</ref>', main_part) + '<listBibl>' + reference_part
    # annotate cases like the "D&M model" (first letters of two author names)
    if len(author_list) == 2:
        in_text_citation = author_list[0][0] + '&' + author_list[1][0]
        pattern = re.compile('(?!<ref[^>]*?>)(' + in_text_citation + ')(?![^<]*?</ref>)', re.IGNORECASE)
        main_part = xml_string.split('<listBibl>', 1)[0]
        reference_part = xml_string.split('<listBibl>', 1)[1]
        xml_string = pattern.sub('<ref target="#' + reference_id + '">\\1</ref>', main_part) + '<listBibl>' + reference_part
    # outfile = open("file.txt", 'w', encoding='utf-8')
    # outfile.write(xml_string)
    # outfile.close()
    root = etree.fromstring(str.encode(xml_string))
    if tei_tools.paper_alphanumeric_citation_style(root):
        result = parse_numeric_citation(row, CURRENT_LR, root)
    else:
        result = parse_standard_citation(row, CURRENT_LR, root)
    if result.empty:
        emptyvalues = [row['citation_key_lr'],
                       row['citation_key_cp'],
                       '', '', '', '', '', '', '', '', '', '',
                       '', '', '', '', '', '', '', '', '', '',
                       '', '', '', '', '', '', '', '', '', '',
                       '', '', '']
        df = pd.DataFrame(columns=columnnames)
        df.loc[0] = emptyvalues
        return df
    else:
        return result

def collect_result(result):
    global CITATION
    CITATION = pd.concat([CITATION, result])

if __name__ == "__main__":
    ARTICLE = pd.read_csv(data_dir + 'ARTICLE.csv')
    LR_CP = pd.read_csv(data_dir + 'LR_CP.csv')
    LR_CP = pd.merge(LR_CP, ARTICLE, left_on='citation_key_lr', right_on='citation_key')
    LR_CP = LR_CP[['citation_key_lr', 'citation_key_cp', 'title', 'author', 'year']]
    LR_CP.columns = ['citation_key_lr', 'citation_key_cp', 'title_lr', 'author_lr', 'year_lr']
    LR_CP = pd.merge(LR_CP, ARTICLE, left_on='citation_key_cp', right_on='citation_key')
    LR_CP = LR_CP[['citation_key_lr', 'citation_key_cp', 'title_lr', 'author_lr', 'year_lr', 'journal']]
    LR_CP.columns = ['citation_key_lr', 'citation_key_cp', 'title_lr', 'author_lr', 'year_lr', 'journal_cp']
    columnnames = ['citation_key_lr',
                   'citation_key_cp',
                   'citation_sentence',
                   'predecessor',
                   'successor',
                   'textual',
                   'separate',
                   'sentence_popularity',
                   'context_popularity',
                   'sentence_density',
                   'context_density',
                   'position_in_sentence',
                   'sentence_neg',
                   'sentence_neu',
                   'sentence_pos',
                   'sentence_compound',
                   'context_neg',
                   'context_neu',
                   'context_pos',
                   'context_compound',
                   'comp_sup',
                   'prp',
                   'pos_pattern',
                   'pos_0',
                   'pos_1',
                   'pos_2',
                   'pos_3',
                   'pos_4',
                   'pos_5',
                   'position_in_document',
                   'heading_title',
                   'heading_category',
                   'ref_in_figure_description',
                   'ref_in_table_description',
                   'ref_in_heading']
    CITATION = pd.DataFrame(columns=columnnames)
    pool = mp.Pool(mp.cpu_count()-2)
    for i, row in LR_CP.iterrows():
        pool.apply_async(parse_citation, args=(row, ), callback=collect_result)
    pool.close()
    pool.join()

    CITATION = CITATION.drop_duplicates()
    CITATION = CITATION.sort_values(['citation_key_lr', 'citation_key_cp'])
    ...
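The marker-based feature extractors above are plain string functions and can be exercised in isolation. A minimal sketch, assuming citation_extraction.py is on the path and nltk's vader_lexicon data is available (the module builds a SentimentIntensityAnalyzer at import time); the sample sentence is invented:

import nltk
nltk.download('vader_lexicon', quiet=True)  # needed by the module-level SentimentIntensityAnalyzer

from citation_extraction import (is_textual_citation, is_separate,
                                 get_popularity, get_density,
                                 get_position_in_sentence)

# The parsers rewrite each in-text citation to a marker before extracting
# features: REFERENCE for the reference of interest, CITATION for any other.
sentence = 'Prior work (CITATION REFERENCE) studied adoption.'
print(is_textual_citation(sentence))       # False: REFERENCE sits inside parentheses
print(is_separate(sentence))               # False: REFERENCE is adjacent to a CITATION
print(get_popularity(sentence))            # 1 -> number of CITATION markers
print(get_density(sentence))               # 0.5 -> REFERENCE / (REFERENCE + CITATION)
print(get_position_in_sentence(sentence))  # ~0.53 -> relative offset of REFERENCE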
background.py
Source: background.py
from config.settings import DATA_DIR, ARTICLE_TYPE
import copy
import re
import numpy as np
import pandas as pd
import pprint
import textdistance
import os
import json
import collections
from utils.programs import Programs

pp = pprint.PrettyPrinter()

class Background:
    def find_university(self): raise NotImplementedError("Override me")

    def find_major(self): raise NotImplementedError("Override me")

class TWBackground(Background):
    def __init__(self):
        # TW universities
        self.universities = pd.read_csv(os.path.join(DATA_DIR, 'tw/tw_universities.csv'), sep='|', index_col='uni_id')
        self.universities['ip'] = self.universities['ip'].map(lambda x: str(int(x)) if not pd.isnull(x) else None)
        self.universities = self.universities.where(self.universities.notnull(), None)
        self.uid2cname = self.universities.to_dict()['uni_cname']
        self.cabbr2uid = {cabbr: uid for cabbr, uid in zip(self.universities['uni_cabbr'], self.universities.index) if cabbr is not None}
        self.cname2uid = {cname: uid for cname, uid in zip(self.universities['uni_cname'], self.universities.index) if cname is not None}
        self.name2uid = {name: uid for name, uid in zip(self.universities['uni_name'], self.universities.index) if name is not None}
        self.ip2uid = {str(int(ip)): uid for ip, uid in zip(self.universities['ip'], self.universities.index) if ip is not None}
        # Majors
        self.majors = pd.read_csv(os.path.join(DATA_DIR, 'tw/majors.csv'), sep=',', index_col='major_id', na_values=None)
        self.mid2name = self.majors.to_dict()['major_cname']
        self.cabbr2mid = {cabbr: mid for cabbr, mid in zip(self.majors['major_cabbr'], self.majors.index)}
        self.cname2mid = {cname: mid for cname, mid in zip(self.majors['major_cname'], self.majors.index)}
        self.name2mid = {name.upper(): mid for name, mid in zip(self.majors['major_name'], self.majors.index)}
        self.mid2mtype = {mid: mtype for mtype, mid in zip(self.majors['major_type'], self.majors.index)}
        # Background keywords
        self.background_keywords = ('background', 'education', '經歷', '學歷', 'academic record')
        self.gpa_keywords = ('GPA', 'Rank', ' Education', 'Background')
        self.debug_id = None

    def find_university(self, content, aid=None):
        def helper(matched_word=None, university_row_index=None, uni_id=None, background_row_idx=None):
            """Helper function to return the result as a json object"""
            return locals()
        content = copy.deepcopy(content)
        rows = content.split('\n')
        # We try to find the "Background" keywords to identify the university
        background_row_idx = None
        for idx, row in enumerate(rows):
            s = re.search(r'(' + '|'.join(self.background_keywords) + ')', row, flags=re.IGNORECASE)
            if s is not None:
                background_row_idx = idx
                break
        search_range = [i for i in range(0, len(rows))]
        # Rotate the array so we start the search from the background section
        if background_row_idx is not None:
            search_range = search_range[background_row_idx:] + search_range[:background_row_idx]
        # Search row by row in the search range
        for ridx in search_range:
            row = rows[ridx]
            uni, word = self.sentence2university(row)
            if uni:
                return helper(word, ridx, uni, background_row_idx)
        return None

    def sentence2university(self, sentence):
        ntu_siblings = ('NTUT', 'NTUST')
        for word in sentence.strip().split():
            # Exact match of university Chinese name
            if word in self.cname2uid:
                return self.cname2uid[word], word
            # Exact match of university Chinese abbreviation
            elif word in self.cabbr2uid:
                return self.cabbr2uid[word], word
            # NTU special cases
            elif ('NTU' in word and all([x not in word for x in ntu_siblings])) or '台灣大學' in word or '臺灣大學' in word:
                return 'NTU', word
            # Exact match of uid
            elif word.upper() in self.uid2cname:
                return word.upper(), word
            elif word in self.ip2uid:
                return self.ip2uid[word], word
            else:
                # uid in word (e.g. 'NTU' in 'NTUEE')
                ruid = re.findall(r'(' + '|'.join(self.uid2cname.keys()) + ')(?!.)', word)
                # Filter the false positive 'Hsinchu' -> NCHU
                if ruid and word != 'Hsinchu':
                    return ruid[0].upper(), word
                # Chinese abbr. in word (e.g. '台大' in '台大電機')
                rabbr = re.findall(r'(' + '|'.join(self.cabbr2uid.keys()) + ')', word)
                if rabbr:
                    return self.cabbr2uid[rabbr[0]], word
        # Check if a university English name is in the row
        for name in self.name2uid:
            if name in sentence:
                return self.name2uid[name], word
        return None, None

    def find_major(self, content, university, aid=None):
        if aid == self.debug_id:
            print(aid)
        content = copy.deepcopy(content)
        rows = content.split('\n')
        # Define the range of rows we are going to search, usually starting from "background_row_idx"
        start_row_index = university['background_row_idx'] if university is not None and university['background_row_idx'] is not None else 0
        end_row_index = min(len(rows), university['university_row_index'] + 4) if university is not None else len(rows)
        search_range = list(range(start_row_index, end_row_index))
        # We search the "university_row_index" row first, e.g. NTU EE
        if university is not None:
            search_range = [university['university_row_index']] + search_range
        # Search row by row in the search range
        for ridx in search_range:
            row = rows[ridx]
            major = self.sentence2major(row, university)
            if major:
                return major
        return None

    def sentence2major(self, sentence, university=None, from_api=False):
        sentence = re.sub(r'(student|TOEFL|GRE)', ' ', sentence, flags=re.IGNORECASE)
        # Determine the start index we parse from the row:
        # 1) The major is often listed after/before the university; check if we are in the same row
        start_idx = 0
        if university is not None and university['matched_word'] in sentence:
            start_idx = max(sentence.index(university['matched_word']) - 10, 0)
        # 2) The major is often listed after the background keywords; check if a keyword exists
        s = re.search(r'(' + '|'.join(self.background_keywords) + ')', sentence, re.IGNORECASE)
        # 3) Set the start index
        if s is not None:
            start_idx = min(start_idx, s.end())
        # Search after the start_idx (e.g. after university or background)
        sentence = sentence[start_idx:]
        sentence = re.sub(r'[.,:;/()]', ' ', sentence)
        sentence = sentence.upper()
        # Check if a major English name is in the row
        for name in self.name2mid:
            if name in sentence:
                return self.name2mid[name]
        for word in sentence.strip().split():
            # Exact match of major Chinese name
            if word in self.cname2mid:
                return self.cname2mid[word]
            # Exact match of major Chinese abbreviation, excluding the false positive '香港中文大學'
            elif word in self.cabbr2mid and '中文大學' not in word:
                return self.cabbr2mid[word]
            # Exact match of mid, and word != 'BA' (Bachelor of Arts)
            elif word.upper() in self.mid2name and word.upper() != 'BA':
                return word.upper()
            else:
                # mid in word (e.g. 'EE' in 'NTUEE')
                rmid = re.findall(r'(' + '|'.join(self.mid2name.keys()) + ')(?!.)', word)
                # Filter false positives: ENT (Entomology), 'BA' (Bachelor of Arts), 'ARCH' in 'RESEARCH'
                if rmid and (rmid[0] != 'ENT' or re.match(r' ENT', word)) and rmid[0] != 'BA'\
                        and (rmid[0] != 'ARCH' or 'RESEARCH' not in word.upper()):
                    return rmid[0].upper()
                # cabbr in word (e.g. '電機' in '台大電機系')
                rabbr = re.findall(r'(' + '|'.join(self.cabbr2mid.keys()) + ')', word, re.IGNORECASE)
                if rabbr and '中文大學' not in word:
                    return self.cabbr2mid[rabbr[0]]
        # Another corner case where the major id is BA from the API request
        if from_api and sentence == 'BA':
            return 'BA'
        return None

    def find_gpa(self, content, university, aid=None):
        content = copy.deepcopy(content)
        rows = content.split('\n')
        gpa_scale = -1
        gpa_keyword_in_row_idx = None
        background_row_idx = university['background_row_idx'] if university is not None else None
        candidates = []
        for idx, row in enumerate(rows):
            # Check if GPA and GRE keywords are in the row
            gpa_keyword_in_row = re.search(r'(' + '|'.join(self.gpa_keywords) + ')', row, re.IGNORECASE)
            if gpa_keyword_in_row:
                gpa_keyword_in_row_idx = idx
            # See if GRE is in the row
            gre_in_row = re.search(r'(GRE|G:|G |AW|V1|Q1|V 1|Q 1|V:|Q:)', row, re.IGNORECASE)
            # If GRE and GPA co-occur in the same row, remove the GRE part
            if gre_in_row and gpa_keyword_in_row:
                if gre_in_row.start() > gpa_keyword_in_row.end():
                    row = row[:gre_in_row.start()]
                elif gre_in_row.end() < gpa_keyword_in_row.start():
                    row = row[gpa_keyword_in_row.start():]
            # Get AW index if it exists
            # aw_idx = row.index('AW') if 'AW' in row else aw_idx
            # Parse the float numbers in the current row through regex
            year_regex = r'[2][0-9]{3}'
            row = re.sub(year_regex, ' ', row)
            float_numbers = re.finditer(r'\d+\.\d+', row)
            # Only search rows that are "GPA_keyword" rows
            if gpa_keyword_in_row is not None or (gpa_keyword_in_row_idx is not None and idx - gpa_keyword_in_row_idx <= 1):
                for m in float_numbers:
                    num = float(row[m.start(0): m.end(0)])
                    # Skip AW (e.g. AW 3.5) to avoid "fake" GPA results
                    if num in np.arange(1, 6.5, 0.5) and gre_in_row:
                        continue
                    # We expect the GPA number to be in the range (0, 4.3)
                    if num < 0.001 or num > 4.31:
                        continue
                    # Ugly but efficient way to get the GPA scale...
                    if np.isclose(num, 4.0) and ('/4.0' in row or '/ 4.0' in row):
                        gpa_scale = 4.0
                    elif np.isclose(num, 4.3) and ('/4.3' in row or '/ 4.3' in row):
                        gpa_scale = 4.3
                    else:
                        candidates.append(num)
                # Don't forget that people are just too good!
                if '4.3/' in row or '4.3 /' in row:
                    candidates.append(4.3)
                elif '4.0/' in row or '4.0 /' in row:
                    candidates.append(4.0)
            # We stop searching if we are too far away from the background section
            if background_row_idx is not None and idx - university['background_row_idx'] > 20:
                break
        # Return the parsed GPA ('mean_gpa' is actually the median of the candidates)
        if len(candidates) > 0:
            candidates.sort()
            return {'max_gpa': np.max(candidates), 'min_gpa': np.min(candidates),
                    'mean_gpa': np.round(candidates[len(candidates) // 2], 2), 'gpa_scale': gpa_scale}
        else:
            return {'max_gpa': -1, 'min_gpa': -1, 'mean_gpa': -1, 'gpa_scale': -1}

class USBackground(Background):
    def __init__(self):
        self.ad_reg = r'(admit|admission|admision|accept|appected|ad |ad:|offer|錄取)'
        self.rej_reg = r'(reject|rejection|rejection:|rej|rej:|拒絕|打槍)'
        self.pending_reg = r'(pending|waitlist|wl |wl:|無聲|無消息)'
        self.useless_reg = r'w\/|w\/o|funding|without|with|stipend|tuition|waived|waive|waiver|fellowship| RA|email|e-mail|year|month|date|interviewed|decision|semester|first|for | per| technical|nomination| by | out|\(|\)|Research|Interest|Area|Field|Politics'
        self.ascii_reg = r'[^\x00-\x7F]+'
        self.debug_id = None
        # Load universities
        with open(os.path.join(DATA_DIR, 'us/us_universities_top.json'), 'r') as f:
            self.us_universities = json.load(f)
        # Init a set of all university names
        self.all_uni_names = set(self.us_universities['top_100_names'] + self.us_universities['other_uni_names'])
        # Set up the university-name-to-uid mapping
        self.uname2uid = collections.defaultdict(list)
        for uid in self.us_universities['top_100_uid']:
            self.uname2uid[self.us_universities['top_100_uid'][uid]].append(uid)
        for uid in self.us_universities['other_uni_uid']:
            self.uname2uid[self.us_universities['other_uni_uid'][uid]].append(uid)
        # Init Programs instance
        self.programs = Programs()

    def normalize_university_name(self, words):
        if words.startswith('U '):
            words = words.replace('U ', 'University of ')
        words = words.replace('U. ', 'University of ') if 'of' not in words else words.replace('U. ', 'University ')
        words = words.replace('U of ', 'University of ')
        words = words.replace('Univ ', 'University')
        words = words.replace('UC-', 'UC ')
        words = words.replace('University of California,', 'University of California ')
        r = re.search(r'\w*State U\b', words)
        if r:
            words = words[: r.start()] + 'State University' + words[r.end():]
        r = re.search(r'\w*Univ.\b', words, flags=re.IGNORECASE)
        if r:
            words = words[: r.start()] + 'University' + words[r.end():]
        # Purify some random words:
        r = r'no|yr|ta|ra|ms'
        if len(words) == 2 and re.search(r, words, flags=re.IGNORECASE):
            words = ''
        return words

    def search_single_university_name(self, ad_row):
        for uname in self.us_universities['top_100_names']:
            if re.search(uname, ad_row, flags=re.IGNORECASE):
                return uname
        ad_row = ' ' + ad_row + ' '
        ad_row_upper = ad_row.upper()
        for uid in self.us_universities['top_100_uid']:
            uid_token = ' ' + uid + ' '
            if uid_token in (ad_row_upper, ad_row):
                return self.us_universities['top_100_uid'][uid]
        ad_row = ad_row.strip()
        for uname in self.us_universities['other_uni_names']:
            if re.search(uname, ad_row, flags=re.IGNORECASE):
                return uname
        # Search for university full names with high LCS similarity;
        # the full name should be at least 10 characters
        if len(ad_row) >= 10:
            td_names = []
            for uname in self.us_universities['top_100_names']:
                td = textdistance.lcsseq.similarity(uname, ad_row) / min(len(ad_row), len(uname))
                if td > 0.75:
                    td_names.append((td, uname))
            if td_names:
                return max(td_names)[1]
        for uid in self.us_universities['other_uni_uid']:
            if re.search(r'(?:^|(?<= ))(' + uid + ')(?:(?= )|$)', ad_row):
                return self.us_universities['other_uni_uid'][uid]
        return None

    def search_all_university_names(self, article_title):
        result = []
        for uname in self.us_universities['top_100_names']:
            if re.search(uname, article_title, flags=re.IGNORECASE):
                result.append(uname)
        article_title = ' ' + article_title + ' '
        for uid in self.us_universities['top_100_uid']:
            if ' ' + uid + ' ' in article_title:
                result.append(self.us_universities['top_100_uid'][uid])
        for uname in self.us_universities['other_uni_names']:
            if re.search(uname, article_title, flags=re.IGNORECASE):
                result.append(uname)
        for uid in self.us_universities['other_uni_uid']:
            if ' ' + uid + ' ' in article_title:
                result.append(self.us_universities['other_uni_uid'][uid])
        article_title = article_title.strip()
        if 'Cornell Tech' in result and 'Cornell University' in result:
            result.remove('Cornell University')
        return result

    def parse_admission_section(self, articles):
        def helper_get_end_idx_and_reg(rej_idx, pending_idx):
            """
            Given the indices for the reject and pending rows,
            return the right one as the ending index.

            Returns
            -------
            (int, regex)
                A tuple of the index and the matching regex
            """
            if rej_idx is None and pending_idx is None:
                return None, None
            elif rej_idx is not None and pending_idx is None:
                return rej_idx, self.rej_reg
            elif rej_idx is None and pending_idx is not None:
                return pending_idx, self.pending_reg
            else:
                return (rej_idx, self.rej_reg) if rej_idx <= pending_idx else (pending_idx, self.pending_reg)

        ad_count = 0
        result = []
        for article in articles:
            if self.debug_id and article['article_id'] != self.debug_id:
                continue
            # Parse AD programs from the title
            article_title = article['article_title'].replace('[錄取]', '')
            article_title = re.sub(self.ascii_reg, ' ', article_title)
            article_title = re.sub(self.useless_reg, ' ', article_title, flags=re.IGNORECASE)
            ad_title = re.split(r'[:;/(),\[\]]', article_title)
            ad_title = [r.strip() for r in ad_title if len(r.strip()) > 1]
            # Parse the AD section from the content
            content = article['content']
            rows = copy.deepcopy(content.split('\n'))
            ad_idx = None
            rej_idx = None
            pending_idx = None
            # Find the indices for the "ADMISSION", "REJECT" and "PENDING" rows
            for ridx, row in enumerate(rows):
                if re.search(self.ad_reg, row, flags=re.IGNORECASE) and (
                        (rej_idx is None or ridx <= rej_idx) and (pending_idx is None or ridx <= pending_idx)):
                    ad_idx = ridx
                if re.search(self.rej_reg, row, flags=re.IGNORECASE) and (rej_idx is None or (
                        ad_idx is not None and rej_idx <= ad_idx and ridx <= ad_idx + 4)):
                    rej_idx = ridx
                if re.search(self.pending_reg, row, flags=re.IGNORECASE) and (pending_idx is None or (
                        ad_idx is not None and pending_idx <= ad_idx and ridx <= ad_idx + 4)):
                    pending_idx = ridx
            # Replace non-ASCII characters with a blank
            rows = [re.sub(self.ascii_reg, ' ', row) for row in rows]
            if article['article_id'] == self.debug_id:
                print('parsed index', ad_idx, rej_idx, pending_idx)
            ad_list = []
            end_idx, end_reg = helper_get_end_idx_and_reg(rej_idx, pending_idx)
            if ad_idx is not None and end_idx is not None:
                break_flag = False
                for idx in range(ad_idx, end_idx + 1):
                    row = rows[idx]
                    # Strip "Admission:" from the row
                    ad_match = re.search(self.ad_reg, row, flags=re.IGNORECASE)
                    if ad_match:
                        row = row[:ad_match.start()] + row[ad_match.end():]
                    # Strip "Reject:" or "Pending:" from the row, and break after this row
                    end_match = re.search(end_reg, row, flags=re.IGNORECASE)
                    if end_match:
                        row = row[:end_match.start()]
                        break_flag = True
                    # Remove dates
                    date_reg = re.findall(r'\d+\/\d+', row)
                    for date in date_reg:
                        row = row.replace(date, ' ')
                    # Remove useless stuff, e.g. "w" or "w/o funding"
                    row = re.sub(self.useless_reg, ' ', row, flags=re.IGNORECASE)
                    # With few commas, the row most likely contains only one
                    # university, e.g. 'MIT, EECS'
                    if row.count(',') <= 2:
                        row = row.replace(',', ' ')
                    # Split programs, e.g. 'MIT / CMU' -> ['MIT', 'CMU']
                    row = re.split(r'[:;,/\[\]]', row)
                    # Keep entries with length > 1
                    row = [r.strip() for r in row if len(r.strip()) > 1]
                    ad_list.extend(row)
                    # Break if we reached the end (reject/pending row)
                    if break_flag:
                        break
            # Count how many articles had an AD section successfully parsed
            if len(ad_list) > 0:
                ad_count += 1
            result.append({'article_id': article['article_id'], 'article_title': article['article_title'],
                           'url': article['url'], 'admission_title': ad_title, 'admission': ad_list})
        print(f'Found {ad_count} articles with admission section')
        return result

    def find_university(self, ad_results, articles=None, update=True):
        def hash_program_uni_pair(x):
            a = x['program_level'] if x['program_level'] else ''
            b = x['program_name'] if x['program_name'] else ''
            c = x['university'] if x['university'] else ''
            return a + '@' + b + '@' + c

        result = []
        debug_ads = []
        # Iterate over the raw ad_results
        for idx, article in enumerate(ad_results):
            if self.debug_id and article['article_id'] != self.debug_id:
                continue
            # Parse university and programs from admission sections
            parsed_admission_results = []
            d1 = []
            parsed_program_uni_pairs = []
            parsed_program_names = []
            parsed_program_levels = []
            debug_rows = []
            parsed_uni_pair_set = set()
            for i, row in enumerate(article['admission']):
                row = self.normalize_university_name(row)
                debug_rows.append(row)
                if not row:
                    continue
                # Parse the program from this row
                (program_level, program_name), row_new = self.programs.search_program(row, aid=article['url'])
                if program_level is not None:
                    parsed_program_levels.append(program_level)
                if program_name is not None:
                    parsed_program_names.append(program_name)
                # No university left to search in the row
                if len(row_new) == 0:
                    continue
                # Find the university in the article admission section
                uni_match = self.search_single_university_name(row_new)
                # print('Norm', row, '@', uni_match, '@', program_level, program_name)
                # If we found a university, add it to `parsed_admission_results`
                if uni_match is not None:
                    # Map parsed results to uni names
                    parsed_admission_results.append(uni_match)
                    d1.append((row, uni_match))
                else:
                    # parsed_admission_results.append(None)
                    d1.append((row, ''))
                if (program_name or program_level) and uni_match:
                    parsed_program_uni_pairs.append(
                        {
                            'program_level': program_level,
                            'program_name': program_name,
                            'university': uni_match
                        }
                    )
                    parsed_uni_pair_set.add(uni_match)
            parsed_admission_title_results = []
            parsed_program_names_from_title = []
            parsed_program_levels_from_title = []
            d2 = []
            # If we passed articles into the function, we try to parse the article title
            if articles is not None:
                for ad_title in article['admission_title']:
                    # Parse the program from the title
                    (program_level, program_name), ad_title_new = self.programs.search_program(ad_title)
                    if program_level is not None:
                        parsed_program_levels_from_title.append(program_level)
                    if program_name is not None:
                        parsed_program_names_from_title.append(program_name)
                    ad_title_new = self.normalize_university_name(ad_title_new)
                    if not ad_title_new:
                        continue
                    # Find the university in the article title
                    uni_matches = self.search_all_university_names(ad_title_new)
                    # If we found a university, add it to `parsed_admission_title_results`
                    if uni_matches:
                        parsed_admission_title_results.extend(uni_matches)
                        d2.append((ad_title, uni_matches))
                    else:
                        d2.append((ad_title, ''))
            # Combine admission results from "title" + "section"
            parsed_admission_results.extend(parsed_admission_title_results)
            parsed_admission_universities = list(set(parsed_admission_results))
            # Fill in program names and levels that are missing in `parsed_program_uni_pairs` but found in the title
            if parsed_program_levels_from_title or parsed_program_levels:
                program_level = parsed_program_levels_from_title[0] if parsed_program_levels_from_title else parsed_program_levels[0]
                for pair in parsed_program_uni_pairs:
                    if pair['program_level'] is None:
                        pair['program_level'] = program_level
            if parsed_program_names_from_title or parsed_program_names:
                program_name = parsed_program_names_from_title[0] if parsed_program_names_from_title else parsed_program_names[0]
                for pair in parsed_program_uni_pairs:
                    if pair['program_name'] is None:
                        pair['program_name'] = program_name
            # Hash parsed_program_uni_pairs into a set to prevent duplicates
            uni_pairs_set = set()
            for pair in parsed_program_uni_pairs:
                uni_pairs_set.add(hash_program_uni_pair(pair))
            # Fill in universities with no program level or program name associated
            universities_without_programs = set(parsed_admission_universities) - parsed_uni_pair_set
            for uni in universities_without_programs:
                # Fill in from the title
                program_level = parsed_program_levels_from_title[0] if parsed_program_levels_from_title else None
                program_name = parsed_program_names_from_title[0] if parsed_program_names_from_title else None
                # No program level from the title; try to fill in from admission results
                program_level = parsed_program_levels[0] if not program_level and parsed_program_levels else program_level
                program_name = parsed_program_names[0] if not program_name and parsed_program_names else program_name
                uni_pair = {
                    'program_level': program_level,
                    'program_name': program_name,
                    'university': uni
                }
                if hash_program_uni_pair(uni_pair) not in uni_pairs_set:
                    uni_pairs_set.add(hash_program_uni_pair(uni_pair))
                    parsed_program_uni_pairs.append(uni_pair)
            # Merge program levels / names from the title
            parsed_program_levels.extend(parsed_program_levels_from_title)
            parsed_program_names.extend(parsed_program_names_from_title)
            # Append universities/programs to the result
            result.append({
                'admission_universities': parsed_admission_universities,
                'program_levels': list(set(parsed_program_levels)),
                'program_names': list(set(parsed_program_names)),
                'program_uni_pairs': parsed_program_uni_pairs
            })
            # For debug purposes
            """
            debug_ads.append({
                'article_title': articles[idx]['article_title'], 'url': articles[idx]['url'],
                'program_levels': list(set(parsed_program_levels)),
                'program_names': list(set(parsed_program_names)),
                'program_title_levels': parsed_program_levels_from_title,
                'program_title_names': parsed_program_names_from_title,
                'program_uni_pairs': parsed_program_uni_pairs,
                'debug_rows': debug_rows
            })
            """
        # For debug purposes
        # with open(os.path.join(DATA_DIR, 'debug_ad.json'), 'w') as target:
        #     json.dump(debug_ads, target, indent=2, ensure_ascii=False)
        print(f'Parsed {len(result)} admission articles')
        return result

    def map_university_token_to_fullname(self, uni):
        # Deprecated for now
        if uni in self.all_uni_names:
            return uni
        elif uni in self.us_universities['top_100_uid']:
            return self.us_universities['top_100_uid'][uni]
        elif uni in self.us_universities['other_uni_uid']:
            return self.us_universities['other_uni_uid'][uni]
        ...
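The last-resort fuzzy match in search_single_university_name normalizes the longest-common-subsequence length by the length of the shorter string. A standalone sketch of that step, assuming the textdistance package is installed; the candidate names and the admission row are invented:

import textdistance

# Hypothetical stand-ins for us_universities['top_100_names'] and a forum row.
top_names = ['Carnegie Mellon University', 'University of Michigan']
ad_row = 'Carnegie Melon Univ'  # misspelled, as admission rows often are

td_names = []
for uname in top_names:
    # LCS length normalized by the shorter string, as in the class above
    td = textdistance.lcsseq.similarity(uname, ad_row) / min(len(ad_row), len(uname))
    if td > 0.75:  # same threshold the class uses
        td_names.append((td, uname))
if td_names:
    print(max(td_names)[1])  # -> 'Carnegie Mellon University'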
import_team.py
Source: import_team.py
'''
@author: Dallas Fraser
@date: 2016-04-12
@organization: MLSB API
@summary: Holds a class TeamList that helps import a team roster
'''
# imports
from sqlalchemy import func, or_
from api.model import Sponsor, Team, Player, League
from api import DB
from datetime import date
from api.errors import InvalidField, SponsorDoesNotExist, LeagueDoesNotExist
import logging

# constants
MISSING_BACKGROUND = "Missing background: {}"
LEFT_BACKGROUND_EXAMPLE = "Background example was left: {}"
LEFT_PLAYER_EXAMPLE = "Player example was left: {}"
INVALID_SPONSOR = "Sponsor given was not found: {}"
INVALID_PLAYER = "Player given {} had the following issue: {}"
INVALID_LEAGUE = "League given was not found: {}"
PLAYER_MISMATCH_COLUMNS = "Player mismatched the headers: {}"
INVALID_ROW = "Unsure what to do with the following row: {}"
PLAYER_ROW_IDENTIFIER = "player"
CAPTAIN_NOT_ASSIGNED = "Captain was not assigned"
# a dictionary of the headers needed with their keys
# and how they appear in the csv
HEADERS = {"name": "Player Name",
           "email": "Player Email",
           "gender": "Gender (M/F)"}
# a dictionary of the background needed with their keys
# and how they appear in the csv
BACKGROUND = {"sponsor_name": "sponsor",
              "team_color": "color",
              "captain_name": "captain",
              "league_name": "league"}

class TeamList():
    def __init__(self, lines, logger=None, session=None):
        """The constructor

        lines: a list of lines parsed from the csv
        logger: a logger
        session: a mocked database session
        """
        self.success = False
        self.errors = []
        self.warnings = []
        self.lines = lines
        if logger is None:
            logging.basicConfig(level=logging.INFO,
                                format='%(asctime)s %(message)s')
            logger = logging.getLogger(__name__)
        self.logger = logger
        self.team = None
        self.captain_name = None
        self.captain = None
        self.name_index = None
        self.email_index = None
        self.gender_index = None
        self.session = session
        if session is None:
            self.session = DB.session

    def add_team_functional(self):
        """Add a team to the database using functions instead of methods"""
        # parse out the parts - background, header, players
        parts = parse_lines(self.lines)
        self.warnings = parts['warnings']
        # extract the background such as league, sponsor and color
        background = extract_background(parts['background'])
        # extract the players using the header as lookup
        lookup = extract_column_indices_lookup(parts['header'])
        players = extract_players(parts["players"], lookup)
        self.warnings = self.warnings + players['warnings']
        # add the players
        player_models = []
        for player_json in players['player_info']:
            try:
                if (player_json['player_id'] is None):
                    # need to create the player
                    player = Player(player_json['name'],
                                    player_json['email'],
                                    gender=player_json["gender"])
                    self.session.add(player)
                    self.session.commit()
                else:
                    email = player_json['email']
                    player = Player.query.filter(func.lower(Player.email) ==
                                                 func.lower(email)).first()
                player_models.append(player.json())
            except Exception as error:
                player_info = "-".join([player_json["name"],
                                        player_json["email"]])
                self.warnings.append(INVALID_PLAYER.format(player_info,
                                                           str(error)))
        # get the team, create it if it does not exist
        if background['team']['team_id'] is None:
            team = Team(color=background['team']['color'],
                        sponsor_id=background['sponsor']['sponsor_id'],
                        league_id=background['league']['league_id'],
                        year=date.today().year)
            self.session.add(team)
        else:
            # get the team and remove all players
            team = Team.query.get(background['team']['team_id'])
            team.players = []
        set_captain = False
        for player in player_models:
            if (player["player_name"].lower()
                    == background["captain"]["player_name"].lower()):
                set_captain = True
                team.insert_player(player["player_id"], captain=True)
            else:
                team.insert_player(player["player_id"], captain=False)
        if not set_captain:
            self.warnings.append(CAPTAIN_NOT_ASSIGNED)
        self.session.commit()

def extract_background(background):
    """Returns a dictionary of the extracted json objects from the background.

    Parameters:
        background: dictionary of sponsor, color, captain, league
    Returns:
        a dictionary of sponsor model, team model, player model, league model
    """
    for value in BACKGROUND.values():
        if value not in background.keys():
            errorMessage = MISSING_BACKGROUND.format(value)
            raise InvalidField(payload={"details": errorMessage})
    league_name = background['league']
    sponsor_name = background['sponsor']
    team_color = background['color']
    captain_name = background['captain']
    if league_name.lower().startswith("ex."):
        error_message = LEFT_BACKGROUND_EXAMPLE.format(league_name)
        raise InvalidField(payload={"details": error_message})
    elif sponsor_name.lower().startswith("ex."):
        error_message = LEFT_BACKGROUND_EXAMPLE.format(sponsor_name)
        raise InvalidField(payload={"details": error_message})
    elif team_color.lower().startswith("ex."):
        error_message = LEFT_BACKGROUND_EXAMPLE.format(team_color)
        raise InvalidField(payload={"details": error_message})
    elif captain_name.lower().startswith("ex."):
        error_message = LEFT_BACKGROUND_EXAMPLE.format(captain_name)
        raise InvalidField(payload={"details": error_message})
    # nothing to do with the captain at this point
    captain = {"player_name": captain_name}
    # try to find the sponsor (by name or nickname) and the league
    sponsor = (Sponsor.query.filter(or_(func.lower(Sponsor.name)
                                        == func.lower(sponsor_name),
                                        func.lower(Sponsor.nickname)
                                        == func.lower(sponsor_name)))
               ).first()
    league = League.query.filter(func.lower(League.name)
                                 == func.lower(league_name)).first()
    if sponsor is None:
        error_message = INVALID_SPONSOR.format(sponsor_name)
        raise SponsorDoesNotExist(payload={'details': error_message})
    if league is None:
        error_message = INVALID_LEAGUE.format(league_name)
        raise LeagueDoesNotExist(payload={'details': error_message})
    # check to see if the team was already created
    teams = (Team.query
             .filter(func.lower(Team.color) == func.lower(team_color))
             .filter(Team.sponsor_id == sponsor.id)
             .filter(Team.year == date.today().year)).all()
    if len(teams) > 0:
        team = teams[0].json()
    else:
        team = {'team_id': None,
                "color": team_color,
                "sponsor_id": sponsor.id,
                "league_id": league.id,
                "captain": None,
                "year": date.today().year}
    return {"captain": captain,
            "team": team,
            "league": league.json(),
            "sponsor": sponsor.json()}

def extract_column_indices_lookup(header):
    """ Returns a dictionary used to look up the indices of the various fields

    Parameters:
        header: the header array
    Returns:
        a dictionary {str(field): int(index)}
    """
    lookup = {}
    for i in range(0, len(header)):
        for key, value in HEADERS.items():
            if is_entry_a_header(key, value, header[i]):
                lookup[key.lower()] = i
    # ensure all headers were found
    for key in HEADERS.keys():
        if key not in lookup.keys():
            error_message = "{} header missing".format(key.lower())
            raise InvalidField(payload={'details': error_message})
    return lookup

def is_entry_a_header(key, value, entry):
    """Returns whether the given entry in the header is an expected header."""
    return (key.lower() in entry.lower()
            or value.lower() in entry.lower())

def extract_player_information(info, lookup):
    """Parse a player and return a json object

    Parameters:
        info: a list of information about the player
        lookup: the lookup for the fields and their indices in the info list
    Return:
        a dictionary {'player_id': int,
                      'name': str,
                      'email': str,
                      'gender': str}
    """
    player_json = {}
    for key, value in lookup.items():
        player_json[key] = info[value].strip()
    player_id = None
    player = Player.query.filter(func.lower(Player.email) ==
                                 func.lower(player_json['email'])).first()
    if player is not None:
        player_id = player.id
    player_json['player_id'] = player_id
    return player_json

def extract_players(players, lookup):
    """Extract the players and return a list of players in json format

    Parameters:
        players: a list of rows that contain player information
        lookup: the lookup for the fields and their indices in the player rows
    Return:
        a dictionary with player_info, warnings
        where
            player_info: an array of dictionaries {'player_id': int,
                                                   'name': str,
                                                   'email': str,
                                                   'gender': str}
            warnings: a list of warnings encountered
    """
    players_info = []
    warnings = []
    for info in players:
        if len(info) == len(lookup):
            player = extract_player_information(info, lookup)
            if player['name'].lower().startswith("ex."):
                warnings.append(LEFT_PLAYER_EXAMPLE.format(" ".join(info)))
            else:
                players_info.append(player)
        else:
            warnings.append(PLAYER_MISMATCH_COLUMNS.format(" ".join(info)))
    return {'player_info': players_info, 'warnings': warnings}

def clean_cell(cell):
    """Returns a cleaned cell"""
    return cell.strip().lower().replace(":", "")

def parse_lines(lines, delimiter=","):
    """Parses the lines and returns the three parts

    Parameters:
        lines: a list of lines
        delimiter: the delimiter for the lines (default = ,)
    Returns:
        a dictionary with background, header, players, warnings where:
            background: dictionary of sponsor, color, captain, league
            header: the header row
            players: a list of player lines
            warnings: a list of lines that were not recognized
    """
    background = {}
    header = None
    players = []
    warnings = []
    headers_keywords = ([key.lower() for key in HEADERS.keys()]
                        + [value.lower() for value in HEADERS.values()])
    background_keywords = ([key.lower() for key in BACKGROUND.keys()]
                           + [value.lower() for value in BACKGROUND.values()])
    for line in lines:
        info = line.split(delimiter)
        if clean_cell(info[0]).lower() in background_keywords:
            background[clean_cell(info[0])] = info[1].strip()
        elif info[0].lower().strip() in headers_keywords:
            header = info
        elif len(info) >= len(HEADERS.keys()):
            players.append(info)
        else:
            warnings.append(INVALID_ROW.format(line))
    return {'background': background,
            'header': header,
            'players': players,
            'warnings': warnings}
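A hypothetical roster snippet showing the input shape parse_lines expects (the sponsor, league and people are invented; this only runs inside the MLSB API project, since importing the module pulls in the api package):

lines = [
    "sponsor:,Some Sponsor",
    "color:,Blue",
    "captain:,Jane Doe",
    "league:,Monday and Wednesday",
    "Player Name,Player Email,Gender (M/F)",
    "Jane Doe,jane@example.com,F",
    "John Smith,john@example.com,M",
]
parts = parse_lines(lines)
# parts['background'] -> {'sponsor': 'Some Sponsor', 'color': 'Blue',
#                         'captain': 'Jane Doe', 'league': 'Monday and Wednesday'}
# parts['header']     -> the split header row
# parts['players']    -> the two player rows, still as raw column lists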
import_league.py
Source:import_league.py
'''
@author: Dallas Fraser
@date: 2016-04-12
@organization: MLSB API
@summary: Holds a class LeagueList that helps import a League (list of games)
'''
# imports
from sqlalchemy import func
from api.model import Sponsor, Game, League, Division
from api import DB
from api.errors import InvalidField, LeagueDoesNotExist, TeamDoesNotExist,\
    DivisionDoesNotExist
import logging
import datetime

# constants
MISSING_BACKGROUND = "Missing background: {}"
LEFT_BACKGROUND_EXAMPLE = "Background example was left: {}"
INVALID_TEAM = "{} is not a team in the league"
INVALID_ROW = "Unsure what to do with the following row: {}"
INVALID_LEAGUE = "League given was not found: {}"
INVALID_DIVISION = "Division given was not found: {}"
INVALID_GAME = "The game was invalid - {} with error {}"
TEAM_NOT_FOUND = "Did not find team {} - for row {}"
BACKGROUND = {"league": "League", "division": "Division"}
HEADERS = {"home": "Home Team",
           "away": "Away Team",
           "date": "Date",
           "time": "Time",
           "field": "Field"}


class LeagueList():
    def __init__(self,
                 lines,
                 year=datetime.datetime.now().year,
                 logger=None,
                 session=None):
        """A constructor

        lines: a list of lines from the csv
        year: the year the league games are for
        logger: a logger
        session: a database session (can be mocked for testing)
        """
        self.success = False
        self.errors = []
        self.warnings = []
        self.lines = lines
        if logger is None:
            logging.basicConfig(level=logging.INFO,
                                format='%(asctime)s %(message)s')
            logger = logging.getLogger(__name__)
        self.logger = logger
        self.year = year
        self.session = session
        if session is None:
            self.session = DB.session

    def import_league_functional(self):
        """Add the league's games to the database using functions instead of methods."""
        # parse out the parts - background, header, games
        parts = parse_parts(self.lines)
        self.warnings = parts['warnings']
        # extract the background, such as the league and division
        background = extract_background(parts['background'])
        league = background["league"]
        division = background["division"]
        # build the column-index lookup from the header
        lookup = extract_column_indices_lookup(parts['header'])
        # get the team map
        team_lookup = get_team_lookup(league)
        # extract the games
        games = extract_games(parts["games"], team_lookup, lookup)
        self.warnings = self.warnings + games['warnings']
        # add the games
        for game_json in games['games']:
            try:
                game = Game(game_json["date"],
                            game_json["time"],
                            game_json["home_team_id"],
                            game_json["away_team_id"],
                            league["league_id"],
                            division["division_id"],
                            field=game_json["field"])
                self.session.add(game)
            except Exception as error:
                game_list = [str(value) for value in game_json.values()]
                game_info = "-".join(game_list)
                self.warnings.append(INVALID_GAME.format(game_info,
                                                         str(error)))
        self.session.commit()


def get_team_lookup(league, year=datetime.datetime.today().year):
    '''
    Builds a lookup from team names to team ids for the given league.

    Parameters:
        league: the json league object
        year: the year we are importing for
    Returns:
        teams: a dictionary lookup for teams
    '''
    teams = {}
    league = League.query.get(league["league_id"])
    if league is None:
        raise LeagueDoesNotExist(payload={'details': league})
    for team in league.teams:
        if team.year == year:
            # index each team both by its string form and by "<sponsor> <color>"
            teams[str(team)] = team.id
            sponsor = str(Sponsor.query.get(team.sponsor_id))
            teams[sponsor + " " + team.color] = team.id
    return teams


def extract_column_indices_lookup(header):
    """Returns a dictionary used to look up the indices of the various fields.

    Parameters:
        header: the header array
    Returns:
        a dictionary {str(field): int(index)}
    """
    lookup = {}
    for i in range(0, len(header)):
        for key, value in HEADERS.items():
            if is_entry_a_header(key, value, header[i]):
                lookup[key.lower()] = i
    # ensure all headers were found
    for key in HEADERS.keys():
        if key not in lookup.keys():
            error_message = "{} header missing".format(key.lower())
            raise InvalidField(payload={'details': error_message})
    return lookup


def is_entry_a_header(key, value, entry):
    """Returns whether the given entry in the header is an expected header."""
    return (key.lower() in entry.lower() or
            value.lower() in entry.lower())


def is_game_row_valid(game, lookup):
    """Returns whether all columns can be found in the game entry.

    Parameters:
        game: the entry for the game
        lookup: a lookup from fields to column indices
    Returns:
        True if a valid row, otherwise False
    """
    for index in lookup.values():
        if index >= len(game):
            return False
    return True


def extract_game(game, team_lookup, lookup):
    """Returns a game json object.

    Parameters:
        game: the entry for the game
        team_lookup: a lookup from team names to team ids
        lookup: a lookup from fields to column indices
    Returns:
        a json game object, None if game data not found
    """
    if not is_game_row_valid(game, lookup):
        return None
    away = game[lookup["away"]].strip()
    home = game[lookup["home"]].strip()
    time = game[lookup["time"]].strip()
    field = game[lookup["field"]].strip()
    date = game[lookup["date"]].strip()
    # both teams must resolve to known team ids,
    # otherwise the game cannot be added
    away_team = team_lookup.get(away, None)
    home_team = team_lookup.get(home, None)
    if away_team is None:
        error_message = INVALID_TEAM.format(away)
        raise TeamDoesNotExist(payload={'details': error_message})
    if home_team is None:
        error_message = INVALID_TEAM.format(home)
        raise TeamDoesNotExist(payload={'details': error_message})
    return {"away_team_id": away_team,
            "home_team_id": home_team,
            "time": time,
            "field": field,
            "date": date}


def extract_games(games, team_lookup, lookup):
    """Returns a dictionary with a list of games and warnings.

    Parameters:
        games: the game entry rows
        team_lookup: a lookup from team names to team ids
        lookup: a lookup for column indices
    Returns:
        a dictionary with a list of games and a list of warnings
    """
    result = []
    warnings = []
    for game in games:
        try:
            game_json = extract_game(game, team_lookup, lookup)
            if game_json is not None:
                result.append(game_json)
        except TeamDoesNotExist as e:
            warnings.append(TEAM_NOT_FOUND.format(str(e), ",".join(game)))
    return {"games": result, "warnings": warnings}


def extract_background(background):
    """Returns a dictionary of the extracted json objects from the background.

    Parameters:
        background: dictionary of league and division
    Returns:
        a dictionary of the league model and division model
    """
    background_keys = [key.lower() for key in background.keys()]
    for value in BACKGROUND.values():
        if value.lower() not in background_keys:
            error_message = MISSING_BACKGROUND.format(value)
            raise InvalidField(payload={"details": error_message})
    # ensure able to find the division
    division_name = background['division']
    if division_name.lower().startswith("ex."):
        error_message = LEFT_BACKGROUND_EXAMPLE.format(division_name)
        raise InvalidField(payload={"details": error_message})
    division = Division.query.filter(func.lower(Division.name) ==
                                     func.lower(division_name)).first()
    # ensure able to find the league
    league_name = background['league']
    if league_name.lower().startswith("ex."):
        error_message = LEFT_BACKGROUND_EXAMPLE.format(league_name)
        raise InvalidField(payload={"details": error_message})
    league = League.query.filter(func.lower(League.name) ==
                                 func.lower(league_name)).first()
    if division is None:
        error_message = INVALID_DIVISION.format(division_name)
        raise DivisionDoesNotExist(payload={'details': error_message})
    if league is None:
        error_message = INVALID_LEAGUE.format(league_name)
        raise LeagueDoesNotExist(payload={'details': error_message})
    return {"league": league.json(), "division": division.json()}


def clean_cell(cell):
    """Returns a clean cell."""
    return cell.strip().lower().replace(":", "")


def parse_parts(lines, delimiter=","):
    """Parses the lines and returns a dictionary with the three parts.

    Parameters:
        lines: a list of lines
        delimiter: the delimiter for the lines (default = ,)
    Returns:
        a dictionary with background, header, games, warnings where:
            background: dictionary of league and division
            header: the header row
            games: a list of game lines
            warnings: a list of lines that were not recognized
    """
    background = {}
    header = None
    games = []
    warnings = []
    header_keywords = ([key.lower() for key in HEADERS.keys()] +
                       [value.lower() for value in HEADERS.values()])
    background_keywords = ([key.lower() for key in BACKGROUND.keys()] +
                           [value.lower() for value in BACKGROUND.values()])
    for line in lines:
        info = line.split(delimiter)
        if clean_cell(info[0]).lower() in background_keywords:
            background[clean_cell(info[0])] = info[1].strip()
        elif info[0].lower().strip() in header_keywords:
            header = info
        elif len(info) >= len(HEADERS.keys()):
            games.append(info)
        else:
            warnings.append(INVALID_ROW.format(line))
    return {'background': background,
            'header': header,
            'games': games,
            'warnings': warnings}
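A minimal usage sketch, assuming a Flask application context with the database configured and the named teams already present in the league for that year; the sample CSV lines below are invented for illustration:

# Hypothetical input: one background row per BACKGROUND key, the header row,
# then one game row per scheduled game.
lines = ["League:,Summer League",
         "Division:,Division A",
         "Home Team,Away Team,Date,Time,Field",
         "Sponsor A Blue,Sponsor B Red,2016-05-01,10:00,Field 1"]
importer = LeagueList(lines, year=2016)
importer.import_league_functional()
for warning in importer.warnings:  # rows or games that could not be imported
    print(warning)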