Best Python code snippet using localstack_python
test_request_reply_v12.py
Source:test_request_reply_v12.py
...48 self.unclear -= 149 self.start_next_test(dp)50 else:51 self.print_results()52 def run_verify(self, ev):53 msg = ev.msg54 dp = msg.datapath55 verify_func = self.verify_default56 v = "verify" + self.current[4:]57 if v in dir(self):58 verify_func = getattr(self, v)59 result = verify_func(dp, msg)60 if result is True:61 self.unclear -= 162 self.results[self.current] = result63 self.start_next_test(dp)64 def verify_default(self, dp, msg):65 type_ = self._verify66 if msg.msg_type == dp.ofproto.OFPT_STATS_REPLY:67 return self.verify_stats(dp, msg.body, type_)68 elif msg.msg_type == type_:69 return True70 else:71 return 'Reply msg_type %s expected %s' \72 % (msg.msg_type, type_)73 def verify_stats(self, dp, stats, type_):74 stats_types = dp.ofproto_parser.OFPStatsReply._STATS_TYPES75 expect = stats_types.get(type_).__name__76 if isinstance(stats, list):77 for s in stats:78 if expect == s.__class__.__name__:79 return True80 else:81 if expect == stats.__class__.__name__:82 return True83 return 'Reply msg has not \'%s\' class.\n%s' % (expect, stats)84 def mod_flow(self, dp, cookie=0, cookie_mask=0, table_id=0,85 command=None, idle_timeout=0, hard_timeout=0,86 priority=0xff, buffer_id=0xffffffff, match=None,87 actions=None, inst_type=None, out_port=None,88 out_group=None, flags=0, inst=None):89 if command is None:90 command = dp.ofproto.OFPFC_ADD91 if inst is None:92 if inst_type is None:93 inst_type = dp.ofproto.OFPIT_APPLY_ACTIONS94 inst = []95 if actions is not None:96 inst = [dp.ofproto_parser.OFPInstructionActions(97 inst_type, actions)]98 if match is None:99 match = dp.ofproto_parser.OFPMatch()100 if out_port is None:101 out_port = dp.ofproto.OFPP_ANY102 if out_group is None:103 out_group = dp.ofproto.OFPG_ANY104 m = dp.ofproto_parser.OFPFlowMod(dp, cookie, cookie_mask,105 table_id, command,106 idle_timeout, hard_timeout,107 priority, buffer_id,108 out_port, out_group,109 flags, match, inst)110 dp.send_msg(m)111 def get_port(self, dp):112 for port_no, port in 
dp.ports.items():113 if port_no != dp.ofproto.OFPP_LOCAL:114 return port115 return None116 # Test for Reply message type117 def test_desc_stats_request(self, dp):118 self._verify = dp.ofproto.OFPST_DESC119 m = dp.ofproto_parser.OFPDescStatsRequest(dp)120 dp.send_msg(m)121 def test_flow_stats_request(self, dp):122 self._verify = dp.ofproto.OFPST_FLOW123 self.mod_flow(dp)124 self.send_flow_stats(dp)125 def test_aggregate_stats_request(self, dp):126 self._verify = dp.ofproto.OFPST_AGGREGATE127 match = dp.ofproto_parser.OFPMatch()128 m = dp.ofproto_parser.OFPAggregateStatsRequest(129 dp, dp.ofproto.OFPTT_ALL, dp.ofproto.OFPP_ANY,130 dp.ofproto.OFPG_ANY, 0, 0, match)131 dp.send_msg(m)132 def test_table_stats_request(self, dp):133 self._verify = dp.ofproto.OFPST_TABLE134 m = dp.ofproto_parser.OFPTableStatsRequest(dp)135 dp.send_msg(m)136 def test_port_stats_request(self, dp):137 self._verify = dp.ofproto.OFPST_PORT138 m = dp.ofproto_parser.OFPPortStatsRequest(dp, dp.ofproto.OFPP_ANY)139 dp.send_msg(m)140 def test_echo_request(self, dp):141 self._verify = dp.ofproto.OFPT_ECHO_REPLY142 m = dp.ofproto_parser.OFPEchoRequest(dp)143 dp.send_msg(m)144 def test_features_request(self, dp):145 self._verify = dp.ofproto.OFPT_FEATURES_REPLY146 m = dp.ofproto_parser.OFPFeaturesRequest(dp)147 dp.send_msg(m)148 def test_get_config_request(self, dp):149 self._verify = dp.ofproto.OFPT_GET_CONFIG_REPLY150 m = dp.ofproto_parser.OFPGetConfigRequest(dp)151 dp.send_msg(m)152 def test_barrier_request(self, dp):153 self._verify = dp.ofproto.OFPT_BARRIER_REPLY154 dp.send_barrier()155 def test_error_reply(self, dp):156 ports = [0]157 for p in dp.ports:158 if p != dp.ofproto.OFPP_LOCAL:159 ports.append(p)160 port_no = max(ports) + 1161 self._verify = dp.ofproto.OFPT_ERROR162 m = dp.ofproto_parser.OFPPortMod(163 dp, port_no, 'ff:ff:ff:ff:ff:ff', 0, 0, 0)164 dp.send_msg(m)165 # Test for reply value166 def test_flow_stats_none(self, dp):167 self.send_flow_stats(dp)168 def verify_flow_stats_none(self, 
dp, msg):169 stats = msg.body170 if len(stats):171 return 'Reply msg has body. %s' % (stats, )172 return True173 def test_flow_stats_reply_value(self, dp):174 self._verify = []175 c = 0176 while c < self.n_tables:177 # value = (talbe_id, cookie, idle_timeout, hard_timeout, priority)178 v = (c, c + 1, c + 2, c + 3, c + 4)179 self._verify.append(v)180 self.mod_flow(dp, table_id=v[0], cookie=v[1],181 idle_timeout=v[2], hard_timeout=v[3], priority=v[4])182 c += 1183 dp.send_barrier()184 self.send_flow_stats(dp)185 def verify_flow_stats_reply_value(self, dp, msg):186 c = 0187 for f in msg.body:188 f_value = (f.table_id, f.cookie, f.idle_timeout,189 f.hard_timeout, f.priority, )190 if f_value != self._verify[c]:191 return 'param is mismatched. verify=%s, reply=%s' \192 % (self._verify[c], f_value,)193 c += 1194 return len(msg.body) == self.n_tables195 def test_echo_request_has_data(self, dp):196 data = 'test'197 self._verify = data198 m = dp.ofproto_parser.OFPEchoRequest(dp)199 m.data = data200 dp.send_msg(m)201 def verify_echo_request_has_data(self, dp, msg):202 data = msg.data203 return self._verify == data204 def test_aggregate_stats_flow_count(self, dp):205 c = 0206 while c < self.n_tables:207 self.mod_flow(dp, table_id=c)208 c += 1209 dp.send_barrier()210 match = dp.ofproto_parser.OFPMatch()211 m = dp.ofproto_parser.OFPAggregateStatsRequest(212 dp, dp.ofproto.OFPTT_ALL, dp.ofproto.OFPP_ANY,213 dp.ofproto.OFPG_ANY, 0, 0, match)214 dp.send_msg(m)215 def verify_aggregate_stats_flow_count(self, dp, msg):216 stats = msg.body217 return stats.flow_count == self.n_tables218 def test_aggregate_stats_flow_count_out_port(self, dp):219 actions = [dp.ofproto_parser.OFPActionOutput(1, 1500)]220 self.mod_flow(dp, table_id=1, actions=actions)221 actions = [dp.ofproto_parser.OFPActionOutput(2, 1500)]222 self.mod_flow(dp, table_id=2, actions=actions)223 dp.send_barrier()224 out_port = 2225 match = dp.ofproto_parser.OFPMatch()226 m = dp.ofproto_parser.OFPAggregateStatsRequest(227 dp, 
dp.ofproto.OFPTT_ALL, out_port,228 dp.ofproto.OFPG_ANY, 0, 0, match)229 dp.send_msg(m)230 def verify_aggregate_stats_flow_count_out_port(self, dp, msg):231 stats = msg.body232 return stats.flow_count == 1233 def test_aggregate_stats_packet_count(self, dp):234 in_port = 1235 data = 'test'236 self._verify = {'packet_count': 1,237 'byte_count': len(data)}238 # add flow239 match = dp.ofproto_parser.OFPMatch()240 match.set_in_port(in_port)241 self.mod_flow(dp, table_id=0, match=match)242 # packet out243 output = dp.ofproto.OFPP_TABLE244 actions = [dp.ofproto_parser.OFPActionOutput(output, 0)]245 m = dp.ofproto_parser.OFPPacketOut(dp, 0xffffffff, in_port,246 actions, data)247 dp.send_msg(m)248 dp.send_barrier()249 match = dp.ofproto_parser.OFPMatch()250 m = dp.ofproto_parser.OFPAggregateStatsRequest(251 dp, dp.ofproto.OFPTT_ALL, dp.ofproto.OFPP_ANY,252 dp.ofproto.OFPG_ANY, 0, 0, match)253 dp.send_msg(m)254 def verify_aggregate_stats_packet_count(self, dp, msg):255 for name, val in self._verify.items():256 r_val = getattr(msg.body, name)257 if val != r_val:258 return '%s is mismatched. 
verify=%s, reply=%s' \259 % (name, val, r_val)260 return True261 def test_set_config_nomal(self, dp):262 flags = dp.ofproto.OFPC_FRAG_NORMAL263 self._verify = flags264 m = dp.ofproto_parser.OFPSetConfig(dp, flags, 0)265 dp.send_msg(m)266 dp.send_barrier()267 m = dp.ofproto_parser.OFPGetConfigRequest(dp)268 dp.send_msg(m)269 def verify_set_config_nomal(self, dp, msg):270 return self._verify == msg.flags271 def test_set_config_drop(self, dp):272 flags = dp.ofproto.OFPC_FRAG_DROP273 self._verify = flags274 m = dp.ofproto_parser.OFPSetConfig(dp, flags, 0)275 dp.send_msg(m)276 dp.send_barrier()277 m = dp.ofproto_parser.OFPGetConfigRequest(dp)278 dp.send_msg(m)279 def verify_set_config_drop(self, dp, msg):280 return self._verify == msg.flags281 def test_set_config_mask(self, dp):282 flags = dp.ofproto.OFPC_FRAG_MASK283 self._verify = flags284 m = dp.ofproto_parser.OFPSetConfig(dp, flags, 0)285 dp.send_msg(m)286 dp.send_barrier()287 m = dp.ofproto_parser.OFPGetConfigRequest(dp)288 dp.send_msg(m)289 def verify_set_config_mask(self, dp, msg):290 return self._verify == msg.flags291 def test_set_config_ttl_to_controller(self, dp):292 flags = dp.ofproto.OFPC_INVALID_TTL_TO_CONTROLLER293 self._verify = flags294 m = dp.ofproto_parser.OFPSetConfig(dp, flags, 0)295 dp.send_msg(m)296 dp.send_barrier()297 m = dp.ofproto_parser.OFPGetConfigRequest(dp)298 dp.send_msg(m)299 def verify_set_config_ttl_to_controller(self, dp, msg):300 return self._verify == msg.flags301 def test_set_config_miss_send_len(self, dp):302 flags = dp.ofproto.OFPC_FRAG_NORMAL303 ms_len = 256304 self._verify = ms_len305 m = dp.ofproto_parser.OFPSetConfig(dp, flags, ms_len)306 dp.send_msg(m)307 dp.send_barrier()308 m = dp.ofproto_parser.OFPGetConfigRequest(dp)309 dp.send_msg(m)310 def verify_set_config_miss_send_len(self, dp, msg):311 return self._verify == msg.miss_send_len312 def test_set_config_miss_send_len_max(self, dp):313 flags = dp.ofproto.OFPC_FRAG_NORMAL314 ms_len = dp.ofproto.OFPCML_MAX315 self._verify 
= ms_len316 m = dp.ofproto_parser.OFPSetConfig(dp, flags, ms_len)317 dp.send_msg(m)318 dp.send_barrier()319 m = dp.ofproto_parser.OFPGetConfigRequest(dp)320 dp.send_msg(m)321 def verify_set_config_miss_send_len_max(self, dp, msg):322 return self._verify == msg.miss_send_len323 def test_set_config_no_buffer(self, dp):324 flags = dp.ofproto.OFPC_FRAG_NORMAL325 ms_len = dp.ofproto.OFPCML_NO_BUFFER326 self._verify = ms_len327 m = dp.ofproto_parser.OFPSetConfig(dp, flags, ms_len)328 dp.send_msg(m)329 dp.send_barrier()330 m = dp.ofproto_parser.OFPGetConfigRequest(dp)331 dp.send_msg(m)332 def verify_set_config_no_buffer(self, dp, msg):333 return self._verify == msg.miss_send_len334 def _verify_flow_inst_type(self, dp, msg):335 inst_type = self._verify336 stats = msg.body337 for s in stats:338 for i in s.instructions:339 if i.type == inst_type:340 return True341 return 'not found inst_type[%s]' % (inst_type, )342 def test_flow_add_apply_actions(self, dp):343 inst_type = dp.ofproto.OFPIT_APPLY_ACTIONS344 self._verify = inst_type345 actions = [dp.ofproto_parser.OFPActionOutput(1, 1500)]346 self.mod_flow(dp, actions=actions, inst_type=inst_type)347 self.send_flow_stats(dp)348 def verify_flow_add_apply_actions(self, dp, msg):349 return self._verify_flow_inst_type(dp, msg)350 def test_flow_add_goto_table(self, dp):351 self._verify = dp.ofproto.OFPIT_GOTO_TABLE352 inst = [dp.ofproto_parser.OFPInstructionGotoTable(1), ]353 self.mod_flow(dp, inst=inst)354 self.send_flow_stats(dp)355 def verify_flow_add_goto_table(self, dp, msg):356 return self._verify_flow_inst_type(dp, msg)357 def _verify_flow_value(self, dp, msg):358 stats = msg.body359 verify = self._verify360 if len(verify) != len(stats):361 return 'flow_count is mismatched. verify=%s stats=%s' \362 % (len(verify), len(stats))363 for s in stats:364 v_port = -1365 v = verify.get(s.table_id, None)366 if v:367 v_port = v[3].port368 s_port = s.instructions[0].actions[0].port369 if v_port != s_port:370 return 'port is mismatched. 
table_id=%s verify=%s, stats=%s' \371 % (s.table_id, v_port, s_port)372 return True373 def _add_flow_for_flow_mod_tests(self, dp):374 a1 = dp.ofproto_parser.OFPActionOutput(1, 1500)375 a2 = dp.ofproto_parser.OFPActionOutput(2, 1500)376 # table_id, cookie, priority, dl_dst, action)377 tables = {0: [0xffff, 10, b'\xee' * 6, a1],378 1: [0xff00, 10, b'\xee' * 6, a2],379 2: [0xf000, 100, b'\xee' * 6, a1],380 3: [0x0000, 10, b'\xff' * 6, a1]}381 self._verify = tables382 for table_id, val in tables.items():383 match = dp.ofproto_parser.OFPMatch()384 match.set_dl_dst(val[2])385 self.mod_flow(dp, match=match, actions=[val[3]],386 table_id=table_id, cookie=val[0], priority=val[1])387 dp.send_barrier()388 def test_flow_mod_table_id(self, dp):389 self._add_flow_for_flow_mod_tests(dp)390 # modify flow of table_id=3391 action = dp.ofproto_parser.OFPActionOutput(3, 1500)392 self._verify[3][3] = action393 table_id = 3394 self.mod_flow(dp, command=dp.ofproto.OFPFC_MODIFY,395 actions=[action], table_id=table_id)396 dp.send_barrier()397 self.send_flow_stats(dp)398 def verify_flow_mod_table_id(self, dp, msg):399 return self._verify_flow_value(dp, msg)400 def test_flow_mod_cookie(self, dp):401 self._add_flow_for_flow_mod_tests(dp)402 # modify flow of table_id=1403 action = dp.ofproto_parser.OFPActionOutput(3, 1500)404 self._verify[1][3] = action405 cookie = 0xff00406 cookie_mask = 0xffff407 table_id = 1408 self.mod_flow(dp, command=dp.ofproto.OFPFC_MODIFY,409 actions=[action], table_id=table_id,410 cookie=cookie, cookie_mask=cookie_mask)411 dp.send_barrier()412 self.send_flow_stats(dp)413 def verify_flow_mod_cookie(self, dp, msg):414 return self._verify_flow_value(dp, msg)415 def test_flow_mod_cookie_mask(self, dp):416 self._add_flow_for_flow_mod_tests(dp)417 # modify flow of table_id=0,1418 action = dp.ofproto_parser.OFPActionOutput(3, 1500)419 self._verify[0][3] = action420 self._verify[1][3] = action421 cookie = 0xffff422 cookie_mask = 0xff00423 for table_id in range(2):424 
self.mod_flow(dp, command=dp.ofproto.OFPFC_MODIFY,425 actions=[action], table_id=table_id,426 cookie=cookie, cookie_mask=cookie_mask)427 dp.send_barrier()428 self.send_flow_stats(dp)429 def verify_flow_mod_cookie_mask(self, dp, msg):430 return self._verify_flow_value(dp, msg)431 def test_flow_mod_match(self, dp):432 self._add_flow_for_flow_mod_tests(dp)433 # modify flow of table_id=3434 action = dp.ofproto_parser.OFPActionOutput(3, 1500)435 self._verify[3][3] = action436 match = dp.ofproto_parser.OFPMatch()437 match.set_dl_dst(b'\xff' * 6)438 table_id = 3439 self.mod_flow(dp, command=dp.ofproto.OFPFC_MODIFY,440 actions=[action], table_id=table_id, match=match)441 dp.send_barrier()442 self.send_flow_stats(dp)443 def verify_flow_mod_match(self, dp, msg):444 return self._verify_flow_value(dp, msg)445 def test_flow_mod_strict(self, dp):446 self._add_flow_for_flow_mod_tests(dp)447 # modify flow of table_id=2448 action = dp.ofproto_parser.OFPActionOutput(3, 1500)449 self._verify[2][3] = action450 match = dp.ofproto_parser.OFPMatch()451 match.set_dl_dst(b'\xee' * 6)452 priority = 100453 table_id = 2454 self.mod_flow(dp, command=dp.ofproto.OFPFC_MODIFY_STRICT,455 actions=[action], table_id=table_id,456 match=match, priority=priority)457 dp.send_barrier()458 self.send_flow_stats(dp)459 def verify_flow_mod_strict(self, dp, msg):460 return self._verify_flow_value(dp, msg)461 def test_flow_del_table_id(self, dp):462 self._add_flow_for_flow_mod_tests(dp)463 # delete flow of table_id=3464 del self._verify[3]465 table_id = 3466 self.mod_flow(dp, command=dp.ofproto.OFPFC_DELETE,467 table_id=table_id)468 dp.send_barrier()469 self.send_flow_stats(dp)470 def verify_flow_del_table_id(self, dp, msg):471 return self._verify_flow_value(dp, msg)472 def test_flow_del_table_id_all(self, dp):473 self._add_flow_for_flow_mod_tests(dp)474 # delete all flows475 self._verify = {}476 self.mod_flow(dp, command=dp.ofproto.OFPFC_DELETE,477 table_id=dp.ofproto.OFPTT_ALL)478 dp.send_barrier()479 
self.send_flow_stats(dp)480 def verify_flow_del_table_id_all(self, dp, msg):481 return self._verify_flow_value(dp, msg)482 def test_flow_del_cookie(self, dp):483 self._add_flow_for_flow_mod_tests(dp)484 # delete flow of table_id=1485 del self._verify[1]486 cookie = 0xff00487 cookie_mask = 0xffff488 self.mod_flow(dp, command=dp.ofproto.OFPFC_DELETE,489 table_id=dp.ofproto.OFPTT_ALL,490 cookie=cookie, cookie_mask=cookie_mask)491 dp.send_barrier()492 self.send_flow_stats(dp)493 def verify_flow_del_cookie(self, dp, msg):494 return self._verify_flow_value(dp, msg)495 def test_flow_del_cookie_mask(self, dp):496 self._add_flow_for_flow_mod_tests(dp)497 # delete flow of table_id=0,1498 del self._verify[0]499 del self._verify[1]500 cookie = 0xffff501 cookie_mask = 0xff00502 self.mod_flow(dp, command=dp.ofproto.OFPFC_DELETE,503 table_id=dp.ofproto.OFPTT_ALL,504 cookie=cookie, cookie_mask=cookie_mask)505 dp.send_barrier()506 self.send_flow_stats(dp)507 def verify_flow_del_cookie_mask(self, dp, msg):508 return self._verify_flow_value(dp, msg)509 def test_flow_del_match(self, dp):510 self._add_flow_for_flow_mod_tests(dp)511 # delete flow of table_id=3512 del self._verify[3]513 match = dp.ofproto_parser.OFPMatch()514 match.set_dl_dst(b'\xff' * 6)515 self.mod_flow(dp, command=dp.ofproto.OFPFC_DELETE,516 table_id=dp.ofproto.OFPTT_ALL, match=match)517 dp.send_barrier()518 self.send_flow_stats(dp)519 def verify_flow_del_match(self, dp, msg):520 return self._verify_flow_value(dp, msg)521 def test_flow_del_out_port(self, dp):522 self._add_flow_for_flow_mod_tests(dp)523 # delete flow of table_id=1524 del self._verify[1]525 out_port = 2526 self.mod_flow(dp, command=dp.ofproto.OFPFC_DELETE,527 table_id=dp.ofproto.OFPTT_ALL, out_port=out_port)528 dp.send_barrier()529 self.send_flow_stats(dp)530 def verify_flow_del_out_port(self, dp, msg):531 return self._verify_flow_value(dp, msg)532 def test_flow_del_strict(self, dp):533 self._add_flow_for_flow_mod_tests(dp)534 # delete flow of 
table_id=2535 del self._verify[2]536 match = dp.ofproto_parser.OFPMatch()537 match.set_dl_dst(b'\xee' * 6)538 priority = 100539 self.mod_flow(dp, command=dp.ofproto.OFPFC_DELETE_STRICT,540 table_id=dp.ofproto.OFPTT_ALL,541 match=match, priority=priority)542 dp.send_barrier()543 self.send_flow_stats(dp)544 def verify_flow_del_strict(self, dp, msg):545 return self._verify_flow_value(dp, msg)546 def _send_port_mod(self, dp, config, mask):547 p = self.get_port(dp)548 if not p:549 err = 'need attached port to switch.'550 self.results[self.current] = err551 self.start_next_test(dp)552 return553 self._verify = [p.port_no, config & mask]554 m = dp.ofproto_parser.OFPPortMod(dp, p.port_no, p.hw_addr,555 config, mask, 0)556 dp.send_msg(m)557 dp.send_barrier()558 # TODO: waiting to port UP|DOWN.559 time.sleep(1)560 m = dp.ofproto_parser.OFPFeaturesRequest(dp)561 dp.send_msg(m)562 def _verify_port_mod_config(self, dp, msg):563 port_no = self._verify[0]564 config = self._verify[1]565 port = msg.ports[port_no]566 if config != port.config:567 return "config is mismatched. 
verify=%s, stats=%s" \568 % (bin(config), bin(port.config))569 return True570 def test_port_mod_config_01_all(self, dp):571 config = 0b1100101572 mask = 0b1111111573 self._send_port_mod(dp, config, mask)574 def verify_port_mod_config_01_all(self, dp, msg):575 return self._verify_port_mod_config(dp, msg)576 def test_port_mod_config_02_none(self, dp):577 config = 0578 mask = 0b1111111579 self._send_port_mod(dp, config, mask)580 def verify_port_mod_config_02_none(self, dp, msg):581 return self._verify_port_mod_config(dp, msg)582 def test_port_mod_config_03_mask(self, dp):583 config = 0b1100101584 mask = 0b1111000585 self._send_port_mod(dp, config, mask)586 def verify_port_mod_config_03_mask(self, dp, msg):587 res = self._verify_port_mod_config(dp, msg)588 # reset port config589 port_no = self._verify[0]590 p = msg.ports[port_no]591 m = dp.ofproto_parser.OFPPortMod(dp, p.port_no, p.hw_addr,592 0, 0b1111111, 0)593 dp.send_msg(m)594 dp.send_barrier()595 return res596 def test_port_stats_port_no(self, dp):597 p = self.get_port(dp)598 if not p:599 err = 'need attached port to switch.'600 self.results[self.current] = err601 self.start_next_test(dp)602 return603 self._verify = p.port_no604 m = dp.ofproto_parser.OFPPortStatsRequest(dp, p.port_no)605 dp.send_msg(m)606 def verify_port_stats_port_no(self, dp, msg):607 ports = msg.body608 if len(ports) > 1:609 return 'reply some ports.\n%s' % (ports)610 if ports[0].port_no != self._verify:611 return 'port_no is mismatched. 
request=%s reply=%s' \612 % (self._verify, ports[0].port_no)613 return True614 def _add_flow_flow_removed(self, dp, reason, table_id=0,615 cookie=0xff, priority=100, in_port=1,616 idle_timeout=0, hard_timeout=0):617 self._verify = {}618 self._verify['params'] = {'reason': reason,619 'table_id': table_id,620 'cookie': cookie,621 'priority': priority}622 self._verify['in_port'] = in_port623 self._verify['timeout'] = idle_timeout624 if hard_timeout:625 if (idle_timeout == 0 or idle_timeout > hard_timeout):626 self._verify['timeout'] = hard_timeout627 match = dp.ofproto_parser.OFPMatch()628 match.set_in_port(in_port)629 self.mod_flow(dp, match=match, cookie=cookie,630 priority=priority, table_id=table_id,631 idle_timeout=idle_timeout, hard_timeout=hard_timeout,632 flags=dp.ofproto.OFPFF_SEND_FLOW_REM)633 def _verify_flow_removed(self, dp, msg):634 params = self._verify['params']635 in_port = self._verify['in_port']636 timeout = self._verify['timeout']637 if timeout:638 duration_nsec = (msg.duration_sec * 10 ** 9) + msg.duration_nsec639 timeout_nsec = timeout * 10 ** 9640 # grace of -0.5 and +1.5 second to timeout.641 l = (timeout - 0.5) * 10 ** 9642 h = (timeout + 1.5) * 10 ** 9643 if not l < duration_nsec < h:644 return 'bad duration time. set=%s(nsec), duration=%s(nsec)' \645 % (timeout_nsec, duration_nsec)646 for name, val in params.items():647 r_val = getattr(msg, name)648 if val != r_val:649 return '%s is mismatched. verify=%s, reply=%s' \650 % (name, val, r_val)651 for f in msg.match.fields:652 if f.header == ofproto_v1_2.OXM_OF_IN_PORT:653 if f.value != in_port:654 return 'in_port is mismatched. 
verify=%s, reply=%s' \655 % (in_port, f.value)656 return True657 def test_flow_removed_idle_timeout(self, dp):658 reason = dp.ofproto.OFPRR_IDLE_TIMEOUT659 idle_timeout = 2660 self._add_flow_flow_removed(dp, reason,661 idle_timeout=idle_timeout)662 def verify_flow_removed_idle_timeout(self, dp, msg):663 return self._verify_flow_removed(dp, msg)664 def test_flow_removed_idle_timeout_hit(self, dp):665 reason = dp.ofproto.OFPRR_IDLE_TIMEOUT666 idle_timeout = 5667 in_port = 1668 sleep = 2669 # add target flow670 self._add_flow_flow_removed(dp, reason, in_port=in_port,671 idle_timeout=idle_timeout)672 self._verify['timeout'] = idle_timeout + sleep673 # sleep674 time.sleep(sleep)675 # packet out676 output = dp.ofproto.OFPP_TABLE677 actions = [dp.ofproto_parser.OFPActionOutput(output, 0)]678 m = dp.ofproto_parser.OFPPacketOut(dp, 0xffffffff, in_port,679 actions, None)680 dp.send_msg(m)681 def verify_flow_removed_idle_timeout_hit(self, dp, msg):682 return self._verify_flow_removed(dp, msg)683 def test_flow_removed_hard_timeout(self, dp):684 reason = dp.ofproto.OFPRR_HARD_TIMEOUT685 hard_timeout = 2686 self._add_flow_flow_removed(dp, reason,687 hard_timeout=hard_timeout)688 def verify_flow_removed_hard_timeout(self, dp, msg):689 return self._verify_flow_removed(dp, msg)690 def test_flow_removed_hard_timeout_hit(self, dp):691 reason = dp.ofproto.OFPRR_HARD_TIMEOUT692 hard_timeout = 5693 in_port = 1694 sleep = 2695 self._add_flow_flow_removed(dp, reason, in_port=in_port,696 hard_timeout=hard_timeout)697 dp.send_barrier()698 # sleep699 time.sleep(sleep)700 # packet out701 output = dp.ofproto.OFPP_TABLE702 actions = [dp.ofproto_parser.OFPActionOutput(output, 0)]703 m = dp.ofproto_parser.OFPPacketOut(dp, 0xffffffff, in_port,704 actions, None)705 dp.send_msg(m)706 def verify_flow_removed_hard_timeout_hit(self, dp, msg):707 return self._verify_flow_removed(dp, msg)708 def test_flow_removed_delete(self, dp):709 reason = dp.ofproto.OFPRR_DELETE710 self._add_flow_flow_removed(dp, 
reason)711 dp.send_barrier()712 self.delete_all_flows(dp)713 def verify_flow_removed_delete(self, dp, msg):714 return self._verify_flow_removed(dp, msg)715 def test_flow_removed_table_id(self, dp):716 reason = dp.ofproto.OFPRR_DELETE717 table_id = 1718 self._add_flow_flow_removed(dp, reason, table_id=table_id)719 dp.send_barrier()720 self.delete_all_flows(dp)721 def verify_flow_removed_table_id(self, dp, msg):722 return self._verify_flow_removed(dp, msg)723 def _send_packet_out(self, dp, buffer_id=0xffffffff,724 in_port=None, output=None, data=''):725 if in_port is None:726 in_port = dp.ofproto.OFPP_LOCAL727 if output is None:728 output = dp.ofproto.OFPP_CONTROLLER729 self._verify['in_port'] = in_port730 self._verify['data'] = data731 actions = [dp.ofproto_parser.OFPActionOutput(output, len(data))]732 m = dp.ofproto_parser.OFPPacketOut(dp, buffer_id, in_port,733 actions, data)734 dp.send_msg(m)735 def _verify_packet_in(self, dp, msg):736 for name, val in self._verify.items():737 if name == 'in_port':738 for f in msg.match.fields:739 if f.header == ofproto_v1_2.OXM_OF_IN_PORT:740 r_val = f.value741 else:742 r_val = getattr(msg, name)743 if val != r_val:744 return '%s is mismatched. 
verify=%s, reply=%s' \745 % (name, val, r_val)746 return True747 def test_packet_in_action(self, dp):748 self._verify = {}749 self._verify['reason'] = dp.ofproto.OFPR_ACTION750 self._send_packet_out(dp)751 def verify_packet_in_action(self, dp, msg):752 return self._verify_packet_in(dp, msg)753 def test_packet_in_data(self, dp):754 self._verify = {}755 self._verify['reason'] = dp.ofproto.OFPR_ACTION756 data = 'test'757 self._send_packet_out(dp, data=data)758 def verify_packet_in_data(self, dp, msg):759 return self._verify_packet_in(dp, msg)760 def test_packet_in_table_id(self, dp):761 in_port = 1762 table_id = 2763 output = dp.ofproto.OFPP_TABLE764 self._verify = {}765 self._verify['reason'] = dp.ofproto.OFPR_ACTION766 self._verify['table_id'] = table_id767 # add flow (goto_table)768 match = dp.ofproto_parser.OFPMatch()769 match.set_in_port(in_port)770 inst = [dp.ofproto_parser.OFPInstructionGotoTable(table_id)]771 self.mod_flow(dp, inst=inst, match=match)772 # add flow (output)773 match = dp.ofproto_parser.OFPMatch()774 match.set_in_port(in_port)775 out = dp.ofproto.OFPP_CONTROLLER776 actions = [dp.ofproto_parser.OFPActionOutput(out, 0)]777 self.mod_flow(dp, actions=actions, match=match, table_id=table_id)778 dp.send_barrier()779 # packet out780 self._send_packet_out(dp, in_port=in_port, output=output)781 def verify_packet_in_table_id(self, dp, msg):782 return self._verify_packet_in(dp, msg)783 # handler784 @set_ev_cls(ofp_event.EventOFPEchoReply, MAIN_DISPATCHER)785 def echo_replay_handler(self, ev):786 if self.current.find('echo_request') > 0:787 self.run_verify(ev)788 @set_ev_cls(ofp_event.EventOFPStatsReply, MAIN_DISPATCHER)789 def stats_reply_handler(self, ev):790 if self.current is None:791 msg = ev.msg792 dp = msg.datapath793 if self._verify == dp.ofproto.OFPST_TABLE:794 self.table_stats = msg.body795 self.start_next_test(dp)796 else:797 self.run_verify(ev)798 @set_ev_cls(ofp_event.EventOFPSwitchFeatures, MAIN_DISPATCHER)799 def features_replay_handler(self, 
ev):800 if self.current is None:801 pass802 else:803 self.run_verify(ev)804 @set_ev_cls(ofp_event.EventOFPGetConfigReply, MAIN_DISPATCHER)805 def get_config_replay_handler(self, ev):806 self.run_verify(ev)807 @set_ev_cls(ofp_event.EventOFPBarrierReply, MAIN_DISPATCHER)808 def barrier_replay_handler(self, ev):809 if self.current == 'test_barrier_request':810 self.run_verify(ev)811 @set_ev_cls(ofp_event.EventOFPPortStatus, MAIN_DISPATCHER)812 def port_status_handler(self, ev):813 pass814 @set_ev_cls(ofp_event.EventOFPPacketIn, MAIN_DISPATCHER)815 def packet_in_handler(self, ev):816 if self.current.find('packet_in'):817 self.run_verify(ev)818 @set_ev_cls(ofp_event.EventOFPFlowRemoved, MAIN_DISPATCHER)819 def flow_removed_handler(self, ev):820 if self.current.find('flow_removed') > 0:821 self.run_verify(ev)822 @set_ev_cls(ofp_event.EventOFPErrorMsg, MAIN_DISPATCHER)823 def error_handler(self, ev):824 if self.current.find('error') > 0:825 self.run_verify(ev)826 def is_supported(self, t):827 unsupported = [828 ]829 for u in unsupported:830 if t.find(u) != -1:831 return False...
test_lab04.py
Source:test_lab04.py
...29 print(msg,file=sys.stderr)30# Caricale nel namespace31globals().update(f_objects)32class TestLab04Segmenti(unittest.TestCase):33 def _verify(self,seq,expected):34 result = segmenticrescenti(seq)35 messaggio = "\nTEST FAIL> La funzione segmenticrescenti({})\n dovrebbe produrre: '{}',\n invece produce: '{}'".format(seq,expected,result)36 self.assertEqual(expected,result,msg=messaggio)37 def _verify_raise(self,seq,error,reason):38 msg="\nTEST FAIL> segmenticrescenti({}) dovrebbe sollevare un {} perché {}."39 msg = msg.format(seq,error,reason)40 with self.assertRaises(error,msg=msg):41 segmenticrescenti(seq)42 def test_segmenti_inconfrontabili(self):43 self._verify_raise([13,"stringa1","stringa2"],44 TypeError,45 "ha elementi non confrontabili")46 def test_segmenti_zero(self):47 self._verify([],[])48 def test_segmenti_one(self):49 self._verify([1],[[1]])50 def test_segmenti_zerot(self):51 self._verify((),[])52 def test_segmenti_one(self):53 self._verify((1,),[[1]])54 def test_segmenti_crescente(self):55 self._verify([1,2,3,4,5],[[1,2,3,4,5]])56 def test_segmenti_decrescente(self):57 self._verify([5,4,3,2,1],[[5],[4],[3],[2],[1]])58 59 def test_segmenti_due(self):60 self._verify([1,2,3,-1,2,3],[[1,2,3],[-1,2,3]])61 def test_segmenti_singleton_iniziale(self):62 self._verify([4,-3,-2,2,7], [[4],[-3,-2,2,7]] )63 def test_segmenti_singleton_finale(self):64 self._verify([-3,-2,2,7,4], [[-3,-2,2,7],[4]] )65 def test_segmenti_uguali(self):66 self._verify([1,1,1,1,1], [[1,1,1,1,1]] )67 def test_segmenti_uguali(self):68 self._verify([2,2,1,1,1], [[2,2],[1,1,1]] )69 70 def test_segmenti_esempio(self):71 self._verify([1,-1,2,4,3,7,8,8,5] ,72 [ [1], [-1,2,4], [3,7,8,8], [5] ] )73 def test_segmenti_stringa(self):74 self._verify(['casa','studio','garage','mansarda','villaggio'] ,75 [ ['casa','studio'], ['garage','mansarda','villaggio'] ] )76 def test_segmenti_stringa2(self):77 self._verify('analfabetismo' ,78 [ ['a','n'], ['a','l'],['f'],['a','b','e','t'],['i','s'],['m','o'] ] 
)79 80class TestLab04SommeParziali(unittest.TestCase):81 82 def _verify(self,seq,expected):83 result = sommeparziali(seq)84 messaggio = "\nTEST FAIL> La funzione sommeparziali({})\n dovrebbe produrre: '{}',\n invece produce: '{}'".format(seq,expected,result)85 self.assertEqual(expected,result,msg=messaggio)86 def _verify_raise(self,seq,error,reason):87 msg="\nTEST FAIL> sommeparziali({}) dovrebbe sollevare un {} perché {}."88 msg = msg.format(seq,error,reason)89 with self.assertRaises(error,msg=msg):90 sommeparziali(seq)91 92 def test_parziali_zero(self):93 self._verify([],[])94 def test_parziali_one(self):95 self._verify([13],[13])96 def test_parziali_zerot(self):97 self._verify((),[])98 def test_parziali_one(self):99 self._verify((12,),[12])100 def test_parziali_string(self):101 self._verify(('aa','bb','cc'),['aa','aabb','aabbcc'])102 103 def test_somme_insommabili1(self):104 self._verify_raise([13,"stringa1","stringa2"],105 TypeError,106 "ha elementi non sommabili")107 def test_somme_insommabili2(self):108 self._verify_raise(('ciao',1.2),109 TypeError,110 "ha elementi non sommabili")111 def test_small1(self):112 self._verify( [-2,2] , [-2,0])113 114 def test_small2(self):115 self._verify( [1,2] , [1,3])116 def test_ones(self):117 self._verify( [1,1,1,1,1,1,1,1] , [1,2,3,4,5,6,7,8])118 def test_sequence(self):119 self._verify( [1,2,3,4,5,6,7,8] , [1,3,6,10,15,21,28,36])120 def test_mixed(self):121 self._verify( [1,2.3], [1,3.3])122 123 124 125if __name__ == '__main__':126 unittest.main()...
Learn to execute automation testing from scratch with LambdaTest Learning Hub, right from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!