Python code snippets collected from multiple open-source projects (the attribution "using wtframework" does not match the content shown below).
foofah_table_graph.py
Source:foofah_table_graph.py
"""Table edit-distance graph for Foofah-style data-transformation search.

A table is modeled as a flat collection of ``TableNode`` cells; the distance
between two tables is the cheapest set of per-cell edit operations
(move / merge / split / delete / add).  Reconstructed from a Python 2 source:
print statements, tuple-parameter lambdas, and list-returning ``map`` /
``dict.values`` have been ported to Python 3 with identical behavior.
"""
import re
import string
import math
from operator import itemgetter
from itertools import groupby
import itertools
import operator

# Optional C++ accelerators.  Every call site is guarded by a use_cpp flag
# that defaults to False, so the module must still import when the extension
# is not built.
try:
    import foofah_utils
except ImportError:  # pragma: no cover - depends on local build
    foofah_utils = None

# Unit costs for each primitive table edit operation.
COST_DELETE_EXISTING_CELL = 1
COST_DELETE_CELL = 1
COST_DELETE_EMPTY = 1
COST_ADD_EMPTY = 1
COST_MOVE_EMPTY = 1
COST_MOVE_CELL = 1
COST_SPLIT = 1
COST_MERGE = 1
COST_COPY = 1
COST_MOVE_CELL_HORIZONTAL_1 = 1

# Flags selecting the (optional) C++ implementations.
cost_data_transform_cpp = False
cost_move_cpp = False
cost_edit_op_cpp = False
debug_print = False

# Sentinel cost for operations the model forbids (e.g. creating data from nothing).
COST_IMPOSSIBLE = 100000


class TableNode:
    """One table cell: its string contents plus its (row, col) position."""

    def __init__(self, data, row=None, col=None):
        self.data = data
        self.id = id(self)  # unique per-object tag; identity-based
        self.row = row
        self.col = col

    def __str__(self):
        return "'%s' (%d,%d)" % (self.data, self.row, self.col)


class TableGraph:
    """A table flattened into a tuple of ``TableNode`` cells.

    Supports edit-distance comparison against another table via
    ``-`` (exact A* search) and ``>>`` (greedy approximation).
    """

    def __init__(self, table):
        # `table` is a list of rows; every row is assumed to have the same
        # width as the first one (col_num is taken from table[0]).
        self.cells = []
        self.data_set = set()
        for rid, row in enumerate(table):
            for cid, cell in enumerate(row):
                cell_node = TableNode(cell, rid, cid)
                self.cells.append(cell_node)
        self.cell_set = set(self.cells)
        self.cells = tuple(self.cells)
        self.row_num = len(table)
        self.col_num = len(table[0])

    def __str__(self):
        # NOTE(review): self.graph is never assigned in the visible code, so
        # str() on a TableGraph would raise AttributeError — confirm upstream.
        return str(list(self.graph.edges()))

    def nodes(self):
        return self.cells

    def nodes_set(self):
        return self.cell_set

    def graph_edit_distance(self, other):
        return graph_edit_distance(self, other)

    def graph_edit_distance_greedy(self, other, batch=False):
        if batch:
            return clustered_maps(graph_edit_distance_greedy(self, other)[0], self, other)
        return graph_edit_distance_greedy(self, other)

    def batch_graph_edit_distance_greedy(self, other):
        return clustered_maps(graph_edit_distance_greedy(self, other)[0], self, other)

    # Edit distance (exact search)
    def __sub__(self, other):
        return self.graph_edit_distance(other)

    # Edit distance (greedy approximation)
    def __rshift__(self, other):
        return self.graph_edit_distance_greedy(other)


# Print a single cell mapping
def print_map(edge):
    if edge[0] and edge[1]:
        print(edge[0].data, "(%d,%d)" % (edge[0].row, edge[0].col), "->",
              edge[1].data, "(%d,%d)" % (edge[1].row, edge[1].col))
    elif edge[0]:
        print(edge[0].data, "(%d,%d)" % (edge[0].row, edge[0].col), "->", "empty")
    else:
        print("empty", "->", edge[1].data, "(%d,%d)" % (edge[1].row, edge[1].col))


# Print a whole edit path (list of (src, dst, map_type, cost) tuples)
def print_path(path):
    if path:
        for edge in path:
            if edge[0] and edge[1]:
                print(str(edge[0]), "->", str(edge[1]), "%", edge[2])
            elif edge[0]:
                print(str(edge[0]), "->", "empty", "%", edge[2])
            else:
                print("empty", "->", str(edge[1]), "%", edge[2])
        print("Actual Cost:", cost_edit_path(path))
    else:
        print("No Transformation Available")


PATTERN_R_2_C = "PATTERN_R_2_C"
PATTERN_R_2_R = "PATTERN_R_2_R"
PATTERN_R_2_T = "PATTERN_R_2_T"
PATTERN_C_2_C = "PATTERN_C_2_C"
PATTERN_C_2_R = "PATTERN_C_2_R"
PATTERN_C_2_T = "PATTERN_C_2_T"
PATTERN_T_2_C = "PATTERN_T_2_C"
PATTERN_T_2_R = "PATTERN_T_2_R"
PATTERN_T_2_T = "PATTERN_T_2_T"


def divide_if_identical_col(path, id=0):
    """Split `path` into groups of mappings whose side-`id` node shares a column."""
    groups = []
    path.sort(key=lambda x: x[id].col)
    for k, g in groupby(enumerate(path), lambda ix: ix[1][id].col):
        groups.append(list(map(itemgetter(1), g)))
    return groups


def divide_if_identical_row(path, id=0):
    """Split `path` into groups of mappings whose side-`id` node shares a row."""
    groups = []
    path.sort(key=lambda x: x[id].row)
    for k, g in groupby(enumerate(path), lambda ix: ix[1][id].row):
        groups.append(list(map(itemgetter(1), g)))
    return groups


def divide_if_discontinuous_col(path, id=0):
    """Split `path` wherever the side-`id` columns stop being consecutive."""
    groups = []
    if id == 0:
        path.sort(key=lambda x: x[id].col)
    # index minus column is constant along a consecutive run
    for k, g in groupby(enumerate(path), lambda ix: ix[0] - ix[1][id].col):
        groups.append(list(map(itemgetter(1), g)))
    return groups


def divide_if_discontinuous_row(path, c_id=0):
    """Split `path` wherever the side-`c_id` rows stop being consecutive."""
    groups = []
    if c_id == 0:
        path.sort(key=lambda x: x[c_id].row)
    for k, g in groupby(enumerate(path), lambda ix: ix[0] - ix[1][c_id].row):
        groups.append(list(map(itemgetter(1), g)))
    return groups


def func_1(table_graph):
    """Column of a node, or -1 for a missing (None) node."""
    if table_graph:
        return table_graph.col
    else:
        return -1


def func_2(table_graph):
    """Row of a node, or -1 for a missing (None) node."""
    if table_graph:
        return table_graph.row
    else:
        return -1


def cluster_by_columns(path, i=0, continuous=False, identical_row=False):
    """Group mappings by the column of the side-`i` node, optionally sub-divided."""
    cluster_c = {}
    for tran in path:
        if tran[i]:
            if tran[i].col not in cluster_c:
                cluster_c[tran[i].col] = [tran]
            else:
                cluster_c[tran[i].col].append(tran)
    ret_cluster = []
    if continuous:
        for group in cluster_c.values():
            ret_cluster += divide_if_discontinuous_row(group, i)
        return ret_cluster
    elif identical_row:
        for group in cluster_c.values():
            ret_cluster += divide_if_identical_row(group, i)
        return ret_cluster
    else:
        # list() preserves the Python 2 behavior of returning a real list
        return list(cluster_c.values())


def cluster_by_rows(path, i=0, continuous=False, identical_row=False):
    """Group mappings by the row of the side-`i` node, optionally sub-divided."""
    cluster_r = {}
    for tran in path:
        if tran[i]:
            if tran[i].row not in cluster_r:
                cluster_r[tran[i].row] = [tran]
            else:
                cluster_r[tran[i].row].append(tran)
    ret_cluster = []
    if continuous:
        for group in cluster_r.values():
            ret_cluster += divide_if_discontinuous_col(group, i)
        return ret_cluster
    elif identical_row:
        for group in cluster_r.values():
            ret_cluster += divide_if_identical_col(group, i)
        return ret_cluster
    else:
        return list(cluster_r.values())


def cluster_by_types(path):
    """Group an edit path by map type (element index 2 of each mapping)."""
    path = sorted(path, key=lambda tup: tup[2])
    cluster = []
    for key, group in groupby(path, lambda x: x[2]):
        cluster.append(list(group))
    return cluster


def _collect_runs(patterns, mv_dict, ordered, step):
    """Append to `patterns` every maximal run (length > 1) found in `ordered`.

    `ordered` is a sorted list of (in_row, in_col, out_row, out_col) keys and
    `step` is the per-element coordinate increment that defines the run shape
    (e.g. (0, 1, 0, 1) = horizontal-to-horizontal).  Mirrors the original
    eight hand-unrolled while-loops exactly, including append order.
    """
    d0, d1, d2, d3 = step
    temp_path = [mv_dict[ordered[0]]]
    base = ordered[0]
    for key in ordered[1:]:
        k = len(temp_path)
        expected = (base[0] + k * d0, base[1] + k * d1, base[2] + k * d2, base[3] + k * d3)
        if key == expected:
            temp_path.append(mv_dict[expected])
        else:
            if len(temp_path) > 1:
                patterns.append(list(temp_path))
            base = key
            temp_path = [mv_dict[key]]
    if len(temp_path) > 1:
        patterns.append(list(temp_path))


def clustered_maps(path, orig_table, target_table):
    """Detect batch patterns (row/column runs) in an edit path and re-cost it.

    Returns the same `path` plus a cost in which each detected pattern counts
    as a single amortized operation.
    """
    patterns = []
    mv_dict = {}
    for pair in path:
        if pair[0] and pair[1]:
            mv_dict[(pair[0].row, pair[0].col, pair[1].row, pair[1].col)] = pair
        elif pair[0]:
            mv_dict[(pair[0].row, pair[0].col, None, None)] = pair
        elif pair[1]:
            mv_dict[(None, None, pair[1].row, pair[1].col)] = pair
    # Separate by types
    for group in cluster_by_types(path):
        input_output_set = []
        for pair in group:
            if pair[0] and pair[1]:
                input_output_set.append((pair[0].row, pair[0].col, pair[1].row, pair[1].col))
            elif pair[0]:
                input_output_set.append((pair[0].row, pair[0].col, None, None))
            elif pair[1]:
                input_output_set.append((None, None, pair[1].row, pair[1].col))
        if group[0][2] in (MAP_TYPE_MV, MAP_TYPE_MER, MAP_TYPE_SPL, MAP_TYPE_UNKNOWN):
            # Row major input table
            i_row_o_row = sorted(input_output_set, key=lambda x: (x[0], x[1], x[2], x[3]))
            # H to H
            _collect_runs(patterns, mv_dict, i_row_o_row, (0, 1, 0, 1))
            if group[0][2] != MAP_TYPE_MER and group[0][2] != MAP_TYPE_SPL:
                # One to H
                _collect_runs(patterns, mv_dict, i_row_o_row, (0, 0, 0, 1))
            # H to V
            i_row_o_col = sorted(input_output_set, key=lambda x: (x[0], x[1], x[3], x[2]))
            _collect_runs(patterns, mv_dict, i_row_o_col, (0, 1, 1, 0))
            # Sort column major of input table: V to V
            i_col_o_col = sorted(input_output_set, key=lambda x: (x[1], x[0], x[3], x[2]))
            _collect_runs(patterns, mv_dict, i_col_o_col, (1, 0, 1, 0))
            # Sort column major of output table: V to V
            i_col_o_col = sorted(input_output_set, key=lambda x: (x[3], x[2], x[1], x[0]))
            _collect_runs(patterns, mv_dict, i_col_o_col, (1, 0, 1, 0))
            if group[0][2] != MAP_TYPE_MER and group[0][2] != MAP_TYPE_SPL:
                # One to V (scans the output-column-major order, as in the original)
                _collect_runs(patterns, mv_dict, i_col_o_col, (0, 0, 1, 0))
            # V to H, input column major
            i_col_o_row = sorted(input_output_set, key=lambda x: (x[1], x[0], x[2], x[3]))
            _collect_runs(patterns, mv_dict, i_col_o_row, (1, 0, 0, 1))
            # V to H, output row major
            i_col_o_row = sorted(input_output_set, key=lambda x: (x[2], x[3], x[1], x[0]))
            _collect_runs(patterns, mv_dict, i_col_o_row, (1, 0, 0, 1))
        if group[0][2] == MAP_TYPE_RM:
            temp = sorted(input_output_set, key=operator.itemgetter(1))
            # Group Removes by Column
            for key, g in itertools.groupby(temp, operator.itemgetter(1)):
                temp_path = []
                for t in list(g):
                    temp_path.append(mv_dict[t])
                if len(temp_path) > 1:
                    patterns.append(list(temp_path))
    # Determine the final groups: longest patterns win; overlapping ones are dropped
    patterns.sort(key=lambda t: len(t), reverse=True)
    final_group = []  # NOTE(review): collected but never returned in the original
    cost = 0
    overlaps = set()
    for group in patterns:
        if not (set(group) & overlaps):
            overlaps = overlaps.union(set(group))
            final_group.append(group)
            # a whole pattern counts as one operation, amortized over its size
            cost += sum([mapping[3] for mapping in group]) / float(len(group))
            if debug_print:
                print("*" * 20)
                print_path(group)
                print()
    if debug_print and set(path) - overlaps:
        print("*" * 20, "Remains")
        print(print_path(set(path) - overlaps))
    # mappings not covered by any pattern keep their full cost
    cost += sum([mapping[3] for mapping in (set(path) - overlaps)])
    return path, cost


def tokenize(a, first=False):
    """Split `a` on runs of punctuation/whitespace; `first` splits only once.

    Uses '+' rather than the original '*': Python 2's re.split ignored
    zero-width matches, so '+' reproduces the original behavior on Python 3.7+
    (where '*' would split between every character).
    """
    if not a:
        return [""]
    if first:
        return re.split('[' + string.punctuation + string.whitespace + ']+', a, maxsplit=1)
    else:
        return re.split('[' + string.punctuation + string.whitespace + ']+', a)


MAP_TYPE_MV = 1
MAP_TYPE_MER = 2
MAP_TYPE_SPL = 3
MAP_TYPE_UNKNOWN = 4
MAP_TYPE_RM = 5
MAP_TYPE_ADD = 6


# Cost of substituting one cell's contents for another's
def cost_data_transform(str1, str2, use_cpp=cost_data_transform_cpp):
    """Return (cost, map_type) for transforming string `str1` into `str2`."""
    if use_cpp:
        return foofah_utils.cost_data_transform(str1, str2)
    if str1 == str2:
        return 0, MAP_TYPE_MV
    elif not str1 or not str2:
        return COST_IMPOSSIBLE, MAP_TYPE_UNKNOWN
    elif str1 in str2:
        return COST_MERGE, MAP_TYPE_MER
    elif str2 in str1:
        return COST_SPLIT, MAP_TYPE_SPL
    else:
        # Neither is a substring of the other: check token-level containment.
        token_1 = tokenize(str1)
        token_2 = tokenize(str2)
        not_found_1 = False
        if_all_empty = True
        for token in token_1:
            if token:
                if_all_empty = False
                if token not in str2:
                    not_found_1 = True
                    break
        if if_all_empty:
            not_found_1 = True
        not_found_2 = False
        if_all_empty = True
        for token in token_2:
            if token:
                if_all_empty = False
                if token not in str1:
                    not_found_2 = True
                    break
        if if_all_empty:
            not_found_2 = True
        # All tokens of one side appear in the other: a merge+split can do it.
        if not not_found_1 or not not_found_2:
            return COST_MERGE + COST_SPLIT, MAP_TYPE_UNKNOWN
        return COST_IMPOSSIBLE, MAP_TYPE_UNKNOWN


# Cost of relocating a cell
def cost_move(node_1, node_2, use_cpp=cost_move_cpp):
    """Return the cost of moving `node_1` to `node_2`'s position."""
    if use_cpp:
        return foofah_utils.cost_move(node_1.row, node_1.col, node_2.row, node_2.col, node_1.data)
    cost = 0
    # Moving empty space shouldn't count as much
    if node_1.data:
        if math.fabs(node_1.col - node_2.col) == 1 and node_1.row == node_2.row:
            cost += COST_MOVE_CELL_HORIZONTAL_1
        elif node_1.row != node_2.row or node_1.col != node_2.col:
            cost += COST_MOVE_CELL
    else:
        if node_1.row != node_2.row or node_1.col != node_2.col:
            cost += COST_MOVE_EMPTY
    return cost


# Cost and type of a single edit operation
def cost_edit_op(operation, target=None, use_cpp=cost_edit_op_cpp):
    """Return (cost, map_type) for one (src, dst) cell operation.

    src==None means an insertion, dst==None a deletion; inserting non-empty
    data out of nowhere is impossible in this model.
    """
    cost = 0
    if use_cpp:
        if operation[0] and operation[1]:
            return foofah_utils.cost_edit_op(operation[0].row, operation[0].col, operation[0].data,
                                             operation[1].row, operation[1].col, operation[1].data)
        elif operation[0]:
            return foofah_utils.cost_edit_op(operation[0].row, operation[0].col, operation[0].data, -1, -1, "")
        elif operation[1]:
            return foofah_utils.cost_edit_op(-1, -1, "", operation[1].row, operation[1].col, operation[1].data)
        else:
            return foofah_utils.cost_edit_op(-1, -1, "", -1, -1, "")
    if operation[0] and operation[1]:
        new_cost, map_type = cost_data_transform(operation[0].data, operation[1].data)
        cost += new_cost
        if cost >= COST_IMPOSSIBLE:
            return cost, map_type
        cost += cost_move(operation[0], operation[1])
    elif operation[0] and operation[0].data:
        cost += COST_DELETE_CELL
        map_type = MAP_TYPE_RM
    elif operation[0] and not operation[0].data:
        cost += COST_DELETE_EMPTY
        map_type = MAP_TYPE_RM
    elif operation[1] and operation[1].data:
        cost += COST_IMPOSSIBLE
        map_type = MAP_TYPE_ADD
    else:
        cost += COST_ADD_EMPTY
        map_type = MAP_TYPE_ADD
    return cost, map_type


# Calculate the total cost of an edit path
def cost_edit_path(edit_path, target=None):
    """Sum the operation costs along `edit_path`; bails out once impossible."""
    cost = 0
    for operation in edit_path:
        if operation[0] and operation[1]:
            new_cost, sub_type = cost_data_transform(operation[0].data, operation[1].data)
            cost += new_cost
            if cost >= COST_IMPOSSIBLE:
                return cost
            cost += cost_move(operation[0], operation[1])
        elif operation[0] and operation[0].data:
            cost += COST_DELETE_CELL
        elif operation[0] and not operation[0].data:
            cost += COST_DELETE_EMPTY
        elif operation[1] and operation[1].data:
            cost += COST_IMPOSSIBLE
        else:
            cost += COST_ADD_EMPTY
    return cost


# Check unprocessed nodes in graph u and v
def check_unprocessed(u, v, path):
    """Return the nodes of `u` and `v` not yet covered by `path`."""
    processed_u = []
    processed_v = []
    for operation in path:
        if operation[0]:
            processed_u.append(operation[0])
        if operation[1]:
            processed_v.append(operation[1])
    unprocessed_u = u.nodes_set() - set(processed_u)
    unprocessed_v = v.nodes_set() - set(processed_v)
    return list(unprocessed_u), list(unprocessed_v)


# Greedy approximation of the graph edit distance
def graph_edit_distance_greedy(u, v):
    """Greedily map each node of `v` to the cheapest available node of `u`.

    Returns (path, cost) where path is a list of (src, dst, map_type, cost)
    tuples.
    """
    chosen_path = []
    chosen_path_cost = 0
    # For each node w in u, insert the substitution {w -> v1} into OPEN
    v1 = v.nodes()[0]
    possible_path = []
    possible_path_cost = []
    for w in u.nodes():
        edit_op = (w, v1)
        new_cost, map_type = cost_edit_op(edit_op, v)
        if map_type == MAP_TYPE_MV:
            # NOTE(review): set but never consulted for this first pick in the
            # original code; kept for fidelity.
            if_exact_match_found = True
        new_path = (w, v1, map_type, new_cost)
        possible_path.append(new_path)
        possible_path_cost.append(new_cost)
    # Comes out of nowhere (insertion)
    edit_op = (None, v1)
    new_cost, map_type = cost_edit_op(edit_op, v)
    edit_path = (None, v1, map_type, new_cost)
    possible_path.append(edit_path)
    possible_path_cost.append(new_cost)
    path_idx = possible_path_cost.index(min(possible_path_cost))
    chosen_path.append(possible_path[path_idx])
    chosen_path_cost += possible_path_cost[path_idx]
    unprocessed_u = list(u.nodes())
    unprocessed_v = list(v.nodes())
    if possible_path[path_idx][0] in unprocessed_u:
        unprocessed_u.remove(possible_path[path_idx][0])
    unprocessed_v.pop(0)
    while unprocessed_v and unprocessed_u:
        v_next = unprocessed_v.pop(0)
        possible_path = []
        possible_path_cost = []
        if_exact_match_found = False
        for u_next in unprocessed_u:
            edit_op = (u_next, v_next)
            new_cost, map_type = cost_edit_op(edit_op, v)
            if map_type == MAP_TYPE_MV:
                if_exact_match_found = True
            new_path = (u_next, v_next, map_type, new_cost)
            possible_path.append(new_path)
            possible_path_cost.append(new_cost)
            if new_cost <= 0:
                break
        edit_op = (None, v_next)
        new_cost, map_type = cost_edit_op(edit_op, v)
        new_path = (None, v_next, map_type, new_cost)
        possible_path.append(new_path)
        possible_path_cost.append(new_cost)
        path_idx = possible_path_cost.index(min(possible_path_cost))
        # The cheapest operation is not a move when exact match exists; keep
        # finding the next cheapest until we find the move
        while if_exact_match_found and possible_path[path_idx][2] != MAP_TYPE_MV:
            if len(possible_path_cost) > 1:
                possible_path_cost.pop(path_idx)
                possible_path.pop(path_idx)
                path_idx = possible_path_cost.index(min(possible_path_cost))
            else:
                break
        # No good choice among unprocessed u nodes: consider re-using any old u node
        if possible_path[path_idx][2] == MAP_TYPE_UNKNOWN or possible_path[path_idx][2] == MAP_TYPE_SPL or \
                possible_path[path_idx][2] == MAP_TYPE_MER:
            possible_path_new = []
            possible_path_cost_new = []
            for u_next in u.nodes():
                edit_op = (u_next, v_next)
                new_cost, map_type = cost_edit_op(edit_op, v)
                new_path = (u_next, v_next, map_type, new_cost)
                possible_path_new.append(new_path)
                possible_path_cost_new.append(new_cost)
                if new_cost <= 0:
                    break
            path_idx_new = possible_path_cost_new.index(min(possible_path_cost_new))
            if possible_path_cost_new[path_idx_new] < possible_path_cost[path_idx]:
                chosen_path.append(possible_path_new[path_idx_new])
                chosen_path_cost += possible_path_cost_new[path_idx_new]
                if possible_path_new[path_idx_new][0] in unprocessed_u:
                    unprocessed_u.remove(possible_path_new[path_idx_new][0])
            else:
                chosen_path.append(possible_path[path_idx])
                chosen_path_cost += possible_path_cost[path_idx]
                if possible_path[path_idx][0] in unprocessed_u:
                    unprocessed_u.remove(possible_path[path_idx][0])
        else:
            chosen_path.append(possible_path[path_idx])
            chosen_path_cost += possible_path_cost[path_idx]
            if possible_path[path_idx][0] in unprocessed_u:
                unprocessed_u.remove(possible_path[path_idx][0])
    # If unprocessed_u is empty, but unprocessed_v is not, transform old u nodes
    if not unprocessed_u and unprocessed_v:
        for v_next in unprocessed_v:
            possible_path = []
            possible_path_cost = []
            for u_old in u.nodes():
                edit_op = (u_old, v_next)
                new_cost, map_type = cost_edit_op(edit_op, v)
                new_path = (u_old, v_next, map_type, new_cost)
                possible_path.append(new_path)
                possible_path_cost.append(new_cost)
            edit_op = (None, v_next)
            new_cost, map_type = cost_edit_op(edit_op, v)
            new_path = (None, v_next, map_type, new_cost)
            possible_path.append(new_path)
            possible_path_cost.append(new_cost)
            path_idx = possible_path_cost.index(min(possible_path_cost))
            chosen_path.append(possible_path[path_idx])
            chosen_path_cost += possible_path_cost[path_idx]
    # If unprocessed_v is empty, but unprocessed_u is not, delete the leftovers
    if unprocessed_u and not unprocessed_v:
        for u_next in unprocessed_u:
            edit_op = (u_next, None)
            new_cost, map_type = cost_edit_op(edit_op, v)
            new_path = (u_next, None, map_type, new_cost)
            chosen_path.append(new_path)
            chosen_path_cost += new_cost
    if debug_print:
        print_path(chosen_path)
    return chosen_path, chosen_path_cost


def graph_edit_distance(u, v):
    """Exact best-first search over partial edit paths (Riesen-style A*).

    Returns (path, cost) of the cheapest complete edit path, or None if the
    open set is exhausted.
    """
    # Partial edit paths under consideration
    open_set = []
    cost_open_set = []
    # For each node w in V2, insert the substitution {u1 -> w} into OPEN
    u1 = u.nodes()[0]
    for w in v.nodes():
        edit_path = set()
        edit_path.add((u1, w))
        new_cost = cost_edit_path(edit_path)
        if new_cost < COST_IMPOSSIBLE:
            open_set.append(edit_path)
            cost_open_set.append(new_cost)
    # Insert the deletion {u1 -> none} into OPEN
    edit_path = set()
    edit_path.add((u1, None))
    new_cost = cost_edit_path(edit_path)
    if new_cost < COST_IMPOSSIBLE:
        open_set.append(edit_path)
        cost_open_set.append(new_cost)
    while cost_open_set:
        # Retrieve minimum-cost partial edit path pmin from OPEN
        path_idx = cost_open_set.index(min(cost_open_set))
        min_path = open_set.pop(path_idx)
        cost = cost_open_set.pop(path_idx)
        # Check whether p_min is a complete edit path
        unprocessed_u, unprocessed_v = check_unprocessed(u, v, min_path)
        if not unprocessed_u and not unprocessed_v:
            return min_path, cost
        else:
            if unprocessed_u:
                u_next = unprocessed_u.pop()
                for v_next in unprocessed_v:
                    new_path = set(min_path)
                    new_path.add((u_next, v_next))
                    new_cost = cost_edit_path(new_path)
                    if new_cost < COST_IMPOSSIBLE:
                        open_set.append(new_path)
                        cost_open_set.append(new_cost)
                new_path = set(min_path)
                new_path.add((u_next, None))
                new_cost = cost_edit_path(new_path)
                if new_cost < COST_IMPOSSIBLE:
                    open_set.append(new_path)
                    cost_open_set.append(new_cost)
            else:
                # All nodes in u processed; remaining v nodes must be copied,
                # split or merged from nodes of u
                for v_next in unprocessed_v:
                    for u_old in u.nodes():
                        new_path = set(min_path)
                        new_path.add((u_old, v_next))
                        new_cost = cost_edit_path(new_path)
                        if new_cost < COST_IMPOSSIBLE:
                            open_set.append(new_path)
                            cost_open_set.append(new_cost)
    # NOTE(review): the scraped source is truncated at this point; the visible
    # code ends here, so an exhausted open set yields an implicit None.
Heap.py
Source:Heap.py
...24 self.heapify(self.root)25 self.set_generation_links(self.root)26 print('---------- After Heapify ----------')27 self.print_tree_levels(self.root, 0)28 def get_temp_path(self):29 """30 Returns the temporary path.31 :return: The temporary path class variable.32 """33 return self.temp_path34 def read_paths(self, input_file):35 """ 36 Reads inputFile given at the command line and places the contents of 37 each line into the path field found in each PathNode object. The order 38 is the same as found in the text file. Adds the PathNode object to the 39 temp_path starting at temp_path[1].40 41 :param input_file: The file to read the data from.42 """...
test_vcf_readcount_annotator.py
Source:test_vcf_readcount_annotator.py
1import unittest2import sys3import os4import py_compile5from vatools import vcf_readcount_annotator6import tempfile7from filecmp import cmp8import io9import logging10from testfixtures import LogCapture, StringComparison as S11class VcfExpressionEncoderTests(unittest.TestCase):12 @classmethod13 def setUpClass(cls):14 base_dir = os.path.abspath(os.path.join(os.path.dirname(os.path.realpath(__file__)), '..'))15 cls.executable = os.path.join(base_dir, 'vatools', 'vcf_readcount_annotator.py')16 cls.test_data_dir = os.path.join(base_dir, 'tests', 'test_data')17 def test_source_compiles(self):18 self.assertTrue(py_compile.compile(self.executable))19 def test_error_more_than_one_sample_without_sample_name(self):20 with self.assertRaises(Exception) as context:21 command = [22 os.path.join(self.test_data_dir, 'multiple_samples.vcf'),23 os.path.join(self.test_data_dir, 'snvs.bam_readcount'),24 'DNA',25 ]26 vcf_readcount_annotator.main(command)27 self.assertTrue('contains more than one sample. Please use the -s option to specify which sample to annotate.' in str(context.exception))28 def test_error_more_than_one_sample_with_wrong_sample_name(self):29 with self.assertRaises(Exception) as context:30 command = [31 os.path.join(self.test_data_dir, 'multiple_samples.vcf'),32 os.path.join(self.test_data_dir, 'snvs.bam_readcount'),33 'DNA',34 '-s', 'nonexistent_sample',35 ]36 vcf_readcount_annotator.main(command)37 self.assertTrue('does not contain a sample column for sample nonexistent_sample.' 
in str(context.exception))38 def test_single_sample_vcf_without_readcounts_annotations_dna_mode(self):39 temp_path = tempfile.TemporaryDirectory()40 os.symlink(os.path.join(self.test_data_dir, 'input.vcf'), os.path.join(temp_path.name, 'input.vcf'))41 command = [42 os.path.join(temp_path.name, 'input.vcf'),43 os.path.join(self.test_data_dir, 'snvs.bam_readcount'),44 'DNA',45 ]46 vcf_readcount_annotator.main(command)47 self.assertTrue(cmp(os.path.join(self.test_data_dir, 'single_sample.dna.readcount.vcf'), os.path.join(temp_path.name, 'input.readcount.vcf')))48 temp_path.cleanup()49 def test_single_sample_vcf_without_readcounts_annotations_rna_mode(self):50 temp_path = tempfile.TemporaryDirectory()51 os.symlink(os.path.join(self.test_data_dir, 'input.vcf'), os.path.join(temp_path.name, 'input.vcf'))52 command = [53 os.path.join(temp_path.name, 'input.vcf'),54 os.path.join(self.test_data_dir, 'snvs.bam_readcount'),55 'RNA',56 ]57 vcf_readcount_annotator.main(command)58 self.assertTrue(cmp(os.path.join(self.test_data_dir, 'single_sample.rna.readcount.vcf'), os.path.join(temp_path.name, 'input.readcount.vcf')))59 temp_path.cleanup()60 def test_single_sample_vcf_with_existing_readcount_annotations(self):61 temp_path = tempfile.TemporaryDirectory()62 os.symlink(os.path.join(self.test_data_dir, 'input.readcount.vcf'), os.path.join(temp_path.name, 'input.vcf'))63 command = [64 os.path.join(temp_path.name, 'input.vcf'),65 os.path.join(self.test_data_dir, 'snvs.bam_readcount'),66 'DNA',67 ]68 vcf_readcount_annotator.main(command)69 self.assertTrue(cmp(os.path.join(self.test_data_dir, 'single_sample_with_existing_readcount_annotations.readcount.vcf'), os.path.join(temp_path.name, 'input.readcount.vcf')))70 temp_path.cleanup()71 def test_mutation_without_matching_readcount_value(self):72 temp_path = tempfile.TemporaryDirectory()73 os.symlink(os.path.join(self.test_data_dir, 'no_matching_readcount.vcf'), os.path.join(temp_path.name, 'input.vcf'))74 command = [75 
os.path.join(temp_path.name, 'input.vcf'),76 os.path.join(self.test_data_dir, 'snvs.bam_readcount'),77 'DNA',78 ]79 vcf_readcount_annotator.main(command)80 self.assertTrue(cmp(os.path.join(self.test_data_dir, 'no_matching_readcount.readcount.vcf'), os.path.join(temp_path.name, 'input.readcount.vcf')))81 temp_path.cleanup()82 def test_multi_sample_vcf(self):83 temp_path = tempfile.TemporaryDirectory()84 os.symlink(os.path.join(self.test_data_dir, 'multiple_samples.vcf'), os.path.join(temp_path.name, 'input.vcf'))85 command = [86 os.path.join(temp_path.name, 'input.vcf'),87 os.path.join(self.test_data_dir, 'snvs.bam_readcount'),88 'DNA',89 '-s', 'H_NJ-HCC1395-HCC1395',90 ]91 vcf_readcount_annotator.main(command)92 self.assertTrue(cmp(os.path.join(self.test_data_dir, 'multiple_samples.readcount.vcf'), os.path.join(temp_path.name, 'input.readcount.vcf')))93 temp_path.cleanup()94 def test_multiple_alts(self):95 temp_path = tempfile.TemporaryDirectory()96 os.symlink(os.path.join(self.test_data_dir, 'multiple_samples.readcount.vcf'), os.path.join(temp_path.name, 'input.vcf'))97 command = [98 os.path.join(temp_path.name, 'input.vcf'),99 os.path.join(self.test_data_dir, 'snvs.bam_readcount'),100 'DNA',101 '-s', 'H_NJ-HCC1395-HCC1396',102 ]103 vcf_readcount_annotator.main(command)104 self.assertTrue(cmp(os.path.join(self.test_data_dir, 'multiple_samples_second_alt.readcount.vcf'), os.path.join(temp_path.name, 'input.readcount.vcf')))105 temp_path.cleanup()106 def test_input_AF_is_of_number_1(self):107 temp_path = tempfile.TemporaryDirectory()108 os.symlink(os.path.join(self.test_data_dir, 'af_number_1.vcf'), os.path.join(temp_path.name, 'input.vcf'))109 command = [110 os.path.join(temp_path.name, 'input.vcf'),111 os.path.join(self.test_data_dir, 'af_number_1.bam-readcount.tsv'),112 'DNA',113 '-s', 'TUMOR'114 ]115 vcf_readcount_annotator.main(command)116 def test_hom_ref_genotype(self):117 temp_path = tempfile.TemporaryDirectory()118 
# NOTE(review): the span below is a web-scrape artifact, not runnable Python.
# The scraper fused the original file's line numbers (119, 120, ...) directly
# into the code text and discarded all indentation. The visible content is a
# series of unittest methods exercising `vcf_readcount_annotator.main(...)`:
# duplicate bam-readcount entries with discrepant vs. matching depths (checked
# via a captured log warning with `LogCapture`), snv/indel variant-type modes,
# a complex indel, and an MNP. Each test follows the same pattern: create a
# `tempfile.TemporaryDirectory`, symlink a fixture VCF into it as `input.vcf`
# (or `input.vcf.gz`), run `main()` with a command list, compare the produced
# `input.readcount.vcf[.gz]` against an expected fixture with `cmp`, then call
# `temp_path.cleanup()`. The `def` line of the first (hom_ref) fragment and the
# tail of `test_mnp` fall outside this span, so they are not reconstructed —
# the bytes are preserved exactly as scraped.
os.symlink(os.path.join(self.test_data_dir, 'hom_ref.vcf'), os.path.join(temp_path.name, 'input.vcf'))119 command = [120 os.path.join(temp_path.name, 'input.vcf'),121 os.path.join(self.test_data_dir, 'hom_ref.bam_readcount'),122 'DNA',123 '-s', 'NORMAL'124 ]125 vcf_readcount_annotator.main(command)126 self.assertTrue(cmp(os.path.join(self.test_data_dir, 'hom_ref.readcount.vcf'), os.path.join(temp_path.name, 'input.readcount.vcf')))127 temp_path.cleanup()128 def test_duplicate_bam_readcount_entries_discrepant_depth(self):129 temp_path = tempfile.TemporaryDirectory()130 os.symlink(os.path.join(self.test_data_dir, 'duplicate_entries.vcf'), os.path.join(temp_path.name, 'input.vcf'))131 logging.disable(logging.NOTSET)132 with LogCapture() as l:133 command = [134 os.path.join(temp_path.name, 'input.vcf'),135 os.path.join(self.test_data_dir, 'duplicate_entries_discrepant_depths.bam_readcount'),136 'DNA'137 ]138 vcf_readcount_annotator.main(command)139 warn_message = "Depths are discrepant, so neither entry will be included in the output vcf."140 logged_str = "".join(l.actual()[0])141 #the warning is broken into several lines when written to the log; manually extract the log, which is returned as 142 #a list of tuples. 
grab the relevant (and in this case only) tuple, the first, then combine into one string for comparison143 self.assertTrue(warn_message in logged_str)144 self.assertTrue(cmp(os.path.join(self.test_data_dir, 'duplicate_entries_discrepant_depths.bam_readcount.vcf'), os.path.join(temp_path.name, 'input.readcount.vcf')))145 temp_path.cleanup()146 def test_duplicate_bam_readcount_entries_same_depth(self):147 temp_path = tempfile.TemporaryDirectory()148 os.symlink(os.path.join(self.test_data_dir, 'duplicate_entries.vcf'), os.path.join(temp_path.name, 'input.vcf'))149 logging.disable(logging.NOTSET)150 with LogCapture() as l:151 command = [152 os.path.join(temp_path.name, 'input.vcf'),153 os.path.join(self.test_data_dir, 'duplicate_entries_same_depths.bam_readcount'),154 'DNA', '-s', 'H_NJ-HCC1395-HCC1395'155 ]156 vcf_readcount_annotator.main(command)157 warn_message = "Both depths match, so this field will be written, but count and frequency fields will be skipped."158 logged_str = "".join(l.actual()[0])159 self.assertTrue(warn_message in logged_str)160 self.assertTrue(cmp(os.path.join(self.test_data_dir, 'duplicate_entries_same_depths.bam_readcount.vcf'), os.path.join(temp_path.name, 'input.readcount.vcf')))161 temp_path.cleanup()162 def test_snv_mode(self):163 temp_path = tempfile.TemporaryDirectory()164 os.symlink(os.path.join(self.test_data_dir, 'input.snvs_and_indels.vcf'), os.path.join(temp_path.name, 'input.vcf'))165 command = [166 os.path.join(temp_path.name, 'input.vcf'),167 os.path.join(self.test_data_dir, 'snvs.bam_readcount'),168 'DNA',169 '--variant-type', 'snv',170 ]171 vcf_readcount_annotator.main(command)172 self.assertTrue(cmp(os.path.join(self.test_data_dir, 'snv_mode.bam_readcount.vcf'), os.path.join(temp_path.name, 'input.readcount.vcf')))173 temp_path.cleanup()174 def test_indel_mode(self):175 temp_path = tempfile.TemporaryDirectory()176 os.symlink(os.path.join(self.test_data_dir, 'input.snvs_and_indels.vcf'), os.path.join(temp_path.name, 
'input.vcf'))177 command = [178 os.path.join(temp_path.name, 'input.vcf'),179 os.path.join(self.test_data_dir, 'indels.bam_readcount'),180 'DNA',181 '--variant-type', 'indel',182 ]183 vcf_readcount_annotator.main(command)184 self.assertTrue(cmp(os.path.join(self.test_data_dir, 'indel_mode.bam_readcount.vcf'), os.path.join(temp_path.name, 'input.readcount.vcf')))185 temp_path.cleanup()186 def test_complex_indel(self):187 temp_path = tempfile.TemporaryDirectory()188 os.symlink(os.path.join(self.test_data_dir, 'input.complex_indel.vcf.gz'), os.path.join(temp_path.name, 'input.vcf.gz'))189 command = [190 os.path.join(temp_path.name, 'input.vcf.gz'),191 os.path.join(self.test_data_dir, 'complex_indel.bam_readcount'),192 'DNA',193 '-s', 'TUMOR',194 ]195 vcf_readcount_annotator.main(command)196 self.assertTrue(cmp(os.path.join(self.test_data_dir, 'complex_indel.readcount.vcf.gz'), os.path.join(temp_path.name, 'input.readcount.vcf.gz')))197 temp_path.cleanup()198 def test_mnp(self):199 temp_path = tempfile.TemporaryDirectory()200 os.symlink(os.path.join(self.test_data_dir, 'input.mnp.vcf.gz'), os.path.join(temp_path.name, 'input.vcf.gz'))201 command = [202 os.path.join(temp_path.name, 'input.vcf.gz'),203 os.path.join(self.test_data_dir, 'complex_indel.bam_readcount'),204 'DNA',205 '-s', 'TUMOR',206 ]207 vcf_readcount_annotator.main(command)208 self.assertTrue(cmp(os.path.join(self.test_data_dir, 'mnp.readcount.vcf.gz'), os.path.join(temp_path.name, 'input.readcount.vcf.gz')))...
generate_docs.py
Source: generate_docs.py
"""Generate Doxygen documentation for the C++ game client.

Copies the Doxygen config, HTML assets, and the game headers into a ./temp
staging directory (stripping trailing ``_`` from class names and injecting
<remarks> links into each game's ``game.hpp``), prepends a Games index to the
README, runs ``doxygen``, and copies the rendered HTML into ./output.

NOTE(review): reconstructed from a scrape that fused the original line numbers
into the code text and dropped all indentation; block structure was re-derived
from the syntax. Behavior-affecting strings are reproduced byte-for-byte.
"""
import os
import os.path
import re
import subprocess
import shutil
from datetime import date
import sys


def run(*args, **kwargs):
    """Run a subprocess command, exiting the script on a non-zero status."""
    error_code = subprocess.call(*args, **kwargs)
    if error_code != 0:  # an error happened
        sys.exit(error_code)


# Stage everything under ./temp so the source tree is never modified.
temp_path = "./temp"
if not os.path.exists(temp_path):
    os.mkdir(temp_path)

shutil.copyfile("Doxyfile", os.path.join(temp_path, "Doxyfile"))
#shutil.copyfile("../README.md", os.path.join(temp_path, "README.md"))
#shutil.copyfile("footer.html", os.path.join(temp_path, "footer.html"))
shutil.copyfile("header.html", os.path.join(temp_path, "header.html"))
shutil.copyfile("favicon.ico", os.path.join(temp_path, "favicon.ico"))

# Collect every game source file, skipping implementation details and build files.
files = []
for root, dirnames, filenames in os.walk("../games/"):
    for filename in filenames:
        path = os.path.join(root, filename)
        if 'impl' in path or filename == 'CMakeLists.txt':
            continue
        files.append(path)

# Extracts the game directory name from a path like "../games/chess/...".
find_game_name = re.compile('games\\/(.*)\\/')
game_names = set()
for path in files:
    #shutil.copyfile(os.path.join(game_path, name), os.path.join(temp_path, name))
    temp_path_for_file = os.path.normpath(os.path.join(temp_path, path.replace("../", '')))
    dir_path = os.path.dirname(temp_path_for_file)
    if not os.path.exists(dir_path):
        os.makedirs(dir_path)
    searched = find_game_name.search(path)
    # NOTE(review): if the regex does not match, `lower_game_name` is None and
    # the subscript below raises; every walked path contains "games/", so this
    # is assumed unreachable — TODO confirm.
    lower_game_name = searched and searched.group(1)
    game_name = lower_game_name[0].upper() + lower_game_name[1:]
    game_names.add(game_name)
    is_game = path.endswith("game.hpp")
    did_remarks = False
    with open(path, "r") as f:
        lines = f.readlines()
    # for each line check if it is a class declaration, if so remove trailing `_`s
    with open(temp_path_for_file, "w") as temp_file:
        for i, line in enumerate(lines):
            # Inject the GitHub rules/story/template links once per game header,
            # right after its first "/// </summary>" line.
            if is_game and line.startswith("/// </summary>") and not did_remarks:
                did_remarks = True
                line = line + """/// <remarks>
/// The full game rules for {game_name} can be found on <a href="https://github.com/siggame/Cadre/blob/master/Games/{game_name}/rules.md">GitHub</a>.
///
/// Additional materials, such as the <a href="https://github.com/siggame/Cadre/blob/master/Games/{game_name}/story.md">story</a> and <a href="https://github.com/siggame/Cadre/blob/master/Games/{game_name}/creer.yaml">game template</a> can be found on <a href="https://github.com/siggame/Cadre/blob/master/Games/{game_name}/">GitHub</a> as well.
/// </remarks>
""".format(game_name=game_name)
            if line.startswith("class"):  # remove trailing underscores
                line = line.rstrip()
                split = line.split(" ")
                for j, word in enumerate(split):
                    if word[-1] == "_":
                        split[j] = word[0:len(word)-1]  # cutoff the trailing `_`
                line = " ".join(split) + "\n"
            temp_file.write(line)

# Shared base classes documented alongside the games.
for extra in ["base_object.cpp", "base_object.hpp",
              "base_ai.cpp", "base_ai.hpp"]:
    shutil.copyfile(os.path.join("..", "joueur/src/", extra), os.path.join(temp_path, extra))

# Insert a "## Games" index (links to the Doxygen namespace pages) into the
# README just before its "## How to Run" section.
with open("../README.md", 'r') as f:
    readme = f.read()
readme_i = readme.find("## How to Run")
if readme_i > -1:
    readme = readme[:readme_i] + """
## Games
{}
""".format('\n'.join(["- [{}](namespacecpp__client_1_1{}.html)".format(g, g.lower()) for g in sorted(game_names)])) + readme[readme_i:]
with open(os.path.join(temp_path, "README.md"), 'w+') as f:
    f.write(readme)

run(["doxygen"], shell=True, cwd=temp_path)

# Replace ./output with the freshly rendered HTML.
output_path = "./output"
if os.path.isdir(output_path):
    shutil.rmtree(output_path)
shutil.copytree(os.path.join(temp_path, "docs", "html"), output_path)
Learn to execute automation testing from scratch with the LambdaTest Learning Hub. From setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios, the LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!