Best Python code snippet using autotest_python
test_route_table_manager.py
Source:test_route_table_manager.py
# NOTE: excerpt of the TestRouteTableManager test methods.  The class header,
# imports and anything defined before tearDown() are elided in this listing;
# helpers such as self._wait() and self._new_route_event() are defined
# outside of the visible excerpt.


def tearDown(self):
    """Stop the RouteTableManager and wait for it to terminate."""
    super(TestRouteTableManager, self).tearDown()
    self.rtm.stop()
    self.rtm.join()


def _new_worker(self, worker_name, worker_type):
    """Return a Mock standing in for a worker of type 'worker_type'.

    The mock gets a 'name', a mocked 'enqueue' (so that calls can be
    counted and inspected) and empty '_rtm_matches' and
    '_rtm_route_entries' sets.
    """
    # local deliberately not called 'worker', to avoid shadowing the
    # 'worker' module used by the tests
    mocked_worker = mock.Mock(spec=worker_type, name=worker_name)
    mocked_worker.name = worker_name
    mocked_worker.enqueue = mock.Mock()
    mocked_worker._rtm_matches = set()
    mocked_worker._rtm_route_entries = set()
    return mocked_worker


def _worker_subscriptions(self, worker, rts, wait=True,
                          afi=exa.AFI(exa.AFI.ipv4),
                          safi=exa.SAFI(exa.SAFI.mpls_vpn)):
    """Enqueue one Subscription per RT in 'rts' on behalf of 'worker'.

    If 'wait' is true, self._wait() is called once the subscriptions
    have been enqueued.
    """
    for rt in rts:
        subscribe = engine.Subscription(afi, safi, rt, worker)
        self.rtm.enqueue(subscribe)
    if wait:
        self._wait()


def _worker_unsubscriptions(self, worker, rts, wait=True,
                            afi=exa.AFI(exa.AFI.ipv4),
                            safi=exa.SAFI(exa.SAFI.mpls_vpn)):
    """Enqueue one Unsubscription per RT in 'rts' on behalf of 'worker'.

    If 'wait' is true, self._wait() is called once the unsubscriptions
    have been enqueued.
    """
    for rt in rts:
        unsubscribe = engine.Unsubscription(afi, safi, rt, worker)
        self.rtm.enqueue(unsubscribe)
    if wait:
        self._wait()


def _check_subscriptions(self, worker, matches):
    """Assert that every match in 'matches' is in worker._rtm_matches."""
    for match in matches:
        self.assertIn(match, worker._rtm_matches,
                      "Subscription not found")


def _check_unsubscriptions(self, worker, matches):
    """Assert that no match in 'matches' is in worker._rtm_matches.

    Nothing is checked if the worker has no '_rtm_matches' attribute.
    """
    if '_rtm_matches' not in worker.__dict__:
        return
    for match in matches:
        self.assertNotIn(match, worker._rtm_matches,
                         "Subscription found while it should not: %s" %
                         worker._rtm_matches)


def _check_events_calls(self, events, advertised_routes, withdrawn_nlris):
    """Check the events received by a worker against what is expected.

    Checks that each advertise event in 'events' is in advertised_routes,
    that each withdraw event in 'events' is in withdrawn_nlris, and that
    all entries of advertised_routes and withdrawn_nlris are in 'events'.

    'events' is a mock 'call_args_list'; the event is the first positional
    argument of each call.
    """
    # work on copies, so that the lists of the caller are left untouched
    pending_routes = list(advertised_routes)
    pending_nlris = list(withdrawn_nlris)

    for (call_args, _) in events:
        event = call_args[0]
        if event.type == engine.RouteEvent.ADVERTISE:
            self.assertIn(event.route_entry, pending_routes,
                          "Bad advertised route")
            pending_routes.remove(event.route_entry)
        else:  # WITHDRAW
            self.assertIn(event.route_entry.nlri, pending_nlris,
                          "Bad withdrawn route")
            pending_nlris.remove(event.route_entry.nlri)

    self.assertEqual(0, len(pending_routes), "some routes not advert'd")
    self.assertEqual(0, len(pending_nlris), "some routes not withdrawn")


def test_a1_subscriptions_with_no_route_to_synthesize(self):
    # Worker1 subscribes to RT1 and RT2
    worker1 = self._new_worker("worker.Worker-1", worker.Worker)
    self._worker_subscriptions(worker1, [t.RT1, t.RT2])
    # check subscriptions
    self._check_subscriptions(worker1, [MATCH1, MATCH2])


def test_a1_check_first_last_local_worker_callback(self):
    # A BGPPeerWorker subscribes to RT1: not a local worker, hence no
    # first_local_subscriber_callback call is expected
    bgp_worker1 = self._new_worker("worker.Worker-1", bpw.BGPPeerWorker)
    self._worker_subscriptions(bgp_worker1, [t.RT1])
    self._wait()
    self.assertEqual(
        0,
        self.rtm.first_local_subscriber_callback.call_count,
        "first_local_subscriber_callback should not have been called "
        " (non local worker)")

    # Worker1 is the first local worker to subscribe to RT1
    worker1 = self._new_worker("worker.Worker-1", worker.Worker)
    self._worker_subscriptions(worker1, [t.RT1])
    self.assertEqual(
        1,
        self.rtm.first_local_subscriber_callback.call_count,
        "first_local_subscriber_callback should have been called")

    # Worker2 is the second local worker to subscribe to RT1
    worker2 = self._new_worker("worker.Worker-2", worker.Worker)
    self._worker_subscriptions(worker2, [t.RT1])
    self.assertEqual(
        1,
        self.rtm.first_local_subscriber_callback.call_count,
        "first_local_subscriber_callback should not have been called a "
        "second time")

    # Worker2 unsubscribes: Worker1 is still subscribed
    self._worker_unsubscriptions(worker2, [t.RT1])
    self.assertEqual(
        0,
        self.rtm.last_local_subscriber_callback.call_count,
        "last_local_subscriber_callback should not have been called")

    # Worker1 unsubscribes: it was the last local subscriber
    self._worker_unsubscriptions(worker1, [t.RT1])
    self.assertEqual(
        1,
        self.rtm.last_local_subscriber_callback.call_count,
        "last_local_subscriber_callback should have been called")

    # the BGPPeerWorker unsubscribes: not a local worker
    self._worker_unsubscriptions(bgp_worker1, [t.RT1])
    self.assertEqual(
        1,
        self.rtm.last_local_subscriber_callback.call_count,
        "last_local_subscriber_callback should not have been called "
        " (non local worker)")


def test_a2_subscriptions_with_route_to_synthesize(self):
    # BGPPeerWorker1 advertises a route for RT1 and RT2
    bgp_peer_worker1 = self._new_worker("BGPWorker1", bpw.BGPPeerWorker)
    evt1 = self._new_route_event(engine.RouteEvent.ADVERTISE, t.NLRI1,
                                 [t.RT1, t.RT2], bgp_peer_worker1, t.NH1)
    # BGPPeerWorker1 advertises an other route for RT2
    evt2 = self._new_route_event(engine.RouteEvent.ADVERTISE, t.NLRI2,
                                 [t.RT2], bgp_peer_worker1, t.NH1)
    # BGPPeerWorker1 subscribes to RT1
    self._worker_subscriptions(bgp_peer_worker1, [t.RT1])
    # Worker1 subscribes to RT1 and RT2
    worker1 = self._new_worker("worker.Worker-1", worker.Worker)
    self._worker_subscriptions(worker1, [t.RT1, t.RT2])
    # Worker2 subscribes to RT1
    worker2 = self._new_worker("worker.Worker-2", worker.Worker)
    self._worker_subscriptions(worker2, [t.RT1])
    # Worker3 subscribes to RT3
    worker3 = self._new_worker("worker.Worker-3", worker.Worker)
    self._worker_subscriptions(worker3, [t.RT3])
    # BGPPeerWorker2 subscribes to RT1
    bgp_peer_worker2 = self._new_worker("BGPWorker2", bpw.BGPPeerWorker)
    self._worker_subscriptions(bgp_peer_worker2, [t.RT1])
    # Waiting for RouteTableManager thread finishes to process the
    # subscription
    self._wait()
    # check route entry synthesized
    self.assertEqual(0, bgp_peer_worker1.enqueue.call_count,
                     "Route should not be synthesized to its source")
    self.assertEqual(0, worker3.enqueue.call_count,
                     "no route should be synthesized to Worker3")
    self.assertEqual(0, bgp_peer_worker2.enqueue.call_count,
                     "Route should not be synthesized between BGP workers")
    self.assertEqual(2, worker1.enqueue.call_count,
                     "2 advertise events should be synthesized to Worker1")
    self._check_events_calls(worker1.enqueue.call_args_list,
                             [evt1.route_entry, evt2.route_entry], [])
    self.assertEqual(1, worker2.enqueue.call_count,
                     "1 advertise event should be synthesized to Worker2")
    self._check_events_calls(
        worker2.enqueue.call_args_list, [evt1.route_entry], [])


def test_a3_resubscription(self):
    # BGPPeerWorker1 advertises a route for RT1 and RT2
    bgp_peer_worker1 = self._new_worker("BGPWorker1", bpw.BGPPeerWorker)
    route_event = self._new_route_event(engine.RouteEvent.ADVERTISE,
                                        t.NLRI1, [t.RT1, t.RT2],
                                        bgp_peer_worker1, t.NH1)
    # Worker1 subscribes to RT1 and RT2
    worker1 = self._new_worker("worker.Worker-1", worker.Worker)
    self._worker_subscriptions(worker1, [t.RT1, t.RT2])
    # Worker1 subscribes again to RT1
    self._worker_subscriptions(worker1, [t.RT1])
    # Worker2 subscribes to RT1
    worker2 = self._new_worker("worker.Worker-2", worker.Worker)
    self._worker_subscriptions(worker2, [t.RT1])
    # Worker2 additionally subscribes to RT2
    # NOTE(review): this comment used to mention Worker1, while the code
    # subscribes worker2 -- confirm which one was intended
    self._worker_subscriptions(worker2, [t.RT2])
    # check route entry synthesized
    self.assertEqual(1, worker1.enqueue.call_count,
                     "1 route advertised should be synthesized to Worker1")
    self._check_events_calls(worker1.enqueue.call_args_list,
                             [route_event.route_entry], [])
    self.assertEqual(1, worker2.enqueue.call_count,
                     "1 route advertised should be synthesized to Worker2")
    self._check_events_calls(worker2.enqueue.call_args_list,
                             [route_event.route_entry], [])


def test_a4_two_subscriptions(self):
    # Worker1 subscribes to RT1
    worker1 = self._new_worker("worker.Worker-1", worker.Worker)
    self._worker_subscriptions(worker1, [t.RT1])
    # Worker2 subscribes to RT1
    worker2 = self._new_worker("worker.Worker-2", worker.Worker)
    self._worker_subscriptions(worker2, [t.RT1])
    # Worker2 advertises a route to RT1
    self._new_route_event(engine.RouteEvent.ADVERTISE, t.NLRI1,
                          [t.RT1], worker2, t.NH1)
    self.assertEqual(1, worker1.enqueue.call_count,
                     "1 route advertised should be synthesized to Worker1")


def test_b1_unsubscription_with_no_route_to_synthesize(self):
    # Worker1 subscribes to RT1 and RT2
    worker1 = self._new_worker("worker.Worker-1", worker.Worker)
    self._worker_subscriptions(worker1, [t.RT1, t.RT2])
    # BGPPeerWorker1 subscribes to RT1 and RT2
    bgp_peer_worker1 = self._new_worker("BGPWorker1", bpw.BGPPeerWorker)
    self._worker_subscriptions(bgp_peer_worker1, [t.RT1, t.RT2])
    # Worker1 unsubscribes to RT1
    self._worker_unsubscriptions(worker1, [t.RT1])
    # BGPPeerWorker1 unsubscribes to RT1 and RT2
    self._worker_unsubscriptions(bgp_peer_worker1, [t.RT1, t.RT2])
    # check subscription/unsubscriptions
    self._check_unsubscriptions(worker1, [MATCH1])
    self._check_subscriptions(worker1, [MATCH2])
    self._check_unsubscriptions(bgp_peer_worker1, [MATCH1, MATCH2])


def test_b2_unsubscription_with_route_to_synthesize(self):
    # BGPPeerWorker1 advertises a route for RT1 and RT2
    bgp_peer_worker1 = self._new_worker("BGPWorker1", bpw.BGPPeerWorker)
    evt1 = self._new_route_event(engine.RouteEvent.ADVERTISE,
                                 t.NLRI1, [t.RT1, t.RT2],
                                 bgp_peer_worker1, t.NH1)
    # BGPPeerWorker1 advertises an other route for RT2
    evt2 = self._new_route_event(engine.RouteEvent.ADVERTISE,
                                 t.NLRI2, [t.RT2], bgp_peer_worker1, t.NH1)
    # BGPPeerWorker1 subscribes to RT1
    self._worker_subscriptions(bgp_peer_worker1, [t.RT1])
    # Worker1 subscribes to RT1 and RT2
    worker1 = self._new_worker("worker.Worker-1", worker.Worker)
    self._worker_subscriptions(worker1, [t.RT1, t.RT2])
    # Worker2 subscribes to RT2
    worker2 = self._new_worker("worker.Worker-2", worker.Worker)
    self._worker_subscriptions(worker2, [t.RT2])
    # Worker3 subscribes to RT3
    worker3 = self._new_worker("worker.Worker-3", worker.Worker)
    self._worker_subscriptions(worker3, [t.RT3])
    # BGPPeerWorker2 subscribes to RT1
    bgp_peer_worker2 = self._new_worker("BGPWorker2", bpw.BGPPeerWorker)
    self._worker_subscriptions(bgp_peer_worker2, [t.RT1])
    # Workers and BGPPeerWorker unsubscriptions
    self._worker_unsubscriptions(bgp_peer_worker1, [t.RT1], False)
    self._worker_unsubscriptions(worker1, [t.RT1], False)
    self._worker_unsubscriptions(worker2, [t.RT2], False)
    self._worker_unsubscriptions(worker3, [t.RT3], False)
    self._worker_unsubscriptions(bgp_peer_worker2, [t.RT1], False)
    # Waiting for RouteTableManager thread finishes to process the
    # subscription
    self._wait()
    # check route entry synthesized
    self.assertEqual(0, bgp_peer_worker1.enqueue.call_count,
                     "Route should not be synthesized to its source")
    self.assertEqual(0, worker3.enqueue.call_count,
                     "no route should be synthesized to Worker3")
    self.assertEqual(0, bgp_peer_worker2.enqueue.call_count,
                     "Route should not be synthesized between "
                     "BGPPeerWorkers")
    self.assertEqual(2, worker1.enqueue.call_count,
                     "2 advertise event should be synthesized to Worker1")
    self._check_events_calls(worker1.enqueue.call_args_list,
                             [evt1.route_entry, evt2.route_entry], [])
    self.assertEqual(4, worker2.enqueue.call_count,
                     "4 events should be synthesized to Worker2: "
                     "2 advertise and 2 withdraw")
    self._check_events_calls(worker2.enqueue.call_args_list,
                             [evt1.route_entry, evt2.route_entry],
                             [evt1.route_entry.nlri,
                              evt2.route_entry.nlri])


def test_b3_unsubscription_not_registered(self):
    # Worker1 subscribes to RT1
    worker1 = self._new_worker("worker.Worker-1", worker.Worker)
    self._worker_subscriptions(worker1, [t.RT1])
    # Worker1 unsubscribes to RT2
    self._worker_unsubscriptions(worker1, [t.RT2])
    # BGPPeerWorker1 unsubscribes to RT1 and RT2, without having
    # subscribed beforehand
    bgp_peer_worker1 = self._new_worker("BGPWorker1", bpw.BGPPeerWorker)
    self._worker_unsubscriptions(bgp_peer_worker1, [t.RT1, t.RT2])
    # check subscription/unsubscriptions
    self._check_subscriptions(worker1, [MATCH1])
    self._check_unsubscriptions(bgp_peer_worker1, [MATCH1, MATCH2])


def test_c1_route_advertise_by_worker_without_propagation(self):
    # Worker1 advertises a route for RT1 and RT2
    worker1 = self._new_worker("worker.Worker-1", worker.Worker)
    route_event = self._new_route_event(engine.RouteEvent.ADVERTISE,
                                        t.NLRI1, [t.RT1, t.RT2],
                                        worker1, t.NH1)
    # check route entry has been inserted
    self.assertIn(route_event.route_entry, worker1._rtm_route_entries,
                  "Route entry not found")


def test_c2_route_withdraw_by_worker_without_propagation(self):
    # Worker1 advertises then withdraws a route
    worker1 = self._new_worker("worker.Worker-1", worker.Worker)
    self._new_route_event(engine.RouteEvent.ADVERTISE, t.NLRI1,
                          [t.RT1, t.RT2], worker1, t.NH1)
    route_event = self._new_route_event(engine.RouteEvent.WITHDRAW,
                                        t.NLRI1, [t.RT1], worker1, t.NH1)
    # check route entry has been removed
    self.assertNotIn(route_event.route_entry, worker1._rtm_route_entries,
                     "Route entry found")


def test_c3_route_advertise_by_bgp_peer_with_propagation(self):
    # Worker1 subscribes to RT1
    worker1 = self._new_worker("worker.Worker-1", worker.Worker)
    self._worker_subscriptions(worker1, [t.RT1])
    # Worker2 subscribes to RT2
    worker2 = self._new_worker("worker.Worker-2", worker.Worker)
    self._worker_subscriptions(worker2, [t.RT2])
    # BGPPeerWorker1 subscribes to RT1 and RT2
    bgp_peer_worker1 = self._new_worker("BGPWorker1", bpw.BGPPeerWorker)
    self._worker_subscriptions(bgp_peer_worker1, [t.RT1, t.RT2])
    # BGPPeerWorker2 subscribes to RT1 and RT2
    bgp_peer_worker2 = self._new_worker("BGPWorker2", bpw.BGPPeerWorker)
    self._worker_subscriptions(bgp_peer_worker2, [t.RT1, t.RT2])
    # BGPPeerWorker1 advertises a route for RT1
    route_event = self._new_route_event(engine.RouteEvent.ADVERTISE,
                                        t.NLRI1, [t.RT1], bgp_peer_worker1,
                                        t.NH1)
    # check route_event propagation
    self.assertEqual(1, worker1.enqueue.call_count,
                     "1 route should be propagated to Worker1")
    self._check_events_calls(worker1.enqueue.call_args_list,
                             [route_event.route_entry], [])
    self.assertEqual(0, worker2.enqueue.call_count,
                     "no route should be propagated to Worker2")
    self.assertEqual(0, bgp_peer_worker1.enqueue.call_count,
                     "Route should not be propagated to its source")
    self.assertEqual(0, bgp_peer_worker2.enqueue.call_count,
                     "Route should not be propagated between BGP workers")


def test_c4_route_withdraw_by_peer_worker_with_propagation(self):
    # Worker1 subscribes to RT1
    worker1 = self._new_worker("worker.Worker-1", worker.Worker)
    self._worker_subscriptions(worker1, [t.RT1])
    # Worker2 subscribes to RT2
    worker2 = self._new_worker("worker.Worker-2", worker.Worker)
    self._worker_subscriptions(worker2, [t.RT2])
    # Worker3 subscribes to RT3
    worker3 = self._new_worker("worker.Worker-3", worker.Worker)
    self._worker_subscriptions(worker3, [t.RT3])
    # BGPPeerWorker1 subscribes to RT1
    bgp_peer_worker1 = self._new_worker("BGPWorker1", bpw.BGPPeerWorker)
    self._worker_subscriptions(bgp_peer_worker1, [t.RT1])
    # BGPPeerWorker2 subscribes to RT2
    bgp_peer_worker2 = self._new_worker("BGPWorker2", bpw.BGPPeerWorker)
    self._worker_subscriptions(bgp_peer_worker2, [t.RT2])
    # BGPPeerWorker1 advertises a route for RT1 and RT2
    route_eventA = self._new_route_event(engine.RouteEvent.ADVERTISE,
                                         t.NLRI1, [t.RT1, t.RT2],
                                         bgp_peer_worker1, t.NH1)
    # BGPPeerWorker1 withdraws the previous route (without RT)
    route_eventW = self._new_route_event(engine.RouteEvent.WITHDRAW,
                                         t.NLRI1, [],
                                         bgp_peer_worker1, t.NH1)
    # check route_event propagation
    self.assertEqual(2, worker1.enqueue.call_count,
                     "2 routes should be propagated to Worker1")
    self._check_events_calls(worker1.enqueue.call_args_list,
                             [route_eventA.route_entry],
                             [route_eventW.route_entry.nlri])
    self.assertEqual(2, worker2.enqueue.call_count,
                     "2 routes should be propagated to Worker2")
    self._check_events_calls(worker2.enqueue.call_args_list,
                             [route_eventA.route_entry],
                             [route_eventW.route_entry.nlri])
    self.assertEqual(0, worker3.enqueue.call_count,
                     "No route should be propagated to Worker3")
    self.assertEqual(0, bgp_peer_worker1.enqueue.call_count,
                     "Route should not be propagated to its source")
    self.assertEqual(0, bgp_peer_worker2.enqueue.call_count,
                     "Route should not be propagated between BGP workers")


def test_c5_route_update_by_bgp_peer_with_withdraw_propagation(self):
    # Worker1 subscribes to RT1
    worker1 = self._new_worker("worker.Worker-1", worker.Worker)
    self._worker_subscriptions(worker1, [t.RT1])
    # Worker2 subscribes to RT2
    worker2 = self._new_worker("worker.Worker-2", worker.Worker)
    self._worker_subscriptions(worker2, [t.RT2])
    # Worker3 subscribes to RT3
    worker3 = self._new_worker("worker.Worker-3", worker.Worker)
    self._worker_subscriptions(worker3, [t.RT3])
    # BGPPeerWorker1 advertises a route for RT1, RT2 and RT3
    bgp_peer_worker1 = self._new_worker("BGPWorker1", bpw.BGPPeerWorker)
    evt1 = self._new_route_event(engine.RouteEvent.ADVERTISE, t.NLRI1,
                                 [t.RT1, t.RT2, t.RT3],
                                 bgp_peer_worker1, t.NH1)
    # BGPPeerWorker1 advertises the same nlri with attributes NH and RTs
    # modification
    evt2 = self._new_route_event(engine.RouteEvent.ADVERTISE, t.NLRI1,
                                 [t.RT1, t.RT2],
                                 bgp_peer_worker1, t.NH2)
    # check route event propagation
    # TO DO : check route_event.replaced_route
    self.assertEqual(0, bgp_peer_worker1.enqueue.call_count,
                     "Route should not be propagated to its source")
    self.assertEqual(2, worker1.enqueue.call_count,
                     "2 routes should be advertised to Worker1")
    self._check_events_calls(worker1.enqueue.call_args_list,
                             [evt1.route_entry, evt2.route_entry], [])
    self.assertEqual(2, worker2.enqueue.call_count,
                     "2 routes should be advertised to Worker2")
    self._check_events_calls(worker2.enqueue.call_args_list,
                             [evt1.route_entry, evt2.route_entry], [])
    self.assertEqual(2, worker3.enqueue.call_count,
                     "2 routes should be advertised/withdrawn to Worker3")
    self._check_events_calls(worker3.enqueue.call_args_list,
                             [evt1.route_entry], [evt1.route_entry.nlri])


def test_c6_route_readvertised(self):
    # Worker1 subscribes to RT1, RT2 and RT3
    worker1 = self._new_worker("worker.Worker-1", worker.Worker)
    self._worker_subscriptions(worker1, [t.RT1, t.RT2, t.RT3])
    # BGPPeerWorker1 advertises a route for RT1 and RT2
    bgp_peer_worker1 = self._new_worker("BGPWorker1", bpw.BGPPeerWorker)
    evt1 = self._new_route_event(engine.RouteEvent.ADVERTISE, t.NLRI1,
                                 [t.RT1, t.RT2], bgp_peer_worker1, t.NH1)
    # BGPPeerWorker1 advertises the same nlri with same attributes and RTs
    self._new_route_event(engine.RouteEvent.ADVERTISE, t.NLRI1,
                          [t.RT1, t.RT2], bgp_peer_worker1, t.NH1)
    # check route event propagation
    self.assertEqual(0, bgp_peer_worker1.enqueue.call_count,
                     "Route should not be propagated to its source")
    self.assertEqual(1, worker1.enqueue.call_count,
                     "only 1 route should be advertised to Worker1")
    self._check_events_calls(worker1.enqueue.call_args_list,
                             [evt1.route_entry], [])


def test_c7_route_withdraw_not_registered(self):
    # Worker1 subscribes to RT1 and RT2
    worker1 = self._new_worker("worker.Worker-1", worker.Worker)
    self._worker_subscriptions(worker1, [t.RT1, t.RT2])
    # BGPPeerWorker1 advertises a route for RT1 and RT2
    bgp_peer_worker1 = self._new_worker("BGPWorker1", bpw.BGPPeerWorker)
    evt1 = self._new_route_event(engine.RouteEvent.ADVERTISE, t.NLRI1,
                                 [t.RT1, t.RT2], bgp_peer_worker1, t.NH1)
    # BGPPeerWorker1 withdraws a not registered route (without RT)
    self._new_route_event(engine.RouteEvent.WITHDRAW, t.NLRI2, [],
                          bgp_peer_worker1, t.NH1)
    # Waiting for RouteTableManager thread finishes to process route_event
    self._wait()
    # check route_event propagation
    self.assertEqual(1, worker1.enqueue.call_count,
                     "1 route1 should be propagated to Worker1")
    self._check_events_calls(worker1.enqueue.call_args_list,
                             [evt1.route_entry], [])
    self.assertEqual(0, bgp_peer_worker1.enqueue.call_count,
                     "Route should not be propagated back to its source")


def test_d1_worker_cleanup(self):
    # Worker1 subscribes to RT1
    worker1 = self._new_worker("worker.Worker-1", worker.Worker)
    self._worker_subscriptions(worker1, [t.RT1])
    # Worker2 subscribes to RT2
    worker2 = self._new_worker("worker.Worker-2", worker.Worker)
    self._worker_subscriptions(worker2, [t.RT2])
    # BGPPeerWorker1 subscribes to RT1 and RT2
    bgp_peer_worker1 = self._new_worker("BGPWorker1", bpw.BGPPeerWorker)
    self._worker_subscriptions(bgp_peer_worker1, [t.RT1, t.RT2])
    # BGPPeerWorker1 advertises a route for RT1 and RT2
    evt1 = self._new_route_event(engine.RouteEvent.ADVERTISE, t.NLRI1,
                                 [t.RT1, t.RT2], bgp_peer_worker1, t.NH1)
    # BGPPeerWorker1 advertises an other route for RT2
    evt2 = self._new_route_event(engine.RouteEvent.ADVERTISE, t.NLRI2,
                                 [t.RT2], bgp_peer_worker1, t.NH1)
    # Cleanup BGPPeerWorker1
    self.rtm.enqueue(engine.WorkerCleanupEvent(bgp_peer_worker1))
    # Waiting for RouteTableManager thread finishes to process the
    # cleanup event
    self._wait()
    self.assertEqual(
        0,
        self.rtm.last_local_subscriber_callback.call_count,
        "last_local_subscriber_callback should not have been called "
        " (non local worker)")
    # check unsubscriptions
    self._check_unsubscriptions(bgp_peer_worker1, [MATCH1, MATCH2])
    # Check route synthesize to Worker1 and Worker2
    self.assertEqual(2, worker1.enqueue.call_count,
                     "2 routes should be advert/withdraw to Worker1")
    self._check_events_calls(worker1.enqueue.call_args_list,
                             [evt1.route_entry], [evt1.route_entry.nlri])
    self.assertEqual(4, worker2.enqueue.call_count,
                     "4 routes should be advert/withdraw to Worker2")
    self._check_events_calls(worker2.enqueue.call_args_list,
                             [evt1.route_entry, evt2.route_entry],
                             [evt1.route_entry.nlri,
                              evt2.route_entry.nlri])
    # Check route entries have been removed for BGPPeerWorker1
    self.assertNotIn(evt1.route_entry, bgp_peer_worker1._rtm_route_entries,
                     "Route entry found")
    self.assertNotIn(evt2.route_entry, bgp_peer_worker1._rtm_route_entries,
                     "Route entry found")


def test_e1_dump_state(self):
    # BGPPeerWorker1 advertises a route for RT1 and RT2
    bgp_peer_worker1 = self._new_worker("BGPWorker1", bpw.BGPPeerWorker)
    self._new_route_event(engine.RouteEvent.ADVERTISE, t.NLRI1,
                          [t.RT1, t.RT2], bgp_peer_worker1, t.NH1)
    # BGPPeerWorker1 advertises an other route for RT2
    self._new_route_event(engine.RouteEvent.ADVERTISE, t.NLRI2,
                          [t.RT2], bgp_peer_worker1, t.NH1)
    # BGPPeerWorker1 subscribes to RT1
    self._worker_subscriptions(bgp_peer_worker1, [t.RT1])
    # Worker1 subscribes to RT1 and RT2
    worker1 = self._new_worker("worker.Worker-1", worker.Worker)
    self._worker_subscriptions(worker1, [t.RT1, t.RT2])
    # Worker2 subscribes to RT1
    worker2 = self._new_worker("worker.Worker-2", worker.Worker)
    self._worker_subscriptions(worker2, [t.RT1])
    # Worker3 subscribes to RT3
    worker3 = self._new_worker("worker.Worker-3", worker.Worker)
    self._worker_subscriptions(worker3, [t.RT3])
    # no assertion: only checks that dumping the state does not raise
    self.rtm._dump_state()


def test_7_matches(self):
    # m1a, m1b and m1c use the same AFI, SAFI and RouteTarget(64512, 1);
    # m1c only differs by the third RouteTarget argument
    m1a = rtm.Match(exa.AFI(exa.AFI.ipv4),
                    exa.SAFI(exa.SAFI.mpls_vpn),
                    exa.RouteTarget(64512, 1))
    m1b = rtm.Match(exa.AFI(exa.AFI.ipv4),
                    exa.SAFI(exa.SAFI.mpls_vpn),
                    exa.RouteTarget(64512, 1))
    m1c = rtm.Match(exa.AFI(exa.AFI.ipv4),
                    exa.SAFI(exa.SAFI.mpls_vpn),
                    exa.RouteTarget(64512, 1, False))
    # m2 and m3 use a different RouteTarget
    m2 = rtm.Match(exa.AFI(exa.AFI.ipv4),
                   exa.SAFI(exa.SAFI.mpls_vpn),
                   exa.RouteTarget(64512, 2))
    m3 = rtm.Match(exa.AFI(exa.AFI.ipv4),
                   exa.SAFI(exa.SAFI.mpls_vpn),
                   exa.RouteTarget(64513, 1))
    self.assertEqual(hash(m1a), hash(m1b))
    self.assertEqual(hash(m1a), hash(m1c))
    self.assertNotEqual(hash(m1a), hash(m2))
    self.assertNotEqual(hash(m1a), hash(m3))
    self.assertEqual(m1a, m1b)
    self.assertEqual(m1a, m1c)
    self.assertNotEqual(m1a, m2)
    self.assertNotEqual(m1a, m3)


def test_f1_test_empty_rt(self):
    # Worker1 subscribes with no RT
    w1 = self._new_worker("Worker1", worker.Worker)
    subscribe = engine.Subscription(exa.AFI(exa.AFI.ipv4),
                                    exa.SAFI(exa.SAFI.mpls_vpn),
                                    None, w1)
    self.rtm.enqueue(subscribe)
    # Worker2 advertises a route with no RT
    w2 = self._new_worker("Worker2", worker.Worker)
    route_event = engine.RouteEvent(
        engine.RouteEvent.ADVERTISE,
        engine.RouteEntry(t.NLRI1, None, exa.Attributes()),
        w2)
    self.rtm.enqueue(route_event)
    self._wait()
    # NOTE(review): the excerpt is truncated inside this assertion; its
    # failure message and any statement following it are not visible
    self.assertEqual(1, w1.enqueue.call_count)
threadpool.py
Source:threadpool.py
...75 while True:76 self._lock.acquire()77 if self._available == 0:78 self._lock.release()79 self._new_worker()80 else:81 break82 self._tasks.put((func, args, kwargs))83 if self.available > self._total / 2 and self.total > 8:84 for i in xrange(self._total / 2 - 1):85 self._tasks.put((None,None,None))86 self._lock.release()87 def join (self):...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!