Best Python code snippet using playwright-python
extra_function.py
Source:extra_function.py
1from user_class import *2import copy3import random4from memory_profiler import profile5import json6import argparse7def parse_arguments():8 9 parser = argparse.ArgumentParser()10 subparser = parser.add_mutually_exclusive_group(required=True)11 subparser.add_argument('-net', metavar='net.json', help='net file')12 subparser.add_argument('-b', metavar='behavior_space.json', help='behavior space')13 subparser.add_argument('-o', metavar='oss_space.json', help='behavior space from osservation')14 subparser.add_argument('-cs', metavar='closing_space.json', help='silence closing space')15 subparser.add_argument('-dgn', metavar='diagnosticator.json', help='diagnosticator')16 17 subparser2 = parser.add_mutually_exclusive_group(required=False)18 subparser2.add_argument("-all", help='run all program', action='store_const', const='all', dest='step')19 subparser2.add_argument('-gb', help='generate behavior space', action='store_const', const='gb', dest='step')20 21 subparser2.add_argument("-go", help='generate behavior space from osservation', action='store_const', const='gbo', dest='step')22 23 subparser2.add_argument("-d", help='diagnosis behavior space', action='store_const', const='d', dest='step')24 subparser2.add_argument("-gcs", help='generate closing space', action='store_const', const='gcs', dest='step')25 26 #subparser2.add_argument("-dc", help='diagnosis closing space', action='store_const', const='dc', dest='step')27 subparser2.add_argument("-gd", help='generate diagnosticator', action='store_const', const='gd', dest='step')28 subparser2.add_argument("-do", help='linear_diagnostic', action='store_const', const='do', dest='step')29 30 parser.add_argument("-ol2", help='osservation list for diagnosticator', required=False, nargs='*')31 parser.add_argument("-ol", help='osservation list', required=False, nargs='*')32 33 parser.add_argument("-view", help='view file', required=False)34 parser.add_argument("-bk", help='view file', action='store_true', required=False) 35 36 return parser.parse_args()37def json_to_obj(path):38 39 with open(path) as f:40 data = json.load(f)41 list_FA = []42 list_link = []43 for link in data["list_link"]:44 LINK = Link(link["start_FA"], link["finish_FA"], link["alias"])45 list_link.append(LINK)46 for fa in data["list_fa"]:47 48 list_states = []49 for state in fa["list_states"]:50 state_alias = state["state_alias"]51 state_obj = State(alias=state_alias)52 list_states.append(state_obj)53 54 list_trans = []55 for trans in fa["list_trans"]:56 alias = trans["alias"]57 state_in_alias = trans["state_in"]58 state_out_alias = trans["state_out"]59 label_oss = trans["label_oss"] if trans["label_oss"] else '\u03b5'60 label_rel = trans["label_rel"] if trans["label_rel"] else '\u03b5'61 input_event = trans["input_event"]62 output_event = trans["output_event"]63 state_in = None64 state_out = None65 for state in list_states:66 if state_in and state_out:67 break68 if state.alias == state_in_alias:69 state_in = state70 if state.alias == state_out_alias:71 state_out = state72 73 input_event_func = None74 if input_event:75 event = Event(input_event["event"])76 link_alias = input_event["link"]77 link = None78 for link_temp in list_link:79 if link_temp.alias == link_alias:80 link = link_temp81 break82 83 input_event_func = EventFunction(event,link)84 85 output_events_func = []86 for out_ev in output_event:87 event = Event(out_ev["event"])88 link_alias = out_ev["link"]89 link = None90 for link_temp in list_link:91 if link_temp.alias == link_alias:92 link = link_temp93 break94 95 out_temp = EventFunction(event,link)96 output_events_func.append(out_temp)97 trans_obj = Transition(state_in, state_out, input_event_func = input_event_func, output_events_func = output_events_func, label_oss = label_oss, label_rel = label_rel, alias = alias)98 list_trans.append(trans_obj)99 100 101 initial_state_alias = fa["initial_state"]102 actual_state_alias = fa["actual_state"]103 initial_state = None104 actual_state = None105 for state in list_states:106 if initial_state and actual_state:107 break108 if state.alias == initial_state_alias:109 initial_state = state110 if state.alias == actual_state_alias:111 actual_state = state112 alias = fa["FA_alias"]113 FA_temp = FA(list_states, list_trans, initial_state, actual_state, alias = alias)114 list_FA.append(FA_temp)115 net = Net(list_FA, list_link)116 '''for f in net.list_FA:117 print(f.alias)118 for state in f.list_states:119 print(state.alias)120 for trans in f.list_trans:121 print(trans.alias, trans.start_state.alias, trans.finish_state.alias, trans.label_oss)122 print("input", trans.input_event_func)'''123 124 '''with open("net.json", "w") as f:125 f.write(net)'''126 127 return net128def format_net_to_text(doc, text_to_file):129 text_to_file += "NET with " + str(len(doc.list_FA)) + " FAs and " + str(len(doc.list_link)) + " LINKS\n"130 text_to_file += ""131 for f in doc.list_FA:132 text_to_file += "\tFA: " + f.alias + " with " + str(len(f.list_states)) + " states and " + str(len(f.list_trans))+" trans\n"133 134 text_to_file += "\t\t STATES: "135 136 for state in f.list_states:137 text_to_file += state.alias + " "138 text_to_file += "\n"139 text_to_file += "\t\t TRANSITION: "140 for trans in f.list_trans:141 text_to_file += trans.alias + " "142 text_to_file += "\n"143 return text_to_file144def BFS(list_routes, list_space_nodes, final_node):145 grey = []146 good_nodes = set([])147 grey.append(final_node)148 q = []149 q.append(final_node)150 while q:151 node = q.pop(0)152 for route in list_routes:153 if route.finish_node == node:154 if route.start_node not in grey and route.start_node not in good_nodes:155 q.append(route.start_node)156 grey.append(route.start_node)157 good_nodes.add(node)158 grey.remove(node)159 return list_space_nodes - good_nodes160def change_state(transition, index_FA, state_alias, list_link, node, osservation = None):161 new_list_values_link = node.list_values_link.copy()162 new_list_states_FA = node.list_states_FA.copy()163 index_oss = node.index_oss164 if osservation != None:165 if transition.label_oss in osservation and index_oss < len(osservation):166 if transition.label_oss != osservation[index_oss]:167 return None168 else:169 index_oss += 1170 else:171 if transition.label_oss != '\u03b5':172 return None173 func = transition.input_event_func174 if func is not None:175 func_link = func.link176 func_event = func.event177 index_link = list_link.index(func_link)178 if new_list_values_link[index_link] != func_event.alias:179 return None180 else:181 new_list_values_link[index_link] = ''182 funcs = transition.output_events_func183 for func in funcs:184 func_link = func.link185 func_event = func.event186 index_link = list_link.index(func_link)187 if new_list_values_link[index_link] != '':188 return None189 else:190 new_list_values_link[index_link] = func_event.alias191 new_state_alias = transition.finish_state.alias192 new_list_states_FA[index_FA] = new_state_alias193 return Node(list_states_FA = new_list_states_FA, list_values_link = new_list_values_link, index_oss = index_oss)194from operator import attrgetter195from itertools import groupby196def reg_expr(oss_space):197 list_routes = oss_space.list_routes198 list_nodes = oss_space.list_nodes199 initial_node = oss_space.initial_node200 if len(list_nodes) == 0:201 print("No nodes")202 return ''203 list_nodes.remove(initial_node)204 if any(route for route in list_routes if route.finish_node == initial_node):205 place_holder_node = Node([],[])206 new_route = Route(place_holder_node, initial_node)207 list_routes.append(new_route)208 initial_node = place_holder_node209 210 final_node = Node([],[])211 for node in list_nodes:212 if node.final:213 new_route = Route(node, final_node)214 list_routes.append(new_route)215 while len(list_nodes) > 0:216 list_route_node_auto = []217 list_route_node_input = []218 list_route_node_output = []219 node = random.choice(list_nodes)220 #print(node.id)221 222 for route in list_routes:223 if route.start_node == node and route.finish_node == node:224 list_route_node_auto.append(route)225 if route.start_node == node and route.finish_node != node:226 list_route_node_output.append(route) 227 if route.finish_node == node and route.start_node != node:228 list_route_node_input.append(route)229 230 if len(list_route_node_output) > 1:231 232 groups = groupby(list_route_node_output, attrgetter('finish_node')) #o(n)233 for (key, data) in groups:234 routes = list(data)235 if len(routes) > 1: 236 new_labels_rel = set()237 for route in routes:238 if len(route.label_rel) > 2:239 if route.label_rel[0] == '(' and route.label_rel[-1] == ')' and route.label_rel[-2]!='*':240 route.label_rel = route.label_rel[1:-1]241 #print(route.label_rel)242 for label_t in route.set_label_rel:243 new_labels_rel.add(label_t)244 list_routes.remove(route)245 list_route_node_output.remove(route)246 247 new_label_rel = '|'.join(new_labels_rel)248 #print(new_label_rel)249 new_route = Route(node, key, label_rel = new_label_rel)250 new_route.set_label_rel = new_labels_rel251 list_routes.append(new_route)252 list_route_node_output.append(new_route)253 if len(list_route_node_input) == 1 and len(list_route_node_output) == 1 and len(list_route_node_auto) == 0:254 route_in = list_route_node_input[0]255 route_out = list_route_node_output[0]256 new_label, set_label = clear_label(route_in.label_rel, route_out.label_rel, set_label1=route_in.set_label_rel, set_label2=route_out.set_label_rel)257 258 new_route = Route(route_in.start_node, route_out.finish_node , label_rel = new_label)259 new_route.set_label_rel = set_label260 261 list_routes.append(new_route)262 263 list_routes.remove(route_in)264 list_routes.remove(route_out)265 list_route_node_input.remove(route_in)266 list_route_node_output.remove(route_out)267 268 269 if len(list_route_node_auto) > 0:270 for auto_route in list_route_node_auto:271 272 for input_route in list_route_node_input:273 for output_route in list_route_node_output:274 new_label, set_label = clear_label(input_route.label_rel, output_route.label_rel, autolabel=auto_route.label_rel, set_label1=input_route.set_label_rel, set_label2=output_route.set_label_rel, set_auto=auto_route.set_label_rel)275 new_route = Route(input_route.start_node, output_route.finish_node, label_rel = new_label) 276 #print("entrato", auto_route.label_rel)277 new_route.set_label_rel = set_label278 #print("uscito", new_route.label_rel)279 list_routes.append(new_route)280 list_routes.remove(input_route) 281 list_routes.remove(auto_route)282 for output_route in list_route_node_output:283 list_routes.remove(output_route)284 else:285 for input_route in list_route_node_input:286 for output_route in list_route_node_output:287 new_label, set_label = clear_label(input_route.label_rel, output_route.label_rel, set_label1=input_route.set_label_rel, set_label2=output_route.set_label_rel)288 new_route = Route(input_route.start_node, output_route.finish_node, label_rel = new_label)289 new_route.set_label_rel = set_label290 list_routes.append(new_route)291 list_routes.remove(input_route)292 for output_route in list_route_node_output:293 list_routes.remove(output_route)294 list_nodes.remove(node)295 '''for route in list_routes:296 print("ROUTE",route.start_node.id, route.finish_node.id)297 '''298 final_labels = set()299 for route in list_routes:300 #new_labels_rel = set()301 if len(route.label_rel) > 2:302 if route.label_rel[0] == '(' and route.label_rel[-1] == ')' and route.label_rel[-2]!='*':303 route.label_rel = route.label_rel[1:-1]304 305 if len(route.set_label_rel) > 1:306 list_label = route.label_rel.split("|")307 for label in list_label:308 if label[0] == '(' and label[-1] == ')' and label[-2]!='*':309 label = label[1:-1]310 final_labels.add(label)311 '''for label in route.set_label_rel:312 final_labels.add(label)'''313 else:314 final_labels.add(route.label_rel)315 final_label = '|'.join(final_labels)316 #new_route = Route(node, key, label_rel = final_label)317 318 return final_label #return regular expression319def reg_expr_closing(closing_space):320 321 list_routes = closing_space.list_routes322 list_nodes = [node.node for node in closing_space.list_nodes]323 initial_node = closing_space.initial_node324 list_output_routes = closing_space.list_output_routes325 list_final_output_nodes = []326 list_output_nodes = []327 #list_final_routes = []328 list_nodes_not_final = []329 ex_initial_node_id = -2330 if len(list_nodes) == 1:331 new_route = Route(initial_node.node, initial_node.node)332 new_route.label_rel = '\u03b5'333 new_route.rif_node = initial_node.node 334 return [new_route] 335 336 if any(route for route in list_routes if route.finish_node.id == initial_node.node.id):337 338 place_holder_node = Node([],[])339 new_route = Route(place_holder_node, initial_node.node)340 list_routes.append(new_route)341 list_nodes_not_final.append(place_holder_node)342 ex_initial_node_id = initial_node.node.id343 initial_node.node = place_holder_node344 place_holder_node.id = -1345 346 final_node = Node([],[])347 final_node.id = -99348 349 for node in list_nodes:350 if node.final:351 list_final_output_nodes.append(node)352 new_route = Route(node, final_node)353 list_routes.append(new_route)354 else:355 list_nodes_not_final.append(node)356 for route in list_output_routes:357 if not route.start_node.final:358 if not route.start_node in list_output_nodes:359 list_output_nodes.append(route.start_node)360 list_final_output_nodes.append(route.start_node)361 new_route = Route(route.start_node, final_node)362 list_routes.append(new_route)363 list_nodes_not_final.remove(route.start_node)364 number_expr_nodes = len(list_final_output_nodes)365 366 if initial_node.node in list_nodes_not_final:367 list_nodes_not_final.remove(initial_node.node)368 while len(list_nodes_not_final) > 0:369 list_route_node_auto = []370 list_route_node_input = []371 list_route_node_output = []372 node = random.choice(list_nodes_not_final)373 374 for route in list_routes:375 if route.start_node == node and route.finish_node == node:376 list_route_node_auto.append(route)377 if route.start_node == node and route.finish_node != node:378 list_route_node_output.append(route) 379 if route.finish_node == node and route.start_node != node:380 list_route_node_input.append(route)381 382 if len(list_route_node_output) > 1: 383 groups = groupby(list_route_node_output, attrgetter('finish_node')) #o(n)384 for (key, data) in groups:385 routes = list(data)386 if len(routes) > 1:387 new_labels_rel = set()388 for route in routes:389 if len(route.label_rel) > 2:390 if route.label_rel[0] == '(' and route.label_rel[-1] == ')':391 route.label_rel = route.label_rel[1:-1]392 for label_t in route.set_label_rel:393 new_labels_rel.add(label_t)394 list_routes.remove(route)395 list_route_node_output.remove(route)396 397 new_label_rel = '|'.join(new_labels_rel)398 if len(new_labels_rel) > 1:399 new_route = Route(node, key, label_rel = new_label_rel)400 else:401 new_route = Route(node, key, label_rel = new_label_rel)402 #print(new_route.label_rel)403 new_route.set_label_rel = new_labels_rel404 405 list_routes.append(new_route)406 list_route_node_output.append(new_route)407 if len(list_route_node_input) == 1 and len(list_route_node_output) == 1 and len(list_route_node_auto) == 0:408 route_in = list_route_node_input[0]409 route_out = list_route_node_output[0]410 new_label, set_label = clear_label(route_in.label_rel, route_out.label_rel, set_label1=route_in.set_label_rel, set_label2=route_out.set_label_rel)411 412 new_route = Route(route_in.start_node, route_out.finish_node , label_rel = new_label)413 new_route.set_label_rel = set_label414 415 list_routes.append(new_route)416 list_routes.remove(route_in)417 list_routes.remove(route_out)418 list_route_node_input.remove(route_in)419 list_route_node_output.remove(route_out)420 421 422 if len(list_route_node_auto) > 0:423 for auto_route in list_route_node_auto:424 for input_route in list_route_node_input:425 for output_route in list_route_node_output:426 new_label, set_label = clear_label(input_route.label_rel, output_route.label_rel, autolabel=auto_route.label_rel, set_label1=input_route.set_label_rel, set_label2=output_route.set_label_rel, set_auto=auto_route.set_label_rel)427 new_route = Route(input_route.start_node, output_route.finish_node, label_rel = new_label) 428 429 new_route.set_label_rel = set_label430 list_routes.append(new_route)431 list_routes.remove(input_route) 432 list_routes.remove(auto_route)433 434 for output_route in list_route_node_output:435 list_routes.remove(output_route)436 else:437 for input_route in list_route_node_input:438 for output_route in list_route_node_output:439 new_label, set_label = clear_label(input_route.label_rel, output_route.label_rel, set_label1=input_route.set_label_rel, set_label2=output_route.set_label_rel)440 new_route = Route(input_route.start_node, output_route.finish_node, label_rel = new_label)441 new_route.set_label_rel = set_label442 list_routes.append(new_route)443 list_routes.remove(input_route)444 for output_route in list_route_node_output:445 list_routes.remove(output_route)446 list_nodes_not_final.remove(node)447 448 #### QUA####449 while len(list_final_output_nodes) > 0:450 list_route_node_auto = []451 list_route_node_input = []452 list_route_node_output = []453 node = random.choice(list_final_output_nodes)454 #print("node id", node.id)455 '''for route in list_routes:456 print("route", route.start_node.id, route.finish_node.id, route.label_rel)457 '''458 for route in list_routes:459 if route.start_node == node and route.finish_node == node:460 list_route_node_auto.append(route)461 if route.start_node == node and route.finish_node != node:462 list_route_node_output.append(route) 463 if route.finish_node == node and route.start_node != node:464 list_route_node_input.append(route)465 466 if len(list_route_node_input) == 1 and len(list_route_node_output) == 1 and len(list_route_node_auto) == 0:467 route_in = list_route_node_input[0]468 route_out = list_route_node_output[0]469 new_label, set_label = clear_label(route_in.label_rel, route_out.label_rel, set_label1=route_in.set_label_rel, set_label2=route_out.set_label_rel)470 471 new_route = Route(route_in.start_node, route_out.finish_node , label_rel = new_label)472 new_route.set_label_rel = set_label473 474 if route_out.rif_node != None:475 new_route.rif_node = route_out.rif_node476 elif route_out.finish_node == final_node:477 new_route.rif_node = node478 479 list_routes.append(new_route)480 481 list_routes.remove(route_in)482 list_routes.remove(route_out)483 list_route_node_input.remove(route_in)484 list_route_node_output.remove(route_out)485 486 if len(list_route_node_auto) > 0:487 remove = False488 for auto_route in list_route_node_auto:489 for input_route in list_route_node_input:490 remove = True491 for output_route in list_route_node_output:492 new_label, set_label = clear_label(input_route.label_rel, output_route.label_rel, autolabel=auto_route.label_rel, set_label1=input_route.set_label_rel, set_label2=output_route.set_label_rel, set_auto=auto_route.set_label_rel)493 new_route = Route(input_route.start_node, output_route.finish_node, label_rel = new_label) 494 495 new_route.set_label_rel = set_label496 list_routes.append(new_route)497 if output_route.rif_node != None:498 new_route.rif_node = output_route.rif_node499 elif output_route.finish_node == final_node:500 new_route.rif_node = node501 list_routes.remove(input_route) 502 list_routes.remove(auto_route)503 504 if remove:505 for output_route in list_route_node_output:506 list_routes.remove(output_route)507 if len(list_route_node_input) == 0:508 for output_route in list_route_node_output: 509 if output_route.rif_node != None:510 output_route.rif_node = output_route.rif_node511 elif output_route.finish_node == final_node:512 output_route.rif_node = node 513 else:514 remove = False515 for input_route in list_route_node_input:516 remove = True517 #print("qua", input_route.label_rel)518 for output_route in list_route_node_output:519 if input_route.start_node.id == -1 and output_route.finish_node.id == -99:520 #print("rimosso")521 continue522 new_label, set_label = clear_label(input_route.label_rel, output_route.label_rel, set_label1=input_route.set_label_rel, set_label2=output_route.set_label_rel)523 new_route = Route(input_route.start_node, output_route.finish_node, label_rel = new_label)524 new_route.set_label_rel = set_label525 #print("nuova rotta", new_route.label_rel, new_route.start_node.id, new_route.finish_node.id)526 list_routes.append(new_route)527 528 if output_route.rif_node != None:529 new_route.rif_node = output_route.rif_node530 elif output_route.finish_node == final_node:531 new_route.rif_node = node532 list_routes.remove(input_route)533 if remove:534 for output_route in list_route_node_output:535 list_routes.remove(output_route)536 if len(list_route_node_input) == 0:537 for output_route in list_route_node_output: 538 if output_route.rif_node != None:539 output_route.rif_node = output_route.rif_node540 elif output_route.finish_node == final_node:541 output_route.rif_node = node542 list_final_output_nodes.remove(node)543 '''for route in list_routes:544 print(closing_space.id, route.label_rel, route.rif_node.id)'''545 already_done = []546 routes = []547 for route in list_routes:548 if not route.rif_node.id in already_done:549 labels = set()550 gen = (r for r in list_routes if r.rif_node.id == route.rif_node.id)551 for r in gen:552 labels.add(r.label_rel)553 554 route.label_rel = "|".join(labels)555 route.set_label_rel = labels556 routes.append(route)557 already_done.append(route.rif_node.id)558 return routes559def clear_label(label1, label2, autolabel = None, set_label1 = None, set_label2 = None, set_auto = None):560 new_label = ''561 new_label_set = {new_label}562 #print("clear_label", label1, label2, set_label1, set_label2, autolabel, set_auto)563 if autolabel and autolabel != '\u03b5':564 autolabel = "(" + autolabel + ")"565 autolabel, set_auto = clear_label(autolabel, '*', set_label1={autolabel}, set_label2={'*'})566 if label1 == '\u03b5' and label2 == '\u03b5':567 if autolabel:568 if autolabel == '\u03b5':569 new_label = '\u03b5'570 else:571 new_label = autolabel572 new_label_set = set_auto573 else:574 new_label = '\u03b5'575 new_label_set = {new_label}576 elif label1 == '\u03b5':577 if autolabel:578 if autolabel == '\u03b5':579 new_label = label2580 new_label_set = set_label2581 else:582 new_label, new_label_set = clear_label(autolabel, label2, set_label1=set_auto, set_label2=set_label2)583 else:584 new_label = label2585 new_label_set = set_label2586 elif label2 == '\u03b5':587 if autolabel:588 if autolabel == '\u03b5':589 new_label = label1590 new_label_set = set_label1591 else:592 new_label, new_label_set = clear_label(label1, autolabel, set_label1=set_label1, set_label2=set_auto)593 else:594 new_label = label1595 new_label_set = set_label1596 else:597 if autolabel:598 if autolabel == '\u03b5':599 new_label, new_label_set = clear_label(label1, label2, set_label1=set_label1, set_label2=set_label2)600 else:601 new_label, new_label_set = clear_label(label1, autolabel, set_label1=set_label1, set_label2=set_auto)602 new_label, new_label_set = clear_label(new_label, label2, set_label1=new_label_set, set_label2=set_label2)603 elif set_label1:604 labels = set()605 #new_label_set = set()606 for label_temp in set_label1:607 for label_temp2 in set_label2:608 new_label_temp, new_label_set_temp = clear_label(label_temp, label_temp2)609 labels.add(new_label_temp)610 611 new_label = '|'.join(labels)612 new_label_set = labels613 614 else:615 616 new_label = label1 + label2617 new_label_set = {label1, label2}618 619 #print("sono qua",new_label, new_label_set )620 return new_label, new_label_set621def linear_diagnostic(diagnosticator_space, osservation):622 X = []623 X.append((diagnosticator_space.initial_state.id,'\u03b5'))624 X_final = []625 X_final += X626 list_states = diagnosticator_space.list_states627 for o in osservation:628 Xnew = []629 #print("osservazione", o)630 for (state_id, p) in X:631 groups = groupby(list_states[state_id].list_routes, attrgetter('finish_state')) #o(n)632 #print(state_id)633 #print("STATO", state_id, list_states[state_id].delta)634 for (key, data) in groups:635 #print(key.id)636 routes = list(data)637 for route in routes:638 if route.label_oss == o:639 p_label_set = set(p.split("|"))640 route_label_set = set(route.label_rel.split("|"))641 new_p, temp = clear_label(p, route.label_rel, set_label1 = p_label_set, set_label2 = route_label_set)642 #print("\t", new_p, p, route.label_rel, route.set_label_rel, )643 add = True644 for e_tuple_temp in Xnew:645 if e_tuple_temp[0] == route.finish_state.id and sorted(e_tuple_temp[1]) != sorted(new_p):646 #c'è un errore647 lst = list(e_tuple_temp)648 add = False649 label_set = set()650 label_set.add(e_tuple_temp[1])651 label_set.add(new_p)652 lst[1] = '|'.join(label_set)653 e_tuple_temp = tuple(lst)654 #print("\tdio", lst[1])655 break656 if add:657 #print("stato aggiunto", route.finish_state.id)658 Xnew.append((route.finish_state.id, new_p))659 #print("\t\t", add)660 X = copy.deepcopy(Xnew)661 X_final += Xnew662 #print(X_final)663 final_diagnosis = []664 labels_to_return = set()665 for e in Xnew:666 state = list_states[e[0]]667 if state.delta != '':668 list_delta = set(state.delta.split('|'))669 p_label_set = set(e[1].split("|"))670 p = "|".join(p_label_set)671 delta = "|".join(list_delta)672 label, temp = clear_label(p, delta, set_label1=p_label_set, set_label2=list_delta)673 labels_to_return.add(frozenset(temp))674 for frozen in labels_to_return:675 final_diagnosis.append('|'.join(list(frozen)))676 #print(final_diagnosis)677 return final_diagnosis678 ...
test_v_s_route.py
Source:test_v_s_route.py
...135 assert_event_and_count(vsr_1_event_text, initial_count_vsr_1 + 4, events_ns_1)136 assert_event_with_full_equality_and_count(vs_event_text, initial_count_vs + 4, events_ns_1)137 assert_event_and_count(vsr_2_event_text, initial_count_vsr_2 + 4, events_ns_2)138 print("\nStep 6: remove VSRoute and check")139 delete_v_s_route(kube_apis.custom_objects, v_s_route_setup.route_m.name, v_s_route_setup.namespace)140 wait_before_test(1)141 new_config = get_vs_nginx_template_conf(kube_apis.v1,142 v_s_route_setup.namespace,143 v_s_route_setup.vs_name,144 ic_pod_name,145 ingress_controller_prerequisites.namespace)146 resp_1 = requests.get(f"{req_url}{v_s_route_setup.route_m.paths[0]}",147 headers={"host": v_s_route_setup.vs_host})148 resp_2 = requests.get(f"{req_url}{v_s_route_setup.route_m.paths[1]}",149 headers={"host": v_s_route_setup.vs_host})150 resp_3 = requests.get(f"{req_url}{v_s_route_setup.route_s.paths[0]}",151 headers={"host": v_s_route_setup.vs_host})152 assert resp_1.status_code == 404153 assert resp_2.status_code == 404154 assert resp_3.status_code == 200155 events_ns_1 = get_events(kube_apis.v1, v_s_route_setup.route_m.namespace)156 events_ns_2 = get_events(kube_apis.v1, v_s_route_setup.route_s.namespace)157 assert_locations_not_in_config(new_config, v_s_route_setup.route_m.paths)158 assert_event_and_count(vsr_1_event_text, initial_count_vsr_1 + 4, events_ns_1)159 assert_event_with_full_equality_and_count(vs_event_text, initial_count_vs + 4, events_ns_1)160 # a warning event because the VS references a non-existing VSR161 assert_event_with_full_equality_and_count(vs_warning_event_text, 1, events_ns_1)162 assert_event_and_count(vsr_2_event_text, initial_count_vsr_2 + 5, events_ns_2)163 print("\nStep 7: restore VSRoute and check")164 create_v_s_route_from_yaml(kube_apis.custom_objects,165 f"{TEST_DATA}/virtual-server-route/route-multiple.yaml",166 v_s_route_setup.namespace)167 wait_before_test(1)168 new_config = get_vs_nginx_template_conf(kube_apis.v1,169 v_s_route_setup.namespace,170 v_s_route_setup.vs_name,171 ic_pod_name,172 ingress_controller_prerequisites.namespace)173 resp_1 = requests.get(f"{req_url}{v_s_route_setup.route_m.paths[0]}",174 headers={"host": v_s_route_setup.vs_host})175 resp_2 = requests.get(f"{req_url}{v_s_route_setup.route_m.paths[1]}",176 headers={"host": v_s_route_setup.vs_host})177 resp_3 = requests.get(f"{req_url}{v_s_route_setup.route_s.paths[0]}",178 headers={"host": v_s_route_setup.vs_host})179 assert_responses_and_server_name(resp_1, resp_2, resp_3)180 events_ns_1 = get_events(kube_apis.v1, v_s_route_setup.route_m.namespace)181 events_ns_2 = get_events(kube_apis.v1, v_s_route_setup.route_s.namespace)182 assert_locations_in_config(new_config, v_s_route_setup.route_m.paths)183 assert_event_and_count(vsr_1_event_text, 1, events_ns_1)184 assert_event_with_full_equality_and_count(vs_event_text, initial_count_vs + 5, events_ns_1)185 assert_event_and_count(vsr_2_event_text, initial_count_vsr_2 + 6, events_ns_2)186 print("\nStep 8: remove one backend service and check")187 delete_service(kube_apis.v1, "backend1-svc", v_s_route_setup.route_m.namespace)188 wait_before_test(1)189 resp_1 = requests.get(f"{req_url}{v_s_route_setup.route_m.paths[0]}",190 headers={"host": v_s_route_setup.vs_host})191 resp_2 = requests.get(f"{req_url}{v_s_route_setup.route_m.paths[1]}",192 headers={"host": v_s_route_setup.vs_host})193 resp_3 = requests.get(f"{req_url}{v_s_route_setup.route_s.paths[0]}",194 headers={"host": v_s_route_setup.vs_host})195 assert resp_1.status_code == 502196 assert resp_2.status_code == 200197 assert resp_3.status_code == 200198 events_ns_1 = get_events(kube_apis.v1, v_s_route_setup.route_m.namespace)199 events_ns_2 = get_events(kube_apis.v1, v_s_route_setup.route_s.namespace)200 assert_event_and_count(vsr_1_event_text, 2, events_ns_1)201 assert_event_with_full_equality_and_count(vs_event_text, initial_count_vs + 6, events_ns_1)202 assert_event_and_count(vsr_2_event_text, initial_count_vsr_2 + 7, events_ns_2)203 print("\nStep 9: restore backend service and check")204 create_service_with_name(kube_apis.v1, v_s_route_setup.route_m.namespace, "backend1-svc")205 wait_before_test(1)206 resp_1 = requests.get(f"{req_url}{v_s_route_setup.route_m.paths[0]}",207 headers={"host": v_s_route_setup.vs_host})208 resp_2 = requests.get(f"{req_url}{v_s_route_setup.route_m.paths[1]}",209 headers={"host": v_s_route_setup.vs_host})210 resp_3 = requests.get(f"{req_url}{v_s_route_setup.route_s.paths[0]}",211 headers={"host": v_s_route_setup.vs_host})212 assert_responses_and_server_name(resp_1, resp_2, resp_3)213 events_ns_1 = get_events(kube_apis.v1, v_s_route_setup.route_m.namespace)214 events_ns_2 = get_events(kube_apis.v1, v_s_route_setup.route_s.namespace)215 assert_event_and_count(vsr_1_event_text, 3, events_ns_1)216 assert_event_with_full_equality_and_count(vs_event_text, initial_count_vs + 7, events_ns_1)217 assert_event_and_count(vsr_2_event_text, initial_count_vsr_2 + 8, events_ns_2)218 print("\nStep 10: remove VS and check")219 delete_virtual_server(kube_apis.custom_objects, v_s_route_setup.vs_name, v_s_route_setup.namespace)220 wait_before_test(1)221 resp_1 = requests.get(f"{req_url}{v_s_route_setup.route_m.paths[0]}",222 headers={"host": v_s_route_setup.vs_host})223 resp_2 = requests.get(f"{req_url}{v_s_route_setup.route_m.paths[1]}",224 headers={"host": v_s_route_setup.vs_host})225 resp_3 = requests.get(f"{req_url}{v_s_route_setup.route_s.paths[0]}",226 headers={"host": v_s_route_setup.vs_host})227 assert resp_1.status_code == 404228 assert resp_2.status_code == 404229 assert resp_3.status_code == 404230 list0_list_ns_1 = get_events(kube_apis.v1, v_s_route_setup.route_m.namespace)231 list0_list_ns_2 = get_events(kube_apis.v1, v_s_route_setup.route_s.namespace)232 assert_event_and_count(vsr_1_event_text, 3, list0_list_ns_1)233 assert_event_with_full_equality_and_count(vs_event_text, initial_count_vs + 7, list0_list_ns_1)234 assert_event_and_count(vsr_2_event_text, initial_count_vsr_2 + 8, list0_list_ns_2)235 print("\nStep 11: restore VS and check")236 create_virtual_server_from_yaml(kube_apis.custom_objects,237 f"{TEST_DATA}/virtual-server-route/standard/virtual-server.yaml",238 v_s_route_setup.namespace)239 wait_before_test(1)240 resp_1 = requests.get(f"{req_url}{v_s_route_setup.route_m.paths[0]}",241 headers={"host": v_s_route_setup.vs_host})242 resp_2 = requests.get(f"{req_url}{v_s_route_setup.route_m.paths[1]}",243 headers={"host": v_s_route_setup.vs_host})244 resp_3 = requests.get(f"{req_url}{v_s_route_setup.route_s.paths[0]}",245 headers={"host": v_s_route_setup.vs_host})246 assert_responses_and_server_name(resp_1, resp_2, resp_3)247 list1_list_ns_1 = get_events(kube_apis.v1, v_s_route_setup.route_m.namespace)248 list1_list_ns_2 = get_events(kube_apis.v1, v_s_route_setup.route_s.namespace)249 assert_event_and_count(vsr_1_event_text, 4, list1_list_ns_1)250 assert_event_with_full_equality_and_count(vs_event_text, 1, list1_list_ns_1)251 assert_event_and_count(vsr_2_event_text, initial_count_vsr_2 + 9, list1_list_ns_2)252@pytest.mark.vsr253@pytest.mark.parametrize('crd_ingress_controller, v_s_route_setup',254 [({"type": "complete", "extra_args": [f"-enable-custom-resources"]},255 {"example": "virtual-server-route"})],256 indirect=True)257class TestVirtualServerRouteValidation:258 def test_vsr_without_vs(self, kube_apis,259 ingress_controller_prerequisites,260 crd_ingress_controller,261 v_s_route_setup,262 test_namespace):263 ic_pod_name = get_first_pod_name(kube_apis.v1, ingress_controller_prerequisites.namespace)264 vsr_name = create_v_s_route_from_yaml(kube_apis.custom_objects,265 f"{TEST_DATA}/virtual-server-route/route-orphan.yaml",266 test_namespace)267 vsr_paths = get_paths_from_vsr_yaml(f"{TEST_DATA}/virtual-server-route/route-orphan.yaml")268 wait_before_test(1)269 new_config = get_vs_nginx_template_conf(kube_apis.v1,270 v_s_route_setup.namespace,271 v_s_route_setup.vs_name,272 ic_pod_name,273 ingress_controller_prerequisites.namespace)274 new_list_ns_3 = get_events(kube_apis.v1, test_namespace)275 assert_locations_not_in_config(new_config, vsr_paths)276 assert_event_and_count(f"VirtualServer {v_s_route_setup.namespace}/{v_s_route_setup.vs_name} ignores VirtualServerRoute", 1, new_list_ns_3)277 @pytest.mark.parametrize("route_yaml", [f"{TEST_DATA}/virtual-server-route/route-single-invalid-host.yaml",278 f"{TEST_DATA}/virtual-server-route/route-single-duplicate-path.yaml"])279 def test_make_existing_vsr_invalid(self, kube_apis,280 ingress_controller_prerequisites,281 crd_ingress_controller,282 v_s_route_setup,283 route_yaml):284 ic_pod_name = get_first_pod_name(kube_apis.v1, ingress_controller_prerequisites.namespace)285 patch_v_s_route_from_yaml(kube_apis.custom_objects,286 v_s_route_setup.route_s.name,287 route_yaml,288 v_s_route_setup.route_s.namespace)289 wait_before_test(1)290 new_config = get_vs_nginx_template_conf(kube_apis.v1,291 v_s_route_setup.namespace,292 v_s_route_setup.vs_name,293 ic_pod_name,294 ingress_controller_prerequisites.namespace)295 new_vs_events = get_events(kube_apis.v1, v_s_route_setup.namespace)296 new_vsr_events = get_events(kube_apis.v1, v_s_route_setup.route_s.namespace)297 assert_locations_not_in_config(new_config, v_s_route_setup.route_s.paths)298 text = f"{v_s_route_setup.route_s.namespace}/{v_s_route_setup.route_s.name}"299 assert_event_and_count(f"Configuration for {v_s_route_setup.namespace}/{v_s_route_setup.vs_name} was added or updated with warning(s)",300 1,301 new_vs_events)302 if route_yaml == f"{TEST_DATA}/virtual-server-route/route-single-invalid-host.yaml":303 assert_event_and_count(f"VirtualServer is invalid or doesn't exist",304 1,305 new_vsr_events)306 else:307 assert_event_and_count(f"VirtualServerRoute {text} was rejected with error",308 1,309 new_vsr_events)310 def test_openapi_validation_flow(self, kube_apis, ingress_controller_prerequisites,311 crd_ingress_controller, v_s_route_setup):312 ic_pod_name = get_first_pod_name(kube_apis.v1, ingress_controller_prerequisites.namespace)313 config_old = get_vs_nginx_template_conf(kube_apis.v1,314 v_s_route_setup.namespace,315 v_s_route_setup.vs_name,316 ic_pod_name,317 ingress_controller_prerequisites.namespace)318 route_yaml = f"{TEST_DATA}/virtual-server-route/route-single-invalid-openapi.yaml"319 try:320 patch_v_s_route_from_yaml(kube_apis.custom_objects,321 v_s_route_setup.route_s.name,322 route_yaml,323 v_s_route_setup.route_s.namespace)324 except ApiException as ex:325 assert ex.status == 422 and "spec.subroutes.action.pass" in ex.body326 except Exception as ex:327 pytest.fail(f"An unexpected exception is raised: {ex}")328 else:329 pytest.fail("Expected an exception but there was none")330 wait_before_test(1)331 config_new = get_vs_nginx_template_conf(kube_apis.v1,332 v_s_route_setup.namespace,333 v_s_route_setup.vs_name,334 ic_pod_name,335 ingress_controller_prerequisites.namespace)336 assert config_old == config_new, "Expected: config doesn't change"337@pytest.mark.vsr338@pytest.mark.parametrize('crd_ingress_controller, v_s_route_setup',339 [({"type": "complete", "extra_args": [f"-enable-custom-resources"]},340 {"example": "virtual-server-route"})],341 indirect=True)342class TestCreateInvalidVirtualServerRoute:343 def test_create_invalid_vsr(self, kube_apis,344 ingress_controller_prerequisites,345 crd_ingress_controller,346 v_s_route_setup):347 route_yaml = f"{TEST_DATA}/virtual-server-route/route-single-duplicate-path.yaml"348 ic_pod_name = get_first_pod_name(kube_apis.v1, ingress_controller_prerequisites.namespace)349 text = f"{v_s_route_setup.route_s.namespace}/{v_s_route_setup.route_s.name}"350 vs_event_text = f"Configuration for {v_s_route_setup.namespace}/{v_s_route_setup.vs_name} was added or updated with warning(s)"351 vsr_event_text = f"VirtualServerRoute {text} was rejected with error: spec.subroutes[1].path: Duplicate value: \"/backend2\""352 delete_v_s_route(kube_apis.custom_objects,353 v_s_route_setup.route_s.name,354 v_s_route_setup.route_s.namespace)355 create_v_s_route_from_yaml(kube_apis.custom_objects,356 route_yaml,357 v_s_route_setup.route_s.namespace)358 wait_before_test(1)359 new_config = get_vs_nginx_template_conf(kube_apis.v1,360 v_s_route_setup.namespace,361 v_s_route_setup.vs_name,362 ic_pod_name,363 ingress_controller_prerequisites.namespace)364 new_vs_events = get_events(kube_apis.v1, v_s_route_setup.namespace)365 new_vsr_events = get_events(kube_apis.v1, v_s_route_setup.route_s.namespace)366 assert_locations_not_in_config(new_config, v_s_route_setup.route_s.paths)...
test_bgpspeaker.py
Source:test_bgpspeaker.py
...287 @mock.patch(288 'ryu.services.protocols.bgp.bgpspeaker.BGPSpeaker.__init__',289 mock.MagicMock(return_value=None))290 @mock.patch('ryu.services.protocols.bgp.bgpspeaker.call')291 def test_evpn_prefix_add_ip_prefix_route(self, mock_call):292 # Prepare test data293 route_type = bgpspeaker.EVPN_IP_PREFIX_ROUTE294 route_dist = '65000:100'295 esi = 0 # denotes single-homed296 ethernet_tag_id = 200297 ip_prefix = '192.168.0.0/24'298 gw_ip_addr = '172.16.0.1'299 next_hop = '0.0.0.0'300 expected_kwargs = {301 'route_type': route_type,302 'route_dist': route_dist,303 'esi': esi,304 'ethernet_tag_id': ethernet_tag_id,305 'ip_prefix': ip_prefix,306 'gw_ip_addr': gw_ip_addr,307 'next_hop': next_hop,308 }309 # Test310 speaker = bgpspeaker.BGPSpeaker(65000, '10.0.0.1')311 speaker.evpn_prefix_add(312 route_type=route_type,313 route_dist=route_dist,314 esi=esi,315 ethernet_tag_id=ethernet_tag_id,316 ip_prefix=ip_prefix,317 gw_ip_addr=gw_ip_addr,318 )319 # Check320 mock_call.assert_called_with(321 'evpn_prefix.add_local', **expected_kwargs)322 @mock.patch(323 'ryu.services.protocols.bgp.bgpspeaker.BGPSpeaker.__init__',324 mock.MagicMock(return_value=None))325 @mock.patch('ryu.services.protocols.bgp.bgpspeaker.call')326 def test_evpn_prefix_add_ip_prefix_route_vni(self, mock_call):327 # Prepare test data328 route_type = bgpspeaker.EVPN_IP_PREFIX_ROUTE329 route_dist = '65000:100'330 esi = 0 # denotes single-homed331 ethernet_tag_id = 200332 ip_prefix = '192.168.0.0/24'333 gw_ip_addr = '172.16.0.1'334 vni = 500335 tunnel_type = bgpspeaker.TUNNEL_TYPE_VXLAN336 next_hop = '0.0.0.0'337 expected_kwargs = {338 'route_type': route_type,339 'route_dist': route_dist,340 'esi': esi,341 'ethernet_tag_id': ethernet_tag_id,342 'ip_prefix': ip_prefix,343 'gw_ip_addr': gw_ip_addr,344 'tunnel_type': tunnel_type,345 'vni': vni,346 'next_hop': next_hop,347 }348 # Test349 speaker = bgpspeaker.BGPSpeaker(65000, '10.0.0.1')350 speaker.evpn_prefix_add(351 route_type=route_type,352 route_dist=route_dist,353 esi=esi,354 ethernet_tag_id=ethernet_tag_id,355 ip_prefix=ip_prefix,356 gw_ip_addr=gw_ip_addr,357 tunnel_type=tunnel_type,358 vni=vni,359 )360 # Check361 mock_call.assert_called_with(362 'evpn_prefix.add_local', **expected_kwargs)363 @raises(ValueError)364 @mock.patch('ryu.services.protocols.bgp.bgpspeaker.BGPSpeaker.__init__',365 mock.MagicMock(return_value=None))366 @mock.patch('ryu.services.protocols.bgp.bgpspeaker.call')367 def test_evpn_prefix_add_invalid_route_type(self, mock_call):368 # Prepare test data369 route_type = 'foobar' # Invalid EVPN route type370 route_dist = '65000:100'371 esi = 0 # denotes single-homed372 ethernet_tag_id = 200373 mac_addr = 'aa:bb:cc:dd:ee:ff'374 ip_addr = '192.168.0.1'375 next_hop = '10.0.0.1'376 # Test377 speaker = bgpspeaker.BGPSpeaker(65000, '10.0.0.1')378 speaker.evpn_prefix_add(379 route_type=route_type,380 route_dist=route_dist,381 esi=esi,382 ethernet_tag_id=ethernet_tag_id,383 mac_addr=mac_addr,384 ip_addr=ip_addr,385 next_hop=next_hop,386 )387 # Check388 mock_call.assert_called_with(389 'evpn_prefix.add_local', 'Invalid arguments detected')390 @mock.patch(391 'ryu.services.protocols.bgp.bgpspeaker.BGPSpeaker.__init__',392 mock.MagicMock(return_value=None))393 @mock.patch('ryu.services.protocols.bgp.bgpspeaker.call')394 def test_evpn_prefix_del_auto_discovery(self, mock_call):395 # Prepare test data396 route_type = bgpspeaker.EVPN_ETH_AUTO_DISCOVERY397 route_dist = '65000:100'398 esi = {399 'type': ESI_TYPE_LACP,400 'mac_addr': 'aa:bb:cc:dd:ee:ff',401 'port_key': 100,402 }403 ethernet_tag_id = EVPN_MAX_ET404 expected_kwargs = {405 'route_type': route_type,406 'route_dist': route_dist,407 'esi': esi,408 'ethernet_tag_id': ethernet_tag_id,409 }410 # Test411 speaker = bgpspeaker.BGPSpeaker(65000, '10.0.0.1')412 speaker.evpn_prefix_del(413 route_type=route_type,414 route_dist=route_dist,415 esi=esi,416 ethernet_tag_id=ethernet_tag_id,417 )418 # Check419 mock_call.assert_called_with(420 'evpn_prefix.delete_local', **expected_kwargs)421 @mock.patch('ryu.services.protocols.bgp.bgpspeaker.BGPSpeaker.__init__',422 mock.MagicMock(return_value=None))423 @mock.patch('ryu.services.protocols.bgp.bgpspeaker.call')424 def test_evpn_prefix_del_mac_ip_adv(self, mock_call):425 # Prepare test data426 route_type = bgpspeaker.EVPN_MAC_IP_ADV_ROUTE427 route_dist = '65000:100'428 ethernet_tag_id = 200429 mac_addr = 'aa:bb:cc:dd:ee:ff'430 ip_addr = '192.168.0.1'431 expected_kwargs = {432 'route_type': route_type,433 'route_dist': route_dist,434 'ethernet_tag_id': ethernet_tag_id,435 'mac_addr': mac_addr,436 'ip_addr': ip_addr,437 }438 # Test439 speaker = bgpspeaker.BGPSpeaker(65000, '10.0.0.1')440 speaker.evpn_prefix_del(441 route_type=route_type,442 route_dist=route_dist,443 ethernet_tag_id=ethernet_tag_id,444 mac_addr=mac_addr,445 ip_addr=ip_addr,446 )447 # Check448 mock_call.assert_called_with(449 'evpn_prefix.delete_local', **expected_kwargs)450 @mock.patch('ryu.services.protocols.bgp.bgpspeaker.BGPSpeaker.__init__',451 mock.MagicMock(return_value=None))452 @mock.patch('ryu.services.protocols.bgp.bgpspeaker.call')453 def test_evpn_prefix_del_multicast_etag(self, mock_call):454 # Prepare test data455 route_type = bgpspeaker.EVPN_MULTICAST_ETAG_ROUTE456 route_dist = '65000:100'457 esi = 0 # denotes single-homed458 ethernet_tag_id = 200459 mac_addr = 'aa:bb:cc:dd:ee:ff'460 ip_addr = '192.168.0.1'461 expected_kwargs = {462 'route_type': route_type,463 'route_dist': route_dist,464 # 'esi': esi, # should be ignored465 'ethernet_tag_id': ethernet_tag_id,466 # 'mac_addr': mac_addr, # should be ignored467 'ip_addr': ip_addr,468 }469 # Test470 speaker = bgpspeaker.BGPSpeaker(65000, '10.0.0.1')471 speaker.evpn_prefix_del(472 route_type=route_type,473 route_dist=route_dist,474 esi=esi,475 ethernet_tag_id=ethernet_tag_id,476 mac_addr=mac_addr,477 ip_addr=ip_addr,478 )479 # Check480 mock_call.assert_called_with(481 'evpn_prefix.delete_local', **expected_kwargs)482 @raises(ValueError)483 @mock.patch('ryu.services.protocols.bgp.bgpspeaker.BGPSpeaker.__init__',484 mock.MagicMock(return_value=None))485 @mock.patch('ryu.services.protocols.bgp.bgpspeaker.call')486 def test_evpn_prefix_del_invalid_route_type(self, mock_call):487 # Prepare test data488 route_type = 'foobar' # Invalid EVPN route type489 route_dist = '65000:100'490 esi = 0 # denotes single-homed491 ethernet_tag_id = 200492 mac_addr = 'aa:bb:cc:dd:ee:ff'493 ip_addr = '192.168.0.1'494 # Test495 speaker = bgpspeaker.BGPSpeaker(65000, '10.0.0.1')496 speaker.evpn_prefix_del(497 route_type=route_type,498 route_dist=route_dist,499 esi=esi,500 ethernet_tag_id=ethernet_tag_id,501 mac_addr=mac_addr,502 ip_addr=ip_addr,503 )504 # Check505 mock_call.assert_called_with(506 'evpn_prefix.delete_local', 'Invalid arguments detected')507 @mock.patch(508 'ryu.services.protocols.bgp.bgpspeaker.BGPSpeaker.__init__',509 mock.MagicMock(return_value=None))510 @mock.patch('ryu.services.protocols.bgp.bgpspeaker.call')511 def test_evpn_prefix_del_eth_segment(self, mock_call):512 # Prepare test data513 route_type = bgpspeaker.EVPN_ETH_SEGMENT514 route_dist = '65000:100'515 esi = {516 'esi_type': ESI_TYPE_MAC_BASED,517 'mac_addr': 'aa:bb:cc:dd:ee:ff',518 'local_disc': 100,519 }520 ip_addr = '192.168.0.1'521 expected_kwargs = {522 'route_type': route_type,523 'route_dist': route_dist,524 'esi': esi,525 'ip_addr': ip_addr,526 }527 # Test528 speaker = bgpspeaker.BGPSpeaker(65000, '10.0.0.1')529 speaker.evpn_prefix_del(530 route_type=route_type,531 route_dist=route_dist,532 esi=esi,533 ip_addr=ip_addr,534 )535 # Check536 mock_call.assert_called_with(537 'evpn_prefix.delete_local', **expected_kwargs)538 @mock.patch(539 'ryu.services.protocols.bgp.bgpspeaker.BGPSpeaker.__init__',540 mock.MagicMock(return_value=None))541 @mock.patch('ryu.services.protocols.bgp.bgpspeaker.call')542 def test_evpn_prefix_del_ip_prefix_route(self, mock_call):543 # Prepare test data544 route_type = bgpspeaker.EVPN_IP_PREFIX_ROUTE545 route_dist = '65000:100'546 ethernet_tag_id = 200547 ip_prefix = '192.168.0.0/24'548 expected_kwargs = {549 'route_type': route_type,550 'route_dist': route_dist,551 'ethernet_tag_id': ethernet_tag_id,552 'ip_prefix': ip_prefix,553 }554 # Test555 speaker = bgpspeaker.BGPSpeaker(65000, '10.0.0.1')556 speaker.evpn_prefix_del(...
OverlapGraph.py
Source:OverlapGraph.py
1import os2import numpy as np3from tqdm import tqdm4import matplotlib5matplotlib.use('Agg')6import matplotlib.pyplot as plt7from PIL import Image8class OverlapGraph():9 # æå®ã®å½¢å¼ã«ãã©ããã³ã°çµæãæ´å½¢ãã¦åºåããã¯ã©ã¹10 def __init__(self, data_list, route_list):11 # ã³ã³ã¹ãã©ã¯ã¿12 # args:13 self.data_list = data_list14 self.route_list = route_list15 # maxrange : ã°ã©ãã®æç»ç¯å²(±[μm])16 self.maxrange = 317 # ãã®ä»18 self.all_node_list = []19 self.all_node_id = []20 self.devided_route = []21 self.colors = []22 def setMaxRange(self, value):23 self.maxrange = value24 def route_extractor(self, threshold):25 # ãã©ããã³ã°çµæããé¾å¤ã®ãã¬ã¼ã é£ç¶ãã¦ãã©ããã³ã°ããã¦ããçµè·¯ãæ½åº26 # æå¹çµè·¯ã®æ½åº27 effective_route = [28 route for route in self.route_list29 if route.route_length > threshold30 ]31 # å
¨Nodeã¯ã©ã¹ã®é£ç¶é
åãä½ã32 self.all_node_list = [node for data in self.data_list for node in data]33 self.all_node_id = [node.ID for node in self.all_node_list]34 # çµè·¯å§ç¹ããé¾å¤ã¾ã§ã®ãã¼ãIDã¨TIMEã®ãªã¹ããä½ã35 self.devided_route = [route.route[:threshold] for route in effective_route]36 self.devided_route_time = [37 [t for t in range(route.t, route.t+10)]38 for route in effective_route39 ]40 # ã©ã³ãã ã«è²ãæå¹çµè·¯ã®åæ°åçæï¼å¤0ï½255ã®ç¯å²ã§nè¡3åã®ã©ã³ãã ãªndarrayãçæï¼41 self.colors = np.random.rand(len(effective_route), 3)42 def figout(self, out_path, *, pix2m=1):43 # æ½åºããå
¨çµè·¯ãéããããã¦è¡¨ç¤ºããï¼44 print('\nRoute overlapping graph making now ...')45 # åºåå
ä½æ46 print('> Check output dirctory: \"' + out_path + '\"')47 if os.path.isdir(out_path) is False:48 print('> Output directory are not found.')49 os.makedirs(out_path, exist_ok=True)50 print('> New directory created!')51 else:52 print('> Already exist.')53 print('> Calculating ...')54 # æç»ç¨ã®åº§æ¨ãªã¹ããä½ã55 point_list = [56 [57 [self.all_node_list[ID].x, self.all_node_list[ID].y]58 for ID in route59 ] for route in self.devided_route60 ]61 # å§ç¹ã®åº§æ¨ã(0, 0)ã«ãã¦å
¨ä½ãå¹³è¡ç§»åãã62 moved_point_list = [63 [64 np.array(np.array(node) - np.array(route[0])).tolist()65 for node in route66 ] for route in point_list67 ]68 # æ°å¤ãå®éã®è·é¢ã«åããã69 real_scale_list = np.array(moved_point_list) * pix2m70 # ã°ã©ãã®çæ71 fig = plt.figure(figsize=(5, 5))72 ax = fig.add_subplot(1, 1, 1)73 for r_idx, route in enumerate(real_scale_list):74 for i in range(1, len(route)):75 parent_x = route[i-1][0]76 parent_y = -route[i-1][1]77 child_x = route[i][0]78 child_y = -route[i][1]79 # 注æï¼ç»åã®è¦ãæ¹ã¨å®éã®å¤ã®é½é½¬ã解æ¶ããããã«ï¼y軸ã¯ç¬¦å·å転ãã80 ax.plot(81 [parent_x, child_x], [parent_y, child_y],82 c=self.colors[r_idx].tolist())83 # å
¨çµè·¯ã®æ大å¤(+ä½ç½)ãæ±ãã¦ï¼ã°ã©ãæ ã®ãµã¤ãºã決ãã84 # max_range = np.absolute(real_scale_list).max() + pix2m85 # plt.xlim([-max_range, max_range])86 # plt.ylim([-max_range, max_range])87 # ã°ã©ãæ ã®ãµã¤ãºã3umã«è¨å®ãã88 w = self.maxrange89 plt.xlim([-w, w])90 plt.ylim([-w, w])91 # ã°ã©ãã®ä¿å92 filename = out_path + '/overlapped_route.png'93 plt.savefig(filename, dpi=100, transparent=True)94 plt.close()95 print('> Done.')96 return moved_point_list, real_scale_list97 def trajout(self, out_path, *, pix2m=1):98 # æ½åºããå
¨çµè·¯ãåå¥ã«ä¸¦ã¹ã¦åºåããï¼99 print('\nRoute trajectory graph making now ...')100 # åºåå
ä½æ101 out_path = out_path + 'traj_fig/'102 print('> Check output dirctory: \"'+ out_path + '\"')103 if os.path.isdir(out_path) is False:104 print('> Output directory are not found.')105 os.makedirs(out_path, exist_ok=True)106 print('> New directory created!')107 else:108 print('> Already exist.')109 print('> Calculating ...')110 # æç»ç¨ã®åº§æ¨ãªã¹ããä½ã111 point_list = [112 [113 [self.all_node_list[ID].x, self.all_node_list[ID].y]114 for ID in route115 ] for route in self.devided_route116 ]117 # å§ç¹ã®åº§æ¨ã(0, 0)ã«ãã¦å
¨ä½ãå¹³è¡ç§»åãã118 moved_point_list = [119 [120 np.array(np.array(node) - np.array(route[0])).tolist()121 for node in route122 ] for route in point_list123 ]124 # æ°å¤ãå®éã®è·é¢ã«åããã125 real_scale_list = np.array(moved_point_list) * pix2m126 # å
¨çµè·¯ã®æ大å¤(+ä½ç½)ãæ±ãã¦ï¼ã°ã©ãæ ã®ãµã¤ãºã決ãã127 max_range = np.absolute(real_scale_list).max() + pix2m128 # ããã°ã¬ã¹ãã¼ã®çæ129 with tqdm(total=len(real_scale_list), ascii=True, ncols=100) as pbar:130 for r_idx, route in enumerate(real_scale_list):131 # ããã°ã¬ã¹ãã¼ãé²ãã132 pbar.update(1)133 # ã°ã©ãã®çæ134 fig = plt.figure(figsize=(2, 2))135 ax = fig.add_subplot(1, 1, 1)136 for i in range(1, len(route)):137 parent_x = route[i-1][0]138 parent_y = -route[i-1][1]139 child_x = route[i][0]140 child_y = -route[i][1]141 # 注æï¼ç»åã®è¦ãæ¹ã¨å®éã®å¤ã®é½é½¬ã解æ¶ããããã«ï¼y軸ã¯ç¬¦å·å転ãã142 ax.plot(143 [parent_x, child_x], [parent_y, child_y],144 c=self.colors[r_idx].tolist())145 # ã°ã©ãã®æç»ç¯å²ã®è¨å®146 plt.xlim([-max_range, max_range])147 plt.ylim([-max_range, max_range])148 # ã°ã©ãæ ã®ãµã¤ãºã3umã«è¨å®ãã149 # w = self.maxrange150 # plt.xlim([-w, w])151 # plt.ylim([-w, w])152 # ã°ã©ãã®ä¿å153 filename = out_path + '/route_%05d.png' % r_idx154 plt.savefig(filename, dpi=100, transparent=True)155 plt.close()156 print('> Done.')157 def movout(self, out_path, bg_path):158 # ãã©ããã³ã°çµæããªã¼ãã¼ã¬ã¤æç»ãã¦åºåãã159 print('\nRoute trajectory movie making now ...')160 # åºåå
ä½æ161 out_path = out_path + 'mov_fig/'162 print('> Check output dirctory: \"' + out_path + '\"')163 if os.path.isdir(out_path) is False:164 print('> Output directory are not found.')165 os.makedirs(out_path, exist_ok=True)166 print('> New directory created!')167 else:168 print('> Already exist.')169 print('> Calculating ...')170 # ããã°ã¬ã¹ãã¼ã®çæ171 with tqdm(total=len(self.data_list), ascii=True, ncols=100) as pbar:172 # ãã¬ã¼ã ãã¨ã«ã«ã¼ãå¦çãã173 for idx, nodes in enumerate(self.data_list):174 # ããã°ã¬ã¹ãã¼ãé²ãã175 pbar.update(1)176 # èæ¯ã®èªã¿è¾¼ã¿177 img = Image.open(bg_path % idx)178 img = np.array(img)179 h = img.shape[0]180 w = img.shape[1]181 # ã°ã©ãã®çæ182 fig = plt.figure(figsize=(w/10, h/10))183 ax = fig.add_subplot(1, 1, 1)184 # ç»åã®ã«ã©ã¼/ã°ã¬ã¼ã¹ã±ã¼ã«å¤å®185 if len(img.shape) is 2:186 ax.imshow(img, 'gray')187 print('> input image is gray scale.')188 else:189 ax.imshow(img)190 # print('> input image is RGB color')191 # æç»ç¨ã®åº§æ¨ãªã¹ããä½ã192 for r_idx, (T, route) in enumerate(zip(self.devided_route_time, self.devided_route)):193 if idx in T:194 index = T.index(idx)195 for i in reversed(range(index+1)):196 ID = route[i]197 node = self.all_node_list[ID]198 if node.parent_ID is not None:199 parent = self.all_node_list[node.parent_ID]200 child_x = node.x201 child_y = node.y202 parent_x = parent.x203 parent_y = parent.y204 ax.plot(205 [parent_x, child_x], [parent_y, child_y],206 c=self.colors[r_idx].tolist()207 )208 # ãã¼ãã®æç»209 for i, node in enumerate(nodes):210 ax.scatter(node.x, node.y, c='red', s=200, marker='*', edgecolors='black', linewidths=1)211 # ç¯å²è¨å®212 plt.xlim([0, w-1])213 plt.ylim([h-1, 0])214 filename = out_path + '%05d.tif' % idx215 plt.savefig(filename, dpi=100)216 plt.close()...
LambdaTest’s Playwright tutorial will give you a broader idea about the Playwright automation framework, its unique features, and use cases with examples to exceed your understanding of Playwright testing. This tutorial will give A to Z guidance, from installing the Playwright framework to some best practices and advanced concepts.
Get 100 minutes of automation test minutes FREE!!