How to use route method in localstack

Best Python code snippet using localstack_python

extra_function.py

Source:extra_function.py Github

copy

Full Screen

1from user_class import *2import copy3import random4from memory_profiler import profile5import json6import argparse7def parse_arguments():8 9 parser = argparse.ArgumentParser()10 subparser = parser.add_mutually_exclusive_group(required=True)11 subparser.add_argument('-net', metavar='net.json', help='net file')12 subparser.add_argument('-b', metavar='behavior_space.json', help='behavior space')13 subparser.add_argument('-o', metavar='oss_space.json', help='behavior space from osservation')14 subparser.add_argument('-cs', metavar='closing_space.json', help='silence closing space')15 subparser.add_argument('-dgn', metavar='diagnosticator.json', help='diagnosticator')16 17 subparser2 = parser.add_mutually_exclusive_group(required=False)18 subparser2.add_argument("-all", help='run all program', action='store_const', const='all', dest='step')19 subparser2.add_argument('-gb', help='generate behavior space', action='store_const', const='gb', dest='step')20 21 subparser2.add_argument("-go", help='generate behavior space from osservation', action='store_const', const='gbo', dest='step')22 23 subparser2.add_argument("-d", help='diagnosis behavior space', action='store_const', const='d', dest='step')24 subparser2.add_argument("-gcs", help='generate closing space', action='store_const', const='gcs', dest='step')25 26 #subparser2.add_argument("-dc", help='diagnosis closing space', action='store_const', const='dc', dest='step')27 subparser2.add_argument("-gd", help='generate diagnosticator', action='store_const', const='gd', dest='step')28 subparser2.add_argument("-do", help='linear_diagnostic', action='store_const', const='do', dest='step')29 30 parser.add_argument("-ol2", help='osservation list for diagnosticator', required=False, nargs='*')31 parser.add_argument("-ol", help='osservation list', required=False, nargs='*')32 33 parser.add_argument("-view", help='view file', required=False)34 parser.add_argument("-bk", help='view file', action='store_true', required=False) 35 36 return parser.parse_args()37def json_to_obj(path):38 39 with open(path) as f:40 data = json.load(f)41 list_FA = []42 list_link = []43 for link in data["list_link"]:44 LINK = Link(link["start_FA"], link["finish_FA"], link["alias"])45 list_link.append(LINK)46 for fa in data["list_fa"]:47 48 list_states = []49 for state in fa["list_states"]:50 state_alias = state["state_alias"]51 state_obj = State(alias=state_alias)52 list_states.append(state_obj)53 54 list_trans = []55 for trans in fa["list_trans"]:56 alias = trans["alias"]57 state_in_alias = trans["state_in"]58 state_out_alias = trans["state_out"]59 label_oss = trans["label_oss"] if trans["label_oss"] else '\u03b5'60 label_rel = trans["label_rel"] if trans["label_rel"] else '\u03b5'61 input_event = trans["input_event"]62 output_event = trans["output_event"]63 state_in = None64 state_out = None65 for state in list_states:66 if state_in and state_out:67 break68 if state.alias == state_in_alias:69 state_in = state70 if state.alias == state_out_alias:71 state_out = state72 73 input_event_func = None74 if input_event:75 event = Event(input_event["event"])76 link_alias = input_event["link"]77 link = None78 for link_temp in list_link:79 if link_temp.alias == link_alias:80 link = link_temp81 break82 83 input_event_func = EventFunction(event,link)84 85 output_events_func = []86 for out_ev in output_event:87 event = Event(out_ev["event"])88 link_alias = out_ev["link"]89 link = None90 for link_temp in list_link:91 if link_temp.alias == link_alias:92 link = link_temp93 break94 95 out_temp = EventFunction(event,link)96 output_events_func.append(out_temp)97 trans_obj = Transition(state_in, state_out, input_event_func = input_event_func, output_events_func = output_events_func, label_oss = label_oss, label_rel = label_rel, alias = alias)98 list_trans.append(trans_obj)99 100 101 initial_state_alias = fa["initial_state"]102 actual_state_alias = fa["actual_state"]103 initial_state = None104 actual_state = None105 for state in list_states:106 if initial_state and actual_state:107 break108 if state.alias == initial_state_alias:109 initial_state = state110 if state.alias == actual_state_alias:111 actual_state = state112 alias = fa["FA_alias"]113 FA_temp = FA(list_states, list_trans, initial_state, actual_state, alias = alias)114 list_FA.append(FA_temp)115 net = Net(list_FA, list_link)116 '''for f in net.list_FA:117 print(f.alias)118 for state in f.list_states:119 print(state.alias)120 for trans in f.list_trans:121 print(trans.alias, trans.start_state.alias, trans.finish_state.alias, trans.label_oss)122 print("input", trans.input_event_func)'''123 124 '''with open("net.json", "w") as f:125 f.write(net)'''126 127 return net128def format_net_to_text(doc, text_to_file):129 text_to_file += "NET with " + str(len(doc.list_FA)) + " FAs and " + str(len(doc.list_link)) + " LINKS\n"130 text_to_file += ""131 for f in doc.list_FA:132 text_to_file += "\tFA: " + f.alias + " with " + str(len(f.list_states)) + " states and " + str(len(f.list_trans))+" trans\n"133 134 text_to_file += "\t\t STATES: "135 136 for state in f.list_states:137 text_to_file += state.alias + " "138 text_to_file += "\n"139 text_to_file += "\t\t TRANSITION: "140 for trans in f.list_trans:141 text_to_file += trans.alias + " "142 text_to_file += "\n"143 return text_to_file144def BFS(list_routes, list_space_nodes, final_node):145 grey = []146 good_nodes = set([])147 grey.append(final_node)148 q = []149 q.append(final_node)150 while q:151 node = q.pop(0)152 for route in list_routes:153 if route.finish_node == node:154 if route.start_node not in grey and route.start_node not in good_nodes:155 q.append(route.start_node)156 grey.append(route.start_node)157 good_nodes.add(node)158 grey.remove(node)159 return list_space_nodes - good_nodes160def change_state(transition, index_FA, state_alias, list_link, node, osservation = None):161 new_list_values_link = node.list_values_link.copy()162 new_list_states_FA = node.list_states_FA.copy()163 index_oss = node.index_oss164 if osservation != None:165 if transition.label_oss in osservation and index_oss < len(osservation):166 if transition.label_oss != osservation[index_oss]:167 return None168 else:169 index_oss += 1170 else:171 if transition.label_oss != '\u03b5':172 return None173 func = transition.input_event_func174 if func is not None:175 func_link = func.link176 func_event = func.event177 index_link = list_link.index(func_link)178 if new_list_values_link[index_link] != func_event.alias:179 return None180 else:181 new_list_values_link[index_link] = ''182 funcs = transition.output_events_func183 for func in funcs:184 func_link = func.link185 func_event = func.event186 index_link = list_link.index(func_link)187 if new_list_values_link[index_link] != '':188 return None189 else:190 new_list_values_link[index_link] = func_event.alias191 new_state_alias = transition.finish_state.alias192 new_list_states_FA[index_FA] = new_state_alias193 return Node(list_states_FA = new_list_states_FA, list_values_link = new_list_values_link, index_oss = index_oss)194from operator import attrgetter195from itertools import groupby196def reg_expr(oss_space):197 list_routes = oss_space.list_routes198 list_nodes = oss_space.list_nodes199 initial_node = oss_space.initial_node200 if len(list_nodes) == 0:201 print("No nodes")202 return ''203 list_nodes.remove(initial_node)204 if any(route for route in list_routes if route.finish_node == initial_node):205 place_holder_node = Node([],[])206 new_route = Route(place_holder_node, initial_node)207 list_routes.append(new_route)208 initial_node = place_holder_node209 210 final_node = Node([],[])211 for node in list_nodes:212 if node.final:213 new_route = Route(node, final_node)214 list_routes.append(new_route)215 while len(list_nodes) > 0:216 list_route_node_auto = []217 list_route_node_input = []218 list_route_node_output = []219 node = random.choice(list_nodes)220 #print(node.id)221 222 for route in list_routes:223 if route.start_node == node and route.finish_node == node:224 list_route_node_auto.append(route)225 if route.start_node == node and route.finish_node != node:226 list_route_node_output.append(route) 227 if route.finish_node == node and route.start_node != node:228 list_route_node_input.append(route)229 230 if len(list_route_node_output) > 1:231 232 groups = groupby(list_route_node_output, attrgetter('finish_node')) #o(n)233 for (key, data) in groups:234 routes = list(data)235 if len(routes) > 1: 236 new_labels_rel = set()237 for route in routes:238 if len(route.label_rel) > 2:239 if route.label_rel[0] == '(' and route.label_rel[-1] == ')' and route.label_rel[-2]!='*':240 route.label_rel = route.label_rel[1:-1]241 #print(route.label_rel)242 for label_t in route.set_label_rel:243 new_labels_rel.add(label_t)244 list_routes.remove(route)245 list_route_node_output.remove(route)246 247 new_label_rel = '|'.join(new_labels_rel)248 #print(new_label_rel)249 new_route = Route(node, key, label_rel = new_label_rel)250 new_route.set_label_rel = new_labels_rel251 list_routes.append(new_route)252 list_route_node_output.append(new_route)253 if len(list_route_node_input) == 1 and len(list_route_node_output) == 1 and len(list_route_node_auto) == 0:254 route_in = list_route_node_input[0]255 route_out = list_route_node_output[0]256 new_label, set_label = clear_label(route_in.label_rel, route_out.label_rel, set_label1=route_in.set_label_rel, set_label2=route_out.set_label_rel)257 258 new_route = Route(route_in.start_node, route_out.finish_node , label_rel = new_label)259 new_route.set_label_rel = set_label260 261 list_routes.append(new_route)262 263 list_routes.remove(route_in)264 list_routes.remove(route_out)265 list_route_node_input.remove(route_in)266 list_route_node_output.remove(route_out)267 268 269 if len(list_route_node_auto) > 0:270 for auto_route in list_route_node_auto:271 272 for input_route in list_route_node_input:273 for output_route in list_route_node_output:274 new_label, set_label = clear_label(input_route.label_rel, output_route.label_rel, autolabel=auto_route.label_rel, set_label1=input_route.set_label_rel, set_label2=output_route.set_label_rel, set_auto=auto_route.set_label_rel)275 new_route = Route(input_route.start_node, output_route.finish_node, label_rel = new_label) 276 #print("entrato", auto_route.label_rel)277 new_route.set_label_rel = set_label278 #print("uscito", new_route.label_rel)279 list_routes.append(new_route)280 list_routes.remove(input_route) 281 list_routes.remove(auto_route)282 for output_route in list_route_node_output:283 list_routes.remove(output_route)284 else:285 for input_route in list_route_node_input:286 for output_route in list_route_node_output:287 new_label, set_label = clear_label(input_route.label_rel, output_route.label_rel, set_label1=input_route.set_label_rel, set_label2=output_route.set_label_rel)288 new_route = Route(input_route.start_node, output_route.finish_node, label_rel = new_label)289 new_route.set_label_rel = set_label290 list_routes.append(new_route)291 list_routes.remove(input_route)292 for output_route in list_route_node_output:293 list_routes.remove(output_route)294 list_nodes.remove(node)295 '''for route in list_routes:296 print("ROUTE",route.start_node.id, route.finish_node.id)297 '''298 final_labels = set()299 for route in list_routes:300 #new_labels_rel = set()301 if len(route.label_rel) > 2:302 if route.label_rel[0] == '(' and route.label_rel[-1] == ')' and route.label_rel[-2]!='*':303 route.label_rel = route.label_rel[1:-1]304 305 if len(route.set_label_rel) > 1:306 list_label = route.label_rel.split("|")307 for label in list_label:308 if label[0] == '(' and label[-1] == ')' and label[-2]!='*':309 label = label[1:-1]310 final_labels.add(label)311 '''for label in route.set_label_rel:312 final_labels.add(label)'''313 else:314 final_labels.add(route.label_rel)315 final_label = '|'.join(final_labels)316 #new_route = Route(node, key, label_rel = final_label)317 318 return final_label #return regular expression319def reg_expr_closing(closing_space):320 321 list_routes = closing_space.list_routes322 list_nodes = [node.node for node in closing_space.list_nodes]323 initial_node = closing_space.initial_node324 list_output_routes = closing_space.list_output_routes325 list_final_output_nodes = []326 list_output_nodes = []327 #list_final_routes = []328 list_nodes_not_final = []329 ex_initial_node_id = -2330 if len(list_nodes) == 1:331 new_route = Route(initial_node.node, initial_node.node)332 new_route.label_rel = '\u03b5'333 new_route.rif_node = initial_node.node 334 return [new_route] 335 336 if any(route for route in list_routes if route.finish_node.id == initial_node.node.id):337 338 place_holder_node = Node([],[])339 new_route = Route(place_holder_node, initial_node.node)340 list_routes.append(new_route)341 list_nodes_not_final.append(place_holder_node)342 ex_initial_node_id = initial_node.node.id343 initial_node.node = place_holder_node344 place_holder_node.id = -1345 346 final_node = Node([],[])347 final_node.id = -99348 349 for node in list_nodes:350 if node.final:351 list_final_output_nodes.append(node)352 new_route = Route(node, final_node)353 list_routes.append(new_route)354 else:355 list_nodes_not_final.append(node)356 for route in list_output_routes:357 if not route.start_node.final:358 if not route.start_node in list_output_nodes:359 list_output_nodes.append(route.start_node)360 list_final_output_nodes.append(route.start_node)361 new_route = Route(route.start_node, final_node)362 list_routes.append(new_route)363 list_nodes_not_final.remove(route.start_node)364 number_expr_nodes = len(list_final_output_nodes)365 366 if initial_node.node in list_nodes_not_final:367 list_nodes_not_final.remove(initial_node.node)368 while len(list_nodes_not_final) > 0:369 list_route_node_auto = []370 list_route_node_input = []371 list_route_node_output = []372 node = random.choice(list_nodes_not_final)373 374 for route in list_routes:375 if route.start_node == node and route.finish_node == node:376 list_route_node_auto.append(route)377 if route.start_node == node and route.finish_node != node:378 list_route_node_output.append(route) 379 if route.finish_node == node and route.start_node != node:380 list_route_node_input.append(route)381 382 if len(list_route_node_output) > 1: 383 groups = groupby(list_route_node_output, attrgetter('finish_node')) #o(n)384 for (key, data) in groups:385 routes = list(data)386 if len(routes) > 1:387 new_labels_rel = set()388 for route in routes:389 if len(route.label_rel) > 2:390 if route.label_rel[0] == '(' and route.label_rel[-1] == ')':391 route.label_rel = route.label_rel[1:-1]392 for label_t in route.set_label_rel:393 new_labels_rel.add(label_t)394 list_routes.remove(route)395 list_route_node_output.remove(route)396 397 new_label_rel = '|'.join(new_labels_rel)398 if len(new_labels_rel) > 1:399 new_route = Route(node, key, label_rel = new_label_rel)400 else:401 new_route = Route(node, key, label_rel = new_label_rel)402 #print(new_route.label_rel)403 new_route.set_label_rel = new_labels_rel404 405 list_routes.append(new_route)406 list_route_node_output.append(new_route)407 if len(list_route_node_input) == 1 and len(list_route_node_output) == 1 and len(list_route_node_auto) == 0:408 route_in = list_route_node_input[0]409 route_out = list_route_node_output[0]410 new_label, set_label = clear_label(route_in.label_rel, route_out.label_rel, set_label1=route_in.set_label_rel, set_label2=route_out.set_label_rel)411 412 new_route = Route(route_in.start_node, route_out.finish_node , label_rel = new_label)413 new_route.set_label_rel = set_label414 415 list_routes.append(new_route)416 list_routes.remove(route_in)417 list_routes.remove(route_out)418 list_route_node_input.remove(route_in)419 list_route_node_output.remove(route_out)420 421 422 if len(list_route_node_auto) > 0:423 for auto_route in list_route_node_auto:424 for input_route in list_route_node_input:425 for output_route in list_route_node_output:426 new_label, set_label = clear_label(input_route.label_rel, output_route.label_rel, autolabel=auto_route.label_rel, set_label1=input_route.set_label_rel, set_label2=output_route.set_label_rel, set_auto=auto_route.set_label_rel)427 new_route = Route(input_route.start_node, output_route.finish_node, label_rel = new_label) 428 429 new_route.set_label_rel = set_label430 list_routes.append(new_route)431 list_routes.remove(input_route) 432 list_routes.remove(auto_route)433 434 for output_route in list_route_node_output:435 list_routes.remove(output_route)436 else:437 for input_route in list_route_node_input:438 for output_route in list_route_node_output:439 new_label, set_label = clear_label(input_route.label_rel, output_route.label_rel, set_label1=input_route.set_label_rel, set_label2=output_route.set_label_rel)440 new_route = Route(input_route.start_node, output_route.finish_node, label_rel = new_label)441 new_route.set_label_rel = set_label442 list_routes.append(new_route)443 list_routes.remove(input_route)444 for output_route in list_route_node_output:445 list_routes.remove(output_route)446 list_nodes_not_final.remove(node)447 448 #### QUA####449 while len(list_final_output_nodes) > 0:450 list_route_node_auto = []451 list_route_node_input = []452 list_route_node_output = []453 node = random.choice(list_final_output_nodes)454 #print("node id", node.id)455 '''for route in list_routes:456 print("route", route.start_node.id, route.finish_node.id, route.label_rel)457 '''458 for route in list_routes:459 if route.start_node == node and route.finish_node == node:460 list_route_node_auto.append(route)461 if route.start_node == node and route.finish_node != node:462 list_route_node_output.append(route) 463 if route.finish_node == node and route.start_node != node:464 list_route_node_input.append(route)465 466 if len(list_route_node_input) == 1 and len(list_route_node_output) == 1 and len(list_route_node_auto) == 0:467 route_in = list_route_node_input[0]468 route_out = list_route_node_output[0]469 new_label, set_label = clear_label(route_in.label_rel, route_out.label_rel, set_label1=route_in.set_label_rel, set_label2=route_out.set_label_rel)470 471 new_route = Route(route_in.start_node, route_out.finish_node , label_rel = new_label)472 new_route.set_label_rel = set_label473 474 if route_out.rif_node != None:475 new_route.rif_node = route_out.rif_node476 elif route_out.finish_node == final_node:477 new_route.rif_node = node478 479 list_routes.append(new_route)480 481 list_routes.remove(route_in)482 list_routes.remove(route_out)483 list_route_node_input.remove(route_in)484 list_route_node_output.remove(route_out)485 486 if len(list_route_node_auto) > 0:487 remove = False488 for auto_route in list_route_node_auto:489 for input_route in list_route_node_input:490 remove = True491 for output_route in list_route_node_output:492 new_label, set_label = clear_label(input_route.label_rel, output_route.label_rel, autolabel=auto_route.label_rel, set_label1=input_route.set_label_rel, set_label2=output_route.set_label_rel, set_auto=auto_route.set_label_rel)493 new_route = Route(input_route.start_node, output_route.finish_node, label_rel = new_label) 494 495 new_route.set_label_rel = set_label496 list_routes.append(new_route)497 if output_route.rif_node != None:498 new_route.rif_node = output_route.rif_node499 elif output_route.finish_node == final_node:500 new_route.rif_node = node501 list_routes.remove(input_route) 502 list_routes.remove(auto_route)503 504 if remove:505 for output_route in list_route_node_output:506 list_routes.remove(output_route)507 if len(list_route_node_input) == 0:508 for output_route in list_route_node_output: 509 if output_route.rif_node != None:510 output_route.rif_node = output_route.rif_node511 elif output_route.finish_node == final_node:512 output_route.rif_node = node 513 else:514 remove = False515 for input_route in list_route_node_input:516 remove = True517 #print("qua", input_route.label_rel)518 for output_route in list_route_node_output:519 if input_route.start_node.id == -1 and output_route.finish_node.id == -99:520 #print("rimosso")521 continue522 new_label, set_label = clear_label(input_route.label_rel, output_route.label_rel, set_label1=input_route.set_label_rel, set_label2=output_route.set_label_rel)523 new_route = Route(input_route.start_node, output_route.finish_node, label_rel = new_label)524 new_route.set_label_rel = set_label525 #print("nuova rotta", new_route.label_rel, new_route.start_node.id, new_route.finish_node.id)526 list_routes.append(new_route)527 528 if output_route.rif_node != None:529 new_route.rif_node = output_route.rif_node530 elif output_route.finish_node == final_node:531 new_route.rif_node = node532 list_routes.remove(input_route)533 if remove:534 for output_route in list_route_node_output:535 list_routes.remove(output_route)536 if len(list_route_node_input) == 0:537 for output_route in list_route_node_output: 538 if output_route.rif_node != None:539 output_route.rif_node = output_route.rif_node540 elif output_route.finish_node == final_node:541 output_route.rif_node = node542 list_final_output_nodes.remove(node)543 '''for route in list_routes:544 print(closing_space.id, route.label_rel, route.rif_node.id)'''545 already_done = []546 routes = []547 for route in list_routes:548 if not route.rif_node.id in already_done:549 labels = set()550 gen = (r for r in list_routes if r.rif_node.id == route.rif_node.id)551 for r in gen:552 labels.add(r.label_rel)553 554 route.label_rel = "|".join(labels)555 route.set_label_rel = labels556 routes.append(route)557 already_done.append(route.rif_node.id)558 return routes559def clear_label(label1, label2, autolabel = None, set_label1 = None, set_label2 = None, set_auto = None):560 new_label = ''561 new_label_set = {new_label}562 #print("clear_label", label1, label2, set_label1, set_label2, autolabel, set_auto)563 if autolabel and autolabel != '\u03b5':564 autolabel = "(" + autolabel + ")"565 autolabel, set_auto = clear_label(autolabel, '*', set_label1={autolabel}, set_label2={'*'})566 if label1 == '\u03b5' and label2 == '\u03b5':567 if autolabel:568 if autolabel == '\u03b5':569 new_label = '\u03b5'570 else:571 new_label = autolabel572 new_label_set = set_auto573 else:574 new_label = '\u03b5'575 new_label_set = {new_label}576 elif label1 == '\u03b5':577 if autolabel:578 if autolabel == '\u03b5':579 new_label = label2580 new_label_set = set_label2581 else:582 new_label, new_label_set = clear_label(autolabel, label2, set_label1=set_auto, set_label2=set_label2)583 else:584 new_label = label2585 new_label_set = set_label2586 elif label2 == '\u03b5':587 if autolabel:588 if autolabel == '\u03b5':589 new_label = label1590 new_label_set = set_label1591 else:592 new_label, new_label_set = clear_label(label1, autolabel, set_label1=set_label1, set_label2=set_auto)593 else:594 new_label = label1595 new_label_set = set_label1596 else:597 if autolabel:598 if autolabel == '\u03b5':599 new_label, new_label_set = clear_label(label1, label2, set_label1=set_label1, set_label2=set_label2)600 else:601 new_label, new_label_set = clear_label(label1, autolabel, set_label1=set_label1, set_label2=set_auto)602 new_label, new_label_set = clear_label(new_label, label2, set_label1=new_label_set, set_label2=set_label2)603 elif set_label1:604 labels = set()605 #new_label_set = set()606 for label_temp in set_label1:607 for label_temp2 in set_label2:608 new_label_temp, new_label_set_temp = clear_label(label_temp, label_temp2)609 labels.add(new_label_temp)610 611 new_label = '|'.join(labels)612 new_label_set = labels613 614 else:615 616 new_label = label1 + label2617 new_label_set = {label1, label2}618 619 #print("sono qua",new_label, new_label_set )620 return new_label, new_label_set621def linear_diagnostic(diagnosticator_space, osservation):622 X = []623 X.append((diagnosticator_space.initial_state.id,'\u03b5'))624 X_final = []625 X_final += X626 list_states = diagnosticator_space.list_states627 for o in osservation:628 Xnew = []629 #print("osservazione", o)630 for (state_id, p) in X:631 groups = groupby(list_states[state_id].list_routes, attrgetter('finish_state')) #o(n)632 #print(state_id)633 #print("STATO", state_id, list_states[state_id].delta)634 for (key, data) in groups:635 #print(key.id)636 routes = list(data)637 for route in routes:638 if route.label_oss == o:639 p_label_set = set(p.split("|"))640 route_label_set = set(route.label_rel.split("|"))641 new_p, temp = clear_label(p, route.label_rel, set_label1 = p_label_set, set_label2 = route_label_set)642 #print("\t", new_p, p, route.label_rel, route.set_label_rel, )643 add = True644 for e_tuple_temp in Xnew:645 if e_tuple_temp[0] == route.finish_state.id and sorted(e_tuple_temp[1]) != sorted(new_p):646 #c'è un errore647 lst = list(e_tuple_temp)648 add = False649 label_set = set()650 label_set.add(e_tuple_temp[1])651 label_set.add(new_p)652 lst[1] = '|'.join(label_set)653 e_tuple_temp = tuple(lst)654 #print("\tdio", lst[1])655 break656 if add:657 #print("stato aggiunto", route.finish_state.id)658 Xnew.append((route.finish_state.id, new_p))659 #print("\t\t", add)660 X = copy.deepcopy(Xnew)661 X_final += Xnew662 #print(X_final)663 final_diagnosis = []664 labels_to_return = set()665 for e in Xnew:666 state = list_states[e[0]]667 if state.delta != '':668 list_delta = set(state.delta.split('|'))669 p_label_set = set(e[1].split("|"))670 p = "|".join(p_label_set)671 delta = "|".join(list_delta)672 label, temp = clear_label(p, delta, set_label1=p_label_set, set_label2=list_delta)673 labels_to_return.add(frozenset(temp))674 for frozen in labels_to_return:675 final_diagnosis.append('|'.join(list(frozen)))676 #print(final_diagnosis)677 return final_diagnosis678 ...

Full Screen

Full Screen

test_v_s_route.py

Source:test_v_s_route.py Github

copy

Full Screen

...135 assert_event_and_count(vsr_1_event_text, initial_count_vsr_1 + 4, events_ns_1)136 assert_event_with_full_equality_and_count(vs_event_text, initial_count_vs + 4, events_ns_1)137 assert_event_and_count(vsr_2_event_text, initial_count_vsr_2 + 4, events_ns_2)138 print("\nStep 6: remove VSRoute and check")139 delete_v_s_route(kube_apis.custom_objects, v_s_route_setup.route_m.name, v_s_route_setup.namespace)140 wait_before_test(1)141 new_config = get_vs_nginx_template_conf(kube_apis.v1,142 v_s_route_setup.namespace,143 v_s_route_setup.vs_name,144 ic_pod_name,145 ingress_controller_prerequisites.namespace)146 resp_1 = requests.get(f"{req_url}{v_s_route_setup.route_m.paths[0]}",147 headers={"host": v_s_route_setup.vs_host})148 resp_2 = requests.get(f"{req_url}{v_s_route_setup.route_m.paths[1]}",149 headers={"host": v_s_route_setup.vs_host})150 resp_3 = requests.get(f"{req_url}{v_s_route_setup.route_s.paths[0]}",151 headers={"host": v_s_route_setup.vs_host})152 assert resp_1.status_code == 404153 assert resp_2.status_code == 404154 assert resp_3.status_code == 200155 events_ns_1 = get_events(kube_apis.v1, v_s_route_setup.route_m.namespace)156 events_ns_2 = get_events(kube_apis.v1, v_s_route_setup.route_s.namespace)157 assert_locations_not_in_config(new_config, v_s_route_setup.route_m.paths)158 assert_event_and_count(vsr_1_event_text, initial_count_vsr_1 + 4, events_ns_1)159 assert_event_with_full_equality_and_count(vs_event_text, initial_count_vs + 4, events_ns_1)160 # a warning event because the VS references a non-existing VSR161 assert_event_with_full_equality_and_count(vs_warning_event_text, 1, events_ns_1)162 assert_event_and_count(vsr_2_event_text, initial_count_vsr_2 + 5, events_ns_2)163 print("\nStep 7: restore VSRoute and check")164 create_v_s_route_from_yaml(kube_apis.custom_objects,165 f"{TEST_DATA}/virtual-server-route/route-multiple.yaml",166 v_s_route_setup.namespace)167 wait_before_test(1)168 new_config = get_vs_nginx_template_conf(kube_apis.v1,169 v_s_route_setup.namespace,170 v_s_route_setup.vs_name,171 ic_pod_name,172 ingress_controller_prerequisites.namespace)173 resp_1 = requests.get(f"{req_url}{v_s_route_setup.route_m.paths[0]}",174 headers={"host": v_s_route_setup.vs_host})175 resp_2 = requests.get(f"{req_url}{v_s_route_setup.route_m.paths[1]}",176 headers={"host": v_s_route_setup.vs_host})177 resp_3 = requests.get(f"{req_url}{v_s_route_setup.route_s.paths[0]}",178 headers={"host": v_s_route_setup.vs_host})179 assert_responses_and_server_name(resp_1, resp_2, resp_3)180 events_ns_1 = get_events(kube_apis.v1, v_s_route_setup.route_m.namespace)181 events_ns_2 = get_events(kube_apis.v1, v_s_route_setup.route_s.namespace)182 assert_locations_in_config(new_config, v_s_route_setup.route_m.paths)183 assert_event_and_count(vsr_1_event_text, 1, events_ns_1)184 assert_event_with_full_equality_and_count(vs_event_text, initial_count_vs + 5, events_ns_1)185 assert_event_and_count(vsr_2_event_text, initial_count_vsr_2 + 6, events_ns_2)186 print("\nStep 8: remove one backend service and check")187 delete_service(kube_apis.v1, "backend1-svc", v_s_route_setup.route_m.namespace)188 wait_before_test(1)189 resp_1 = requests.get(f"{req_url}{v_s_route_setup.route_m.paths[0]}",190 headers={"host": v_s_route_setup.vs_host})191 resp_2 = requests.get(f"{req_url}{v_s_route_setup.route_m.paths[1]}",192 headers={"host": v_s_route_setup.vs_host})193 resp_3 = requests.get(f"{req_url}{v_s_route_setup.route_s.paths[0]}",194 headers={"host": v_s_route_setup.vs_host})195 assert resp_1.status_code == 502196 assert resp_2.status_code == 200197 assert resp_3.status_code == 200198 events_ns_1 = get_events(kube_apis.v1, v_s_route_setup.route_m.namespace)199 events_ns_2 = get_events(kube_apis.v1, v_s_route_setup.route_s.namespace)200 assert_event_and_count(vsr_1_event_text, 2, events_ns_1)201 assert_event_with_full_equality_and_count(vs_event_text, initial_count_vs + 6, events_ns_1)202 assert_event_and_count(vsr_2_event_text, initial_count_vsr_2 + 7, events_ns_2)203 print("\nStep 9: restore backend service and check")204 create_service_with_name(kube_apis.v1, v_s_route_setup.route_m.namespace, "backend1-svc")205 wait_before_test(1)206 resp_1 = requests.get(f"{req_url}{v_s_route_setup.route_m.paths[0]}",207 headers={"host": v_s_route_setup.vs_host})208 resp_2 = requests.get(f"{req_url}{v_s_route_setup.route_m.paths[1]}",209 headers={"host": v_s_route_setup.vs_host})210 resp_3 = requests.get(f"{req_url}{v_s_route_setup.route_s.paths[0]}",211 headers={"host": v_s_route_setup.vs_host})212 assert_responses_and_server_name(resp_1, resp_2, resp_3)213 events_ns_1 = get_events(kube_apis.v1, v_s_route_setup.route_m.namespace)214 events_ns_2 = get_events(kube_apis.v1, v_s_route_setup.route_s.namespace)215 assert_event_and_count(vsr_1_event_text, 3, events_ns_1)216 assert_event_with_full_equality_and_count(vs_event_text, initial_count_vs + 7, events_ns_1)217 assert_event_and_count(vsr_2_event_text, initial_count_vsr_2 + 8, events_ns_2)218 print("\nStep 10: remove VS and check")219 delete_virtual_server(kube_apis.custom_objects, v_s_route_setup.vs_name, v_s_route_setup.namespace)220 wait_before_test(1)221 resp_1 = requests.get(f"{req_url}{v_s_route_setup.route_m.paths[0]}",222 headers={"host": v_s_route_setup.vs_host})223 resp_2 = requests.get(f"{req_url}{v_s_route_setup.route_m.paths[1]}",224 headers={"host": v_s_route_setup.vs_host})225 resp_3 = requests.get(f"{req_url}{v_s_route_setup.route_s.paths[0]}",226 headers={"host": v_s_route_setup.vs_host})227 assert resp_1.status_code == 404228 assert resp_2.status_code == 404229 assert resp_3.status_code == 404230 list0_list_ns_1 = get_events(kube_apis.v1, v_s_route_setup.route_m.namespace)231 list0_list_ns_2 = get_events(kube_apis.v1, v_s_route_setup.route_s.namespace)232 assert_event_and_count(vsr_1_event_text, 3, list0_list_ns_1)233 assert_event_with_full_equality_and_count(vs_event_text, initial_count_vs + 7, list0_list_ns_1)234 assert_event_and_count(vsr_2_event_text, initial_count_vsr_2 + 8, list0_list_ns_2)235 print("\nStep 11: restore VS and check")236 create_virtual_server_from_yaml(kube_apis.custom_objects,237 f"{TEST_DATA}/virtual-server-route/standard/virtual-server.yaml",238 v_s_route_setup.namespace)239 wait_before_test(1)240 resp_1 = requests.get(f"{req_url}{v_s_route_setup.route_m.paths[0]}",241 headers={"host": v_s_route_setup.vs_host})242 resp_2 = requests.get(f"{req_url}{v_s_route_setup.route_m.paths[1]}",243 headers={"host": v_s_route_setup.vs_host})244 resp_3 = requests.get(f"{req_url}{v_s_route_setup.route_s.paths[0]}",245 headers={"host": v_s_route_setup.vs_host})246 assert_responses_and_server_name(resp_1, resp_2, resp_3)247 list1_list_ns_1 = get_events(kube_apis.v1, v_s_route_setup.route_m.namespace)248 list1_list_ns_2 = get_events(kube_apis.v1, v_s_route_setup.route_s.namespace)249 assert_event_and_count(vsr_1_event_text, 4, list1_list_ns_1)250 assert_event_with_full_equality_and_count(vs_event_text, 1, list1_list_ns_1)251 assert_event_and_count(vsr_2_event_text, initial_count_vsr_2 + 9, list1_list_ns_2)252@pytest.mark.vsr253@pytest.mark.parametrize('crd_ingress_controller, v_s_route_setup',254 [({"type": "complete", "extra_args": [f"-enable-custom-resources"]},255 {"example": "virtual-server-route"})],256 indirect=True)257class TestVirtualServerRouteValidation:258 def test_vsr_without_vs(self, kube_apis,259 ingress_controller_prerequisites,260 crd_ingress_controller,261 v_s_route_setup,262 test_namespace):263 ic_pod_name = get_first_pod_name(kube_apis.v1, ingress_controller_prerequisites.namespace)264 vsr_name = create_v_s_route_from_yaml(kube_apis.custom_objects,265 f"{TEST_DATA}/virtual-server-route/route-orphan.yaml",266 test_namespace)267 vsr_paths = get_paths_from_vsr_yaml(f"{TEST_DATA}/virtual-server-route/route-orphan.yaml")268 wait_before_test(1)269 new_config = get_vs_nginx_template_conf(kube_apis.v1,270 v_s_route_setup.namespace,271 v_s_route_setup.vs_name,272 ic_pod_name,273 ingress_controller_prerequisites.namespace)274 new_list_ns_3 = get_events(kube_apis.v1, test_namespace)275 assert_locations_not_in_config(new_config, vsr_paths)276 assert_event_and_count(f"VirtualServer {v_s_route_setup.namespace}/{v_s_route_setup.vs_name} ignores VirtualServerRoute", 1, new_list_ns_3)277 @pytest.mark.parametrize("route_yaml", [f"{TEST_DATA}/virtual-server-route/route-single-invalid-host.yaml",278 f"{TEST_DATA}/virtual-server-route/route-single-duplicate-path.yaml"])279 def test_make_existing_vsr_invalid(self, kube_apis,280 ingress_controller_prerequisites,281 crd_ingress_controller,282 v_s_route_setup,283 route_yaml):284 ic_pod_name = get_first_pod_name(kube_apis.v1, ingress_controller_prerequisites.namespace)285 patch_v_s_route_from_yaml(kube_apis.custom_objects,286 v_s_route_setup.route_s.name,287 route_yaml,288 v_s_route_setup.route_s.namespace)289 wait_before_test(1)290 new_config = get_vs_nginx_template_conf(kube_apis.v1,291 v_s_route_setup.namespace,292 v_s_route_setup.vs_name,293 ic_pod_name,294 ingress_controller_prerequisites.namespace)295 new_vs_events = get_events(kube_apis.v1, v_s_route_setup.namespace)296 new_vsr_events = get_events(kube_apis.v1, v_s_route_setup.route_s.namespace)297 assert_locations_not_in_config(new_config, v_s_route_setup.route_s.paths)298 text = f"{v_s_route_setup.route_s.namespace}/{v_s_route_setup.route_s.name}"299 assert_event_and_count(f"Configuration for {v_s_route_setup.namespace}/{v_s_route_setup.vs_name} was added or updated with warning(s)",300 1,301 new_vs_events)302 if route_yaml == f"{TEST_DATA}/virtual-server-route/route-single-invalid-host.yaml":303 assert_event_and_count(f"VirtualServer is invalid or doesn't exist",304 1,305 new_vsr_events)306 else:307 assert_event_and_count(f"VirtualServerRoute {text} was rejected with error",308 1,309 new_vsr_events)310 def test_openapi_validation_flow(self, kube_apis, ingress_controller_prerequisites,311 crd_ingress_controller, v_s_route_setup):312 ic_pod_name = get_first_pod_name(kube_apis.v1, ingress_controller_prerequisites.namespace)313 config_old = get_vs_nginx_template_conf(kube_apis.v1,314 v_s_route_setup.namespace,315 v_s_route_setup.vs_name,316 ic_pod_name,317 ingress_controller_prerequisites.namespace)318 route_yaml = f"{TEST_DATA}/virtual-server-route/route-single-invalid-openapi.yaml"319 try:320 patch_v_s_route_from_yaml(kube_apis.custom_objects,321 v_s_route_setup.route_s.name,322 route_yaml,323 v_s_route_setup.route_s.namespace)324 except ApiException as ex:325 assert ex.status == 422 and "spec.subroutes.action.pass" in ex.body326 except Exception as ex:327 pytest.fail(f"An unexpected exception is raised: {ex}")328 else:329 pytest.fail("Expected an exception but there was none")330 wait_before_test(1)331 config_new = get_vs_nginx_template_conf(kube_apis.v1,332 v_s_route_setup.namespace,333 v_s_route_setup.vs_name,334 ic_pod_name,335 ingress_controller_prerequisites.namespace)336 assert config_old == config_new, "Expected: config doesn't change"337@pytest.mark.vsr338@pytest.mark.parametrize('crd_ingress_controller, v_s_route_setup',339 [({"type": "complete", "extra_args": [f"-enable-custom-resources"]},340 {"example": "virtual-server-route"})],341 indirect=True)342class TestCreateInvalidVirtualServerRoute:343 def test_create_invalid_vsr(self, kube_apis,344 ingress_controller_prerequisites,345 crd_ingress_controller,346 v_s_route_setup):347 route_yaml = f"{TEST_DATA}/virtual-server-route/route-single-duplicate-path.yaml"348 ic_pod_name = get_first_pod_name(kube_apis.v1, ingress_controller_prerequisites.namespace)349 text = f"{v_s_route_setup.route_s.namespace}/{v_s_route_setup.route_s.name}"350 vs_event_text = f"Configuration for {v_s_route_setup.namespace}/{v_s_route_setup.vs_name} was added or updated with warning(s)"351 vsr_event_text = f"VirtualServerRoute {text} was rejected with error: spec.subroutes[1].path: Duplicate value: \"/backend2\""352 delete_v_s_route(kube_apis.custom_objects,353 v_s_route_setup.route_s.name,354 v_s_route_setup.route_s.namespace)355 create_v_s_route_from_yaml(kube_apis.custom_objects,356 route_yaml,357 v_s_route_setup.route_s.namespace)358 wait_before_test(1)359 new_config = get_vs_nginx_template_conf(kube_apis.v1,360 v_s_route_setup.namespace,361 v_s_route_setup.vs_name,362 ic_pod_name,363 ingress_controller_prerequisites.namespace)364 new_vs_events = get_events(kube_apis.v1, v_s_route_setup.namespace)365 new_vsr_events = get_events(kube_apis.v1, v_s_route_setup.route_s.namespace)366 assert_locations_not_in_config(new_config, v_s_route_setup.route_s.paths)...

Full Screen

Full Screen

test_bgpspeaker.py

Source:test_bgpspeaker.py Github

copy

Full Screen

...287 @mock.patch(288 'ryu.services.protocols.bgp.bgpspeaker.BGPSpeaker.__init__',289 mock.MagicMock(return_value=None))290 @mock.patch('ryu.services.protocols.bgp.bgpspeaker.call')291 def test_evpn_prefix_add_ip_prefix_route(self, mock_call):292 # Prepare test data293 route_type = bgpspeaker.EVPN_IP_PREFIX_ROUTE294 route_dist = '65000:100'295 esi = 0 # denotes single-homed296 ethernet_tag_id = 200297 ip_prefix = '192.168.0.0/24'298 gw_ip_addr = '172.16.0.1'299 next_hop = '0.0.0.0'300 expected_kwargs = {301 'route_type': route_type,302 'route_dist': route_dist,303 'esi': esi,304 'ethernet_tag_id': ethernet_tag_id,305 'ip_prefix': ip_prefix,306 'gw_ip_addr': gw_ip_addr,307 'next_hop': next_hop,308 }309 # Test310 speaker = bgpspeaker.BGPSpeaker(65000, '10.0.0.1')311 speaker.evpn_prefix_add(312 route_type=route_type,313 route_dist=route_dist,314 esi=esi,315 ethernet_tag_id=ethernet_tag_id,316 ip_prefix=ip_prefix,317 gw_ip_addr=gw_ip_addr,318 )319 # Check320 mock_call.assert_called_with(321 'evpn_prefix.add_local', **expected_kwargs)322 @mock.patch(323 'ryu.services.protocols.bgp.bgpspeaker.BGPSpeaker.__init__',324 mock.MagicMock(return_value=None))325 @mock.patch('ryu.services.protocols.bgp.bgpspeaker.call')326 def test_evpn_prefix_add_ip_prefix_route_vni(self, mock_call):327 # Prepare test data328 route_type = bgpspeaker.EVPN_IP_PREFIX_ROUTE329 route_dist = '65000:100'330 esi = 0 # denotes single-homed331 ethernet_tag_id = 200332 ip_prefix = '192.168.0.0/24'333 gw_ip_addr = '172.16.0.1'334 vni = 500335 tunnel_type = bgpspeaker.TUNNEL_TYPE_VXLAN336 next_hop = '0.0.0.0'337 expected_kwargs = {338 'route_type': route_type,339 'route_dist': route_dist,340 'esi': esi,341 'ethernet_tag_id': ethernet_tag_id,342 'ip_prefix': ip_prefix,343 'gw_ip_addr': gw_ip_addr,344 'tunnel_type': tunnel_type,345 'vni': vni,346 'next_hop': next_hop,347 }348 # Test349 speaker = bgpspeaker.BGPSpeaker(65000, '10.0.0.1')350 speaker.evpn_prefix_add(351 route_type=route_type,352 route_dist=route_dist,353 esi=esi,354 ethernet_tag_id=ethernet_tag_id,355 ip_prefix=ip_prefix,356 gw_ip_addr=gw_ip_addr,357 tunnel_type=tunnel_type,358 vni=vni,359 )360 # Check361 mock_call.assert_called_with(362 'evpn_prefix.add_local', **expected_kwargs)363 @raises(ValueError)364 @mock.patch('ryu.services.protocols.bgp.bgpspeaker.BGPSpeaker.__init__',365 mock.MagicMock(return_value=None))366 @mock.patch('ryu.services.protocols.bgp.bgpspeaker.call')367 def test_evpn_prefix_add_invalid_route_type(self, mock_call):368 # Prepare test data369 route_type = 'foobar' # Invalid EVPN route type370 route_dist = '65000:100'371 esi = 0 # denotes single-homed372 ethernet_tag_id = 200373 mac_addr = 'aa:bb:cc:dd:ee:ff'374 ip_addr = '192.168.0.1'375 next_hop = '10.0.0.1'376 # Test377 speaker = bgpspeaker.BGPSpeaker(65000, '10.0.0.1')378 speaker.evpn_prefix_add(379 route_type=route_type,380 route_dist=route_dist,381 esi=esi,382 ethernet_tag_id=ethernet_tag_id,383 mac_addr=mac_addr,384 ip_addr=ip_addr,385 next_hop=next_hop,386 )387 # Check388 mock_call.assert_called_with(389 'evpn_prefix.add_local', 'Invalid arguments detected')390 @mock.patch(391 'ryu.services.protocols.bgp.bgpspeaker.BGPSpeaker.__init__',392 mock.MagicMock(return_value=None))393 @mock.patch('ryu.services.protocols.bgp.bgpspeaker.call')394 def test_evpn_prefix_del_auto_discovery(self, mock_call):395 # Prepare test data396 route_type = bgpspeaker.EVPN_ETH_AUTO_DISCOVERY397 route_dist = '65000:100'398 esi = {399 'type': ESI_TYPE_LACP,400 'mac_addr': 'aa:bb:cc:dd:ee:ff',401 'port_key': 100,402 }403 ethernet_tag_id = EVPN_MAX_ET404 expected_kwargs = {405 'route_type': route_type,406 'route_dist': route_dist,407 'esi': esi,408 'ethernet_tag_id': ethernet_tag_id,409 }410 # Test411 speaker = bgpspeaker.BGPSpeaker(65000, '10.0.0.1')412 speaker.evpn_prefix_del(413 route_type=route_type,414 route_dist=route_dist,415 esi=esi,416 ethernet_tag_id=ethernet_tag_id,417 )418 # Check419 mock_call.assert_called_with(420 'evpn_prefix.delete_local', **expected_kwargs)421 @mock.patch('ryu.services.protocols.bgp.bgpspeaker.BGPSpeaker.__init__',422 mock.MagicMock(return_value=None))423 @mock.patch('ryu.services.protocols.bgp.bgpspeaker.call')424 def test_evpn_prefix_del_mac_ip_adv(self, mock_call):425 # Prepare test data426 route_type = bgpspeaker.EVPN_MAC_IP_ADV_ROUTE427 route_dist = '65000:100'428 ethernet_tag_id = 200429 mac_addr = 'aa:bb:cc:dd:ee:ff'430 ip_addr = '192.168.0.1'431 expected_kwargs = {432 'route_type': route_type,433 'route_dist': route_dist,434 'ethernet_tag_id': ethernet_tag_id,435 'mac_addr': mac_addr,436 'ip_addr': ip_addr,437 }438 # Test439 speaker = bgpspeaker.BGPSpeaker(65000, '10.0.0.1')440 speaker.evpn_prefix_del(441 route_type=route_type,442 route_dist=route_dist,443 ethernet_tag_id=ethernet_tag_id,444 mac_addr=mac_addr,445 ip_addr=ip_addr,446 )447 # Check448 mock_call.assert_called_with(449 'evpn_prefix.delete_local', **expected_kwargs)450 @mock.patch('ryu.services.protocols.bgp.bgpspeaker.BGPSpeaker.__init__',451 mock.MagicMock(return_value=None))452 @mock.patch('ryu.services.protocols.bgp.bgpspeaker.call')453 def test_evpn_prefix_del_multicast_etag(self, mock_call):454 # Prepare test data455 route_type = bgpspeaker.EVPN_MULTICAST_ETAG_ROUTE456 route_dist = '65000:100'457 esi = 0 # denotes single-homed458 ethernet_tag_id = 200459 mac_addr = 'aa:bb:cc:dd:ee:ff'460 ip_addr = '192.168.0.1'461 expected_kwargs = {462 'route_type': route_type,463 'route_dist': route_dist,464 # 'esi': esi, # should be ignored465 'ethernet_tag_id': ethernet_tag_id,466 # 'mac_addr': mac_addr, # should be ignored467 'ip_addr': ip_addr,468 }469 # Test470 speaker = bgpspeaker.BGPSpeaker(65000, '10.0.0.1')471 speaker.evpn_prefix_del(472 route_type=route_type,473 route_dist=route_dist,474 esi=esi,475 ethernet_tag_id=ethernet_tag_id,476 mac_addr=mac_addr,477 ip_addr=ip_addr,478 )479 # Check480 mock_call.assert_called_with(481 'evpn_prefix.delete_local', **expected_kwargs)482 @raises(ValueError)483 @mock.patch('ryu.services.protocols.bgp.bgpspeaker.BGPSpeaker.__init__',484 mock.MagicMock(return_value=None))485 @mock.patch('ryu.services.protocols.bgp.bgpspeaker.call')486 def test_evpn_prefix_del_invalid_route_type(self, mock_call):487 # Prepare test data488 route_type = 'foobar' # Invalid EVPN route type489 route_dist = '65000:100'490 esi = 0 # denotes single-homed491 ethernet_tag_id = 200492 mac_addr = 'aa:bb:cc:dd:ee:ff'493 ip_addr = '192.168.0.1'494 # Test495 speaker = bgpspeaker.BGPSpeaker(65000, '10.0.0.1')496 speaker.evpn_prefix_del(497 route_type=route_type,498 route_dist=route_dist,499 esi=esi,500 ethernet_tag_id=ethernet_tag_id,501 mac_addr=mac_addr,502 ip_addr=ip_addr,503 )504 # Check505 mock_call.assert_called_with(506 'evpn_prefix.delete_local', 'Invalid arguments detected')507 @mock.patch(508 'ryu.services.protocols.bgp.bgpspeaker.BGPSpeaker.__init__',509 mock.MagicMock(return_value=None))510 @mock.patch('ryu.services.protocols.bgp.bgpspeaker.call')511 def test_evpn_prefix_del_eth_segment(self, mock_call):512 # Prepare test data513 route_type = bgpspeaker.EVPN_ETH_SEGMENT514 route_dist = '65000:100'515 esi = {516 'esi_type': ESI_TYPE_MAC_BASED,517 'mac_addr': 'aa:bb:cc:dd:ee:ff',518 'local_disc': 100,519 }520 ip_addr = '192.168.0.1'521 expected_kwargs = {522 'route_type': route_type,523 'route_dist': route_dist,524 'esi': esi,525 'ip_addr': ip_addr,526 }527 # Test528 speaker = bgpspeaker.BGPSpeaker(65000, '10.0.0.1')529 speaker.evpn_prefix_del(530 route_type=route_type,531 route_dist=route_dist,532 esi=esi,533 ip_addr=ip_addr,534 )535 # Check536 mock_call.assert_called_with(537 'evpn_prefix.delete_local', **expected_kwargs)538 @mock.patch(539 'ryu.services.protocols.bgp.bgpspeaker.BGPSpeaker.__init__',540 mock.MagicMock(return_value=None))541 @mock.patch('ryu.services.protocols.bgp.bgpspeaker.call')542 def test_evpn_prefix_del_ip_prefix_route(self, mock_call):543 # Prepare test data544 route_type = bgpspeaker.EVPN_IP_PREFIX_ROUTE545 route_dist = '65000:100'546 ethernet_tag_id = 200547 ip_prefix = '192.168.0.0/24'548 expected_kwargs = {549 'route_type': route_type,550 'route_dist': route_dist,551 'ethernet_tag_id': ethernet_tag_id,552 'ip_prefix': ip_prefix,553 }554 # Test555 speaker = bgpspeaker.BGPSpeaker(65000, '10.0.0.1')556 speaker.evpn_prefix_del(...

Full Screen

Full Screen

OverlapGraph.py

Source:OverlapGraph.py Github

copy

Full Screen

1import os2import numpy as np3from tqdm import tqdm4import matplotlib5matplotlib.use('Agg')6import matplotlib.pyplot as plt7from PIL import Image8class OverlapGraph():9 # 指定の形式にトラッキング結果を整形して出力するクラス10 def __init__(self, data_list, route_list):11 # コンストラクタ12 # args:13 self.data_list = data_list14 self.route_list = route_list15 # maxrange : グラフの描画範囲(±[μm])16 self.maxrange = 317 # その他18 self.all_node_list = []19 self.all_node_id = []20 self.devided_route = []21 self.colors = []22 def setMaxRange(self, value):23 self.maxrange = value24 def route_extractor(self, threshold):25 # トラッキング結果から閾値のフレーム連続してトラッキングされている経路を抽出26 # 有効経路の抽出27 effective_route = [28 route for route in self.route_list29 if route.route_length > threshold30 ]31 # 全Nodeクラスの連続配列を作る32 self.all_node_list = [node for data in self.data_list for node in data]33 self.all_node_id = [node.ID for node in self.all_node_list]34 # 経路始点から閾値までのノードIDとTIMEのリストを作る35 self.devided_route = [route.route[:threshold] for route in effective_route]36 self.devided_route_time = [37 [t for t in range(route.t, route.t+10)]38 for route in effective_route39 ]40 # ランダムに色を有効経路の個数分生成(値0~255の範囲でn行3列のランダムなndarrayを生成)41 self.colors = np.random.rand(len(effective_route), 3)42 def figout(self, out_path, *, pix2m=1):43 # 抽出した全経路を重ねあわせて表示する.44 print('\nRoute overlapping graph making now ...')45 # 出力先作成46 print('> Check output dirctory: \"' + out_path + '\"')47 if os.path.isdir(out_path) is False:48 print('> Output directory are not found.')49 os.makedirs(out_path, exist_ok=True)50 print('> New directory created!')51 else:52 print('> Already exist.')53 print('> Calculating ...')54 # 描画用の座標リストを作る55 point_list = [56 [57 [self.all_node_list[ID].x, self.all_node_list[ID].y]58 for ID in route59 ] for route in self.devided_route60 ]61 # 始点の座標を(0, 0)にして全体を平行移動する62 moved_point_list = [63 [64 np.array(np.array(node) - np.array(route[0])).tolist()65 for node in route66 ] for route in point_list67 ]68 # 数値を実際の距離に合わせる69 real_scale_list = np.array(moved_point_list) * pix2m70 # グラフの生成71 fig = plt.figure(figsize=(5, 5))72 ax = fig.add_subplot(1, 1, 1)73 for r_idx, route in enumerate(real_scale_list):74 for i in range(1, len(route)):75 parent_x = route[i-1][0]76 parent_y = -route[i-1][1]77 child_x = route[i][0]78 child_y = -route[i][1]79 # 注意:画像の見え方と実際の値の齟齬を解消するために,y軸は符号反転する80 ax.plot(81 [parent_x, child_x], [parent_y, child_y],82 c=self.colors[r_idx].tolist())83 # 全経路の最大値(+余白)を求めて,グラフ枠のサイズを決める84 # max_range = np.absolute(real_scale_list).max() + pix2m85 # plt.xlim([-max_range, max_range])86 # plt.ylim([-max_range, max_range])87 # グラフ枠のサイズを3umに設定する88 w = self.maxrange89 plt.xlim([-w, w])90 plt.ylim([-w, w])91 # グラフの保存92 filename = out_path + '/overlapped_route.png'93 plt.savefig(filename, dpi=100, transparent=True)94 plt.close()95 print('> Done.')96 return moved_point_list, real_scale_list97 def trajout(self, out_path, *, pix2m=1):98 # 抽出した全経路を個別に並べて出力する.99 print('\nRoute trajectory graph making now ...')100 # 出力先作成101 out_path = out_path + 'traj_fig/'102 print('> Check output dirctory: \"'+ out_path + '\"')103 if os.path.isdir(out_path) is False:104 print('> Output directory are not found.')105 os.makedirs(out_path, exist_ok=True)106 print('> New directory created!')107 else:108 print('> Already exist.')109 print('> Calculating ...')110 # 描画用の座標リストを作る111 point_list = [112 [113 [self.all_node_list[ID].x, self.all_node_list[ID].y]114 for ID in route115 ] for route in self.devided_route116 ]117 # 始点の座標を(0, 0)にして全体を平行移動する118 moved_point_list = [119 [120 np.array(np.array(node) - np.array(route[0])).tolist()121 for node in route122 ] for route in point_list123 ]124 # 数値を実際の距離に合わせる125 real_scale_list = np.array(moved_point_list) * pix2m126 # 全経路の最大値(+余白)を求めて,グラフ枠のサイズを決める127 max_range = np.absolute(real_scale_list).max() + pix2m128 # プログレスバーの生成129 with tqdm(total=len(real_scale_list), ascii=True, ncols=100) as pbar:130 for r_idx, route in enumerate(real_scale_list):131 # プログレスバーを進める132 pbar.update(1)133 # グラフの生成134 fig = plt.figure(figsize=(2, 2))135 ax = fig.add_subplot(1, 1, 1)136 for i in range(1, len(route)):137 parent_x = route[i-1][0]138 parent_y = -route[i-1][1]139 child_x = route[i][0]140 child_y = -route[i][1]141 # 注意:画像の見え方と実際の値の齟齬を解消するために,y軸は符号反転する142 ax.plot(143 [parent_x, child_x], [parent_y, child_y],144 c=self.colors[r_idx].tolist())145 # グラフの描画範囲の設定146 plt.xlim([-max_range, max_range])147 plt.ylim([-max_range, max_range])148 # グラフ枠のサイズを3umに設定する149 # w = self.maxrange150 # plt.xlim([-w, w])151 # plt.ylim([-w, w])152 # グラフの保存153 filename = out_path + '/route_%05d.png' % r_idx154 plt.savefig(filename, dpi=100, transparent=True)155 plt.close()156 print('> Done.')157 def movout(self, out_path, bg_path):158 # トラッキング結果をオーバーレイ描画して出力する159 print('\nRoute trajectory movie making now ...')160 # 出力先作成161 out_path = out_path + 'mov_fig/'162 print('> Check output dirctory: \"' + out_path + '\"')163 if os.path.isdir(out_path) is False:164 print('> Output directory are not found.')165 os.makedirs(out_path, exist_ok=True)166 print('> New directory created!')167 else:168 print('> Already exist.')169 print('> Calculating ...')170 # プログレスバーの生成171 with tqdm(total=len(self.data_list), ascii=True, ncols=100) as pbar:172 # フレームごとにループ処理する173 for idx, nodes in enumerate(self.data_list):174 # プログレスバーを進める175 pbar.update(1)176 # 背景の読み込み177 img = Image.open(bg_path % idx)178 img = np.array(img)179 h = img.shape[0]180 w = img.shape[1]181 # グラフの生成182 fig = plt.figure(figsize=(w/10, h/10))183 ax = fig.add_subplot(1, 1, 1)184 # 画像のカラー/グレースケール判定185 if len(img.shape) is 2:186 ax.imshow(img, 'gray')187 print('> input image is gray scale.')188 else:189 ax.imshow(img)190 # print('> input image is RGB color')191 # 描画用の座標リストを作る192 for r_idx, (T, route) in enumerate(zip(self.devided_route_time, self.devided_route)):193 if idx in T:194 index = T.index(idx)195 for i in reversed(range(index+1)):196 ID = route[i]197 node = self.all_node_list[ID]198 if node.parent_ID is not None:199 parent = self.all_node_list[node.parent_ID]200 child_x = node.x201 child_y = node.y202 parent_x = parent.x203 parent_y = parent.y204 ax.plot(205 [parent_x, child_x], [parent_y, child_y],206 c=self.colors[r_idx].tolist()207 )208 # ノードの描画209 for i, node in enumerate(nodes):210 ax.scatter(node.x, node.y, c='red', s=200, marker='*', edgecolors='black', linewidths=1)211 # 範囲設定212 plt.xlim([0, w-1])213 plt.ylim([h-1, 0])214 filename = out_path + '%05d.tif' % idx215 plt.savefig(filename, dpi=100)216 plt.close()...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful