Best Python code snippet using tempest_python
test_sfc.py
Source:test_sfc.py
...63 self.router2_net1_fixed_ip = self.router2_net1[64 'fixed_ips'][0]['ip_address']65 self.router3_net1_fixed_ip = self.router3_net1[66 'fixed_ips'][0]['ip_address']67 def _setup_server(self, network):68 server = self._create_server(network=self.net1)69 floating_ip = self._create_floating_ip(70 server)71 port_id, fixed_ip = (72 self._get_server_port_id_and_ip4(server))73 return floating_ip, port_id, fixed_ip74 def _create_floating_ip(self, server):75 floating_ip = self.create_floating_ip(76 server)77 self.check_floating_ip_status(floating_ip, 'ACTIVE')78 floating_ip_addr = floating_ip['floating_ip_address']79 self.check_public_network_connectivity(80 floating_ip_addr, self.ssh_user,81 self.keypair['private_key'], True,82 servers=[server])83 return floating_ip_addr84 def _wait_for_port_chain_status(self, port_chain, status):85 time.sleep(10)86 def _create_port_chain_helper(self, symmetric):87 (88 server1_floating_ip, server1_port_id, server1_fixed_ip89 ) = self._setup_server(self.net1)90 (91 server2_floating_ip, server2_port_id, server2_fixed_ip92 ) = self._setup_server(self.net1)93 self._check_connectivity(94 server1_floating_ip, server2_fixed_ip,95 [],96 username=self.ssh_user,97 private_key=self.keypair['private_key'])98 if symmetric:99 fc = self._create_flowclassifier(100 logical_source_port=server1_port_id,101 source_ip_prefix='%s/32' % server1_fixed_ip,102 logical_destination_port=server2_port_id,103 destination_ip_prefix='%s/32' % server2_fixed_ip104 )105 else:106 fc = self._create_flowclassifier(107 logical_source_port=server1_port_id,108 source_ip_prefix='%s/32' % server1_fixed_ip,109 destination_ip_prefix='%s/32' % server2_fixed_ip110 )111 port_pair = self._create_port_pair(112 ingress=self.router2_net1['id'],113 egress=self.router2_net1['id']114 )115 port_pair_group = self._create_port_pair_group(116 port_pairs=[port_pair['id']]117 )118 port_chain = self._create_port_chain(119 port_pair_groups=[port_pair_group['id']],120 flow_classifiers=[fc['id']],121 
chain_parameters={'symmetric': symmetric}122 )123 self._check_connectivity(124 server1_floating_ip, server2_fixed_ip,125 [[self.router2_net1_fixed_ip]],126 username=self.ssh_user,127 private_key=self.keypair['private_key'])128 self.portchain_client.delete_port_chain(port_chain['id'])129 self._wait_for_port_chain_status(port_chain, 'DELETED')130 self._check_connectivity(131 server1_floating_ip, server2_fixed_ip,132 [],133 username=self.ssh_user,134 private_key=self.keypair['private_key'])135 @decorators.idempotent_id('f970f6b3-6541-47ac-a9ea-f769be1e21a8')136 @test.services('compute', 'network')137 def test_create_port_chain(self):138 self._create_port_chain_helper(False)139 @decorators.idempotent_id('35927961-1904-4a6b-9d08-ad819f1cf812')140 @test.services('compute', 'network')141 def test_create_port_chain_symmetric(self):142 self._create_port_chain_helper(True)143 def _create_port_chain_multi_fc_helper(self, symmetric):144 (145 server1_floating_ip, server1_port_id, server1_fixed_ip146 ) = self._setup_server(self.net1)147 (148 server2_floating_ip, server2_port_id, server2_fixed_ip149 ) = self._setup_server(self.net1)150 self._check_connectivity(151 server1_floating_ip, server2_fixed_ip,152 [],153 username=self.ssh_user,154 private_key=self.keypair['private_key'])155 self._check_connectivity(156 server2_floating_ip, server1_fixed_ip,157 [],158 username=self.ssh_user,159 private_key=self.keypair['private_key'])160 if symmetric:161 fc1 = self._create_flowclassifier(162 logical_source_port=server1_port_id,163 source_ip_prefix='%s/32' % server1_fixed_ip,164 logical_destination_port=server2_port_id,165 destination_ip_prefix='%s/32' % server2_fixed_ip166 )167 fc2 = self._create_flowclassifier(168 logical_source_port=server2_port_id,169 source_ip_prefix='%s/32' % server2_fixed_ip,170 logical_destination_port=server1_port_id,171 destination_ip_prefix='%s/32' % server1_fixed_ip172 )173 else:174 fc1 = self._create_flowclassifier(175 logical_source_port=server1_port_id,176 
source_ip_prefix='%s/32' % server1_fixed_ip,177 destination_ip_prefix='%s/32' % server2_fixed_ip178 )179 fc2 = self._create_flowclassifier(180 logical_source_port=server2_port_id,181 source_ip_prefix='%s/32' % server2_fixed_ip,182 destination_ip_prefix='%s/32' % server1_fixed_ip183 )184 port_pair = self._create_port_pair(185 ingress=self.router2_net1['id'],186 egress=self.router2_net1['id']187 )188 port_pair_group = self._create_port_pair_group(189 port_pairs=[port_pair['id']]190 )191 port_chain = self._create_port_chain(192 port_pair_groups=[port_pair_group['id']],193 flow_classifiers=[fc1['id'], fc2['id']],194 chain_parameters={'symmetric': symmetric}195 )196 self._check_connectivity(197 server1_floating_ip, server2_fixed_ip,198 [[self.router2_net1_fixed_ip]],199 username=self.ssh_user,200 private_key=self.keypair['private_key'])201 self._check_connectivity(202 server2_floating_ip, server1_fixed_ip,203 [[self.router2_net1_fixed_ip]],204 username=self.ssh_user,205 private_key=self.keypair['private_key'])206 self.portchain_client.delete_port_chain(port_chain['id'])207 self._wait_for_port_chain_status(port_chain, 'DELETED')208 self._check_connectivity(209 server1_floating_ip, server2_fixed_ip,210 [],211 username=self.ssh_user,212 private_key=self.keypair['private_key'])213 self._check_connectivity(214 server2_floating_ip, server1_fixed_ip,215 [],216 username=self.ssh_user,217 private_key=self.keypair['private_key'])218 @decorators.idempotent_id('f970f6b3-6541-47ac-a9ea-f769be1e21a9')219 @test.services('compute', 'network')220 def test_create_port_chain_multi_flow_classifiers(self):221 self._create_port_chain_multi_fc_helper(False)222 @decorators.idempotent_id('f970f6b3-6541-47ac-a9ea-f769be1e21b1')223 @test.services('compute', 'network')224 def test_create_port_chain_multi_flow_classifiers_symmetric(self):225 self._create_port_chain_multi_fc_helper(True)226 def _create_port_chain_multi_port_pairs_helper(self, symmetric):227 (228 server1_floating_ip, server1_port_id, 
server1_fixed_ip229 ) = self._setup_server(self.net1)230 (231 server2_floating_ip, server2_port_id, server2_fixed_ip232 ) = self._setup_server(self.net1)233 self._check_connectivity(234 server1_floating_ip, server2_fixed_ip,235 [],236 username=self.ssh_user,237 private_key=self.keypair['private_key'])238 if symmetric:239 fc = self._create_flowclassifier(240 logical_source_port=server1_port_id,241 source_ip_prefix='%s/32' % server1_fixed_ip,242 logical_destination_port=server2_port_id,243 destination_ip_prefix='%s/32' % server2_fixed_ip244 )245 else:246 fc = self._create_flowclassifier(247 logical_source_port=server1_port_id,248 source_ip_prefix='%s/32' % server1_fixed_ip,249 destination_ip_prefix='%s/32' % server2_fixed_ip250 )251 port_pair1 = self._create_port_pair(252 ingress=self.router2_net1['id'],253 egress=self.router2_net1['id']254 )255 port_pair2 = self._create_port_pair(256 ingress=self.router3_net1['id'],257 egress=self.router3_net1['id']258 )259 port_pair_group = self._create_port_pair_group(260 port_pairs=[port_pair1['id'], port_pair2['id']]261 )262 port_chain = self._create_port_chain(263 port_pair_groups=[port_pair_group['id']],264 flow_classifiers=[fc['id']],265 chain_parameters={'symmetric': symmetric}266 )267 self._check_connectivity(268 server1_floating_ip, server2_fixed_ip,269 [[self.router2_net1_fixed_ip, self.router3_net1_fixed_ip]],270 username=self.ssh_user,271 private_key=self.keypair['private_key'])272 self.portchain_client.delete_port_chain(port_chain['id'])273 self._wait_for_port_chain_status(port_chain, 'DELETED')274 self._check_connectivity(275 server1_floating_ip, server2_fixed_ip,276 [],277 username=self.ssh_user,278 private_key=self.keypair['private_key'])279 @decorators.idempotent_id('f970f6b3-6541-47ac-a9ea-f769be1e21aa')280 @test.services('compute', 'network')281 def test_create_port_chain_multi_port_pairs(self):282 self._create_port_chain_multi_port_pairs_helper(False)283 
@decorators.idempotent_id('f970f6b3-6541-47ac-a9ea-f869be1e21ad')284 @test.services('compute', 'network')285 def test_create_port_chain_multi_port_pairs_symmetric(self):286 self._create_port_chain_multi_port_pairs_helper(True)287 def _create_port_chain_multi_ppg_helper(self, symmetric):288 (289 server1_floating_ip, server1_port_id, server1_fixed_ip290 ) = self._setup_server(self.net1)291 (292 server2_floating_ip, server2_port_id, server2_fixed_ip293 ) = self._setup_server(self.net1)294 self._check_connectivity(295 server1_floating_ip, server2_fixed_ip,296 [],297 username=self.ssh_user,298 private_key=self.keypair['private_key'])299 if symmetric:300 fc = self._create_flowclassifier(301 logical_source_port=server1_port_id,302 source_ip_prefix="%s/32" % server1_fixed_ip,303 logical_destination_port=server2_port_id,304 destination_ip_prefix="%s/32" % server2_fixed_ip305 )306 else:307 fc = self._create_flowclassifier(308 logical_source_port=server1_port_id,309 source_ip_prefix="%s/32" % server1_fixed_ip,310 destination_ip_prefix="%s/32" % server2_fixed_ip311 )312 port_pair1 = self._create_port_pair(313 ingress=self.router2_net1['id'],314 egress=self.router2_net1['id']315 )316 port_pair2 = self._create_port_pair(317 ingress=self.router3_net1['id'],318 egress=self.router3_net1['id']319 )320 port_pair_group1 = self._create_port_pair_group(321 port_pairs=[port_pair1['id']]322 )323 port_pair_group2 = self._create_port_pair_group(324 port_pairs=[port_pair2['id']]325 )326 port_chain = self._create_port_chain(327 port_pair_groups=[port_pair_group1['id'], port_pair_group2['id']],328 flow_classifiers=[fc['id']],329 chain_parameters={'symmetric': symmetric}330 )331 self._check_connectivity(332 server1_floating_ip, server2_fixed_ip,333 [[self.router2_net1_fixed_ip], [self.router3_net1_fixed_ip]],334 username=self.ssh_user,335 private_key=self.keypair['private_key'])336 self.portchain_client.delete_port_chain(port_chain['id'])337 self._wait_for_port_chain_status(port_chain, 
'DELETED')338 self._check_connectivity(339 server1_floating_ip, server2_fixed_ip,340 [],341 username=self.ssh_user,342 private_key=self.keypair['private_key'])343 @decorators.idempotent_id('f970f6b3-6541-47ac-a9ea-f769be1e21ab')344 @test.services('compute', 'network')345 def test_create_port_chain_multi_port_pair_groups(self):346 self._create_port_chain_multi_ppg_helper(False)347 @decorators.idempotent_id('f970f6b3-6541-47ac-a9ea-f769be1e21b0')348 @test.services('compute', 'network')349 def test_create_port_chain_multi_port_pair_groups_symmetric(self):350 self._create_port_chain_multi_ppg_helper(True)351 @decorators.idempotent_id('f970f6b3-6541-47ac-a9ea-f769be1e22ab')352 @test.services('compute', 'network')353 def test_create_multi_port_chain(self):354 (355 server1_floating_ip, server1_port_id, server1_fixed_ip356 ) = self._setup_server(self.net1)357 (358 server2_floating_ip, server2_port_id, server2_fixed_ip359 ) = self._setup_server(self.net1)360 self._check_connectivity(361 server1_floating_ip, server2_fixed_ip,362 [],363 username=self.ssh_user,364 private_key=self.keypair['private_key'])365 fc1 = self._create_flowclassifier(366 logical_source_port=server1_port_id,367 source_ip_prefix='%s/32' % server1_fixed_ip,368 destination_ip_prefix='%s/32' % server2_fixed_ip369 )370 fc2 = self._create_flowclassifier(371 logical_source_port=server2_port_id,372 source_ip_prefix='%s/32' % server2_fixed_ip,373 destination_ip_prefix='%s/32' % server1_fixed_ip374 )375 port_pair1 = self._create_port_pair(376 ingress=self.router2_net1['id'],377 egress=self.router2_net1['id']378 )379 port_pair2 = self._create_port_pair(380 ingress=self.router3_net1['id'],381 egress=self.router3_net1['id']382 )383 port_pair_group1 = self._create_port_pair_group(384 port_pairs=[port_pair1['id']]385 )386 port_pair_group2 = self._create_port_pair_group(387 port_pairs=[port_pair2['id']]388 )389 port_chain1 = self._create_port_chain(390 port_pair_groups=[port_pair_group1['id'], 
port_pair_group2['id']],391 flow_classifiers=[fc1['id']]392 )393 port_chain2 = self._create_port_chain(394 port_pair_groups=[port_pair_group2['id'], port_pair_group1['id']],395 flow_classifiers=[fc2['id']]396 )397 self._check_connectivity(398 server1_floating_ip, server2_fixed_ip,399 [[self.router2_net1_fixed_ip], [self.router3_net1_fixed_ip]],400 username=self.ssh_user,401 private_key=self.keypair['private_key'])402 self._check_connectivity(403 server2_floating_ip, server1_fixed_ip,404 [[self.router3_net1_fixed_ip], [self.router2_net1_fixed_ip]],405 username=self.ssh_user,406 private_key=self.keypair['private_key'])407 self.portchain_client.delete_port_chain(port_chain1['id'])408 self.portchain_client.delete_port_chain(port_chain2['id'])409 self._wait_for_port_chain_status(port_chain1, 'DELETED')410 self._wait_for_port_chain_status(port_chain2, 'DELETED')411 self._check_connectivity(412 server1_floating_ip, server2_fixed_ip,413 [],414 username=self.ssh_user,415 private_key=self.keypair['private_key'])416 self._check_connectivity(417 server2_floating_ip, server1_fixed_ip,418 [],419 username=self.ssh_user,420 private_key=self.keypair['private_key'])421 @decorators.idempotent_id('f970f6b3-6541-47ac-a9ea-f769be1e21ac')422 @test.services('compute', 'network')423 def test_update_port_chain_add_flow_classifiers(self):424 (425 server1_floating_ip, server1_port_id, server1_fixed_ip426 ) = self._setup_server(self.net1)427 (428 server2_floating_ip, server2_port_id, server2_fixed_ip429 ) = self._setup_server(self.net1)430 self._check_connectivity(431 server1_floating_ip, server2_fixed_ip,432 [],433 username=self.ssh_user,434 private_key=self.keypair['private_key'])435 self._check_connectivity(436 server2_floating_ip, server1_fixed_ip,437 [],438 username=self.ssh_user,439 private_key=self.keypair['private_key'])440 fc1 = self._create_flowclassifier(441 logical_source_port=server1_port_id,442 source_ip_prefix='%s/32' % server1_fixed_ip,443 destination_ip_prefix='%s/32' % 
server2_fixed_ip444 )445 fc2 = self._create_flowclassifier(446 logical_source_port=server2_port_id,447 source_ip_prefix='%s/32' % server2_fixed_ip,448 destination_ip_prefix='%s/32' % server1_fixed_ip449 )450 port_pair = self._create_port_pair(451 ingress=self.router2_net1['id'],452 egress=self.router2_net1['id']453 )454 port_pair_group = self._create_port_pair_group(455 port_pairs=[port_pair['id']]456 )457 port_chain = self._create_port_chain(458 port_pair_groups=[port_pair_group['id']],459 flow_classifiers=[fc1['id']]460 )461 self._check_connectivity(462 server1_floating_ip, server2_fixed_ip,463 [[self.router2_net1_fixed_ip]],464 username=self.ssh_user,465 private_key=self.keypair['private_key'])466 self._check_connectivity(467 server2_floating_ip, server1_fixed_ip,468 [],469 username=self.ssh_user,470 private_key=self.keypair['private_key'])471 self.portchain_client.update_port_chain(472 port_chain['id'], flow_classifiers=[fc1['id'], fc2['id']])473 self._wait_for_port_chain_status(port_chain, 'ACTIVE')474 self._check_connectivity(475 server1_floating_ip, server2_fixed_ip,476 [[self.router2_net1_fixed_ip]],477 username=self.ssh_user,478 private_key=self.keypair['private_key'])479 self._check_connectivity(480 server2_floating_ip, server1_fixed_ip,481 [[self.router2_net1_fixed_ip]],482 username=self.ssh_user,483 private_key=self.keypair['private_key'])484 @decorators.idempotent_id('f970f6b3-6541-47ac-a9ea-f769be1e21ad')485 @test.services('compute', 'network')486 def test_update_port_chain_remove_flow_classifiers(self):487 (488 server1_floating_ip, server1_port_id, server1_fixed_ip489 ) = self._setup_server(self.net1)490 (491 server2_floating_ip, server2_port_id, server2_fixed_ip492 ) = self._setup_server(self.net1)493 self._check_connectivity(494 server1_floating_ip, server2_fixed_ip,495 [],496 username=self.ssh_user,497 private_key=self.keypair['private_key'])498 self._check_connectivity(499 server2_floating_ip, server1_fixed_ip,500 [],501 
username=self.ssh_user,502 private_key=self.keypair['private_key'])503 fc1 = self._create_flowclassifier(504 logical_source_port=server1_port_id,505 source_ip_prefix='%s/32' % server1_fixed_ip,506 destination_ip_prefix='%s/32' % server2_fixed_ip507 )508 fc2 = self._create_flowclassifier(509 logical_source_port=server2_port_id,510 source_ip_prefix='%s/32' % server2_fixed_ip,511 destination_ip_prefix='%s/32' % server1_fixed_ip512 )513 port_pair = self._create_port_pair(514 ingress=self.router2_net1['id'],515 egress=self.router2_net1['id']516 )517 port_pair_group = self._create_port_pair_group(518 port_pairs=[port_pair['id']]519 )520 port_chain = self._create_port_chain(521 port_pair_groups=[port_pair_group['id']],522 flow_classifiers=[fc1['id'], fc2['id']]523 )524 self._check_connectivity(525 server1_floating_ip, server2_fixed_ip,526 [[self.router2_net1_fixed_ip]],527 username=self.ssh_user,528 private_key=self.keypair['private_key'])529 self._check_connectivity(530 server2_floating_ip, server1_fixed_ip,531 [[self.router2_net1_fixed_ip]],532 username=self.ssh_user,533 private_key=self.keypair['private_key'])534 self.portchain_client.update_port_chain(535 port_chain['id'], flow_classifiers=[fc1['id']])536 self._wait_for_port_chain_status(port_chain, 'ACTIVE')537 self._check_connectivity(538 server1_floating_ip, server2_fixed_ip,539 [[self.router2_net1_fixed_ip]],540 username=self.ssh_user,541 private_key=self.keypair['private_key'])542 self._check_connectivity(543 server2_floating_ip, server1_fixed_ip,544 [],545 username=self.ssh_user,546 private_key=self.keypair['private_key'])547 @decorators.idempotent_id('f970f6b3-6541-47ac-a9ea-f769be1e21ae')548 @test.services('compute', 'network')549 def test_update_port_chain_replace_flow_classifiers(self):550 (551 server1_floating_ip, server1_port_id, server1_fixed_ip552 ) = self._setup_server(self.net1)553 (554 server2_floating_ip, server2_port_id, server2_fixed_ip555 ) = self._setup_server(self.net1)556 
self._check_connectivity(557 server1_floating_ip, server2_fixed_ip,558 [],559 username=self.ssh_user,560 private_key=self.keypair['private_key'])561 self._check_connectivity(562 server2_floating_ip, server1_fixed_ip,563 [],564 username=self.ssh_user,565 private_key=self.keypair['private_key'])566 fc1 = self._create_flowclassifier(567 logical_source_port=server1_port_id,568 source_ip_prefix='%s/32' % server1_fixed_ip,569 destination_ip_prefix='%s/32' % server2_fixed_ip570 )571 fc2 = self._create_flowclassifier(572 logical_source_port=server2_port_id,573 source_ip_prefix='%s/32' % server2_fixed_ip,574 destination_ip_prefix='%s/32' % server1_fixed_ip575 )576 port_pair = self._create_port_pair(577 ingress=self.router2_net1['id'],578 egress=self.router2_net1['id']579 )580 port_pair_group = self._create_port_pair_group(581 port_pairs=[port_pair['id']]582 )583 port_chain = self._create_port_chain(584 port_pair_groups=[port_pair_group['id']],585 flow_classifiers=[fc1['id']]586 )587 self._check_connectivity(588 server1_floating_ip, server2_fixed_ip,589 [[self.router2_net1_fixed_ip]],590 username=self.ssh_user,591 private_key=self.keypair['private_key'])592 self._check_connectivity(593 server2_floating_ip, server1_fixed_ip,594 [],595 username=self.ssh_user,596 private_key=self.keypair['private_key'])597 self.portchain_client.update_port_chain(598 port_chain['id'], flow_classifiers=[fc2['id']])599 self._wait_for_port_chain_status(port_chain, 'DELETED')600 self._check_connectivity(601 server1_floating_ip, server2_fixed_ip,602 [],603 username=self.ssh_user,604 private_key=self.keypair['private_key'])605 self._check_connectivity(606 server2_floating_ip, server1_fixed_ip,607 [[self.router2_net1_fixed_ip]],608 username=self.ssh_user,609 private_key=self.keypair['private_key'])610 @decorators.idempotent_id('f970f6b3-6541-47ac-a9ea-f769be1e21af')611 @test.services('compute', 'network')612 def test_update_port_chain_add_port_pair_groups(self):613 (614 server1_floating_ip, 
server1_port_id, server1_fixed_ip615 ) = self._setup_server(self.net1)616 (617 server2_floating_ip, server2_port_id, server2_fixed_ip618 ) = self._setup_server(self.net1)619 self._check_connectivity(620 server1_floating_ip, server2_fixed_ip,621 [],622 username=self.ssh_user,623 private_key=self.keypair['private_key'])624 self._check_connectivity(625 server2_floating_ip, server1_fixed_ip,626 [],627 username=self.ssh_user,628 private_key=self.keypair['private_key'])629 fc = self._create_flowclassifier(630 logical_source_port=server1_port_id,631 source_ip_prefix='%s/32' % server1_fixed_ip,632 destination_ip_prefix='%s/32' % server2_fixed_ip633 )634 port_pair1 = self._create_port_pair(635 ingress=self.router2_net1['id'],636 egress=self.router2_net1['id']637 )638 port_pair2 = self._create_port_pair(639 ingress=self.router3_net1['id'],640 egress=self.router3_net1['id']641 )642 port_pair_group1 = self._create_port_pair_group(643 port_pairs=[port_pair1['id']]644 )645 port_pair_group2 = self._create_port_pair_group(646 port_pairs=[port_pair2['id']]647 )648 port_chain = self._create_port_chain(649 port_pair_groups=[port_pair_group1['id']],650 flow_classifiers=[fc['id']]651 )652 self._check_connectivity(653 server1_floating_ip, server2_fixed_ip,654 [[self.router2_net1_fixed_ip]],655 username=self.ssh_user,656 private_key=self.keypair['private_key'])657 self.portchain_client.update_port_chain(658 port_chain['id'],659 port_pair_groups=[660 port_pair_group1['id'], port_pair_group2['id']661 ]662 )663 self._wait_for_port_chain_status(port_chain, 'ACTIVE')664 self._check_connectivity(665 server1_floating_ip, server2_fixed_ip,666 [[self.router2_net1_fixed_ip], [self.router3_net1_fixed_ip]],667 username=self.ssh_user,668 private_key=self.keypair['private_key'])669 @decorators.idempotent_id('f970f6b3-6541-47ac-a9ea-f769be1e21bf')670 @test.services('compute', 'network')671 def test_update_port_chain_remove_port_pair_groups(self):672 (673 server1_floating_ip, server1_port_id, 
server1_fixed_ip674 ) = self._setup_server(self.net1)675 (676 server2_floating_ip, server2_port_id, server2_fixed_ip677 ) = self._setup_server(self.net1)678 self._check_connectivity(679 server1_floating_ip, server2_fixed_ip,680 [],681 username=self.ssh_user,682 private_key=self.keypair['private_key'])683 self._check_connectivity(684 server2_floating_ip, server1_fixed_ip,685 [],686 username=self.ssh_user,687 private_key=self.keypair['private_key'])688 fc = self._create_flowclassifier(689 logical_source_port=server1_port_id,690 source_ip_prefix='%s/32' % server1_fixed_ip,691 destination_ip_prefix='%s/32' % server2_fixed_ip692 )693 port_pair1 = self._create_port_pair(694 ingress=self.router2_net1['id'],695 egress=self.router2_net1['id']696 )697 port_pair2 = self._create_port_pair(698 ingress=self.router3_net1['id'],699 egress=self.router3_net1['id']700 )701 port_pair_group1 = self._create_port_pair_group(702 port_pairs=[port_pair1['id']]703 )704 port_pair_group2 = self._create_port_pair_group(705 port_pairs=[port_pair2['id']]706 )707 port_chain = self._create_port_chain(708 port_pair_groups=[709 port_pair_group1['id'], port_pair_group2['id']710 ],711 flow_classifiers=[fc['id']]712 )713 self._check_connectivity(714 server1_floating_ip, server2_fixed_ip,715 [[self.router2_net1_fixed_ip], [self.router3_net1_fixed_ip]],716 username=self.ssh_user,717 private_key=self.keypair['private_key'])718 self.portchain_client.update_port_chain(719 port_chain['id'],720 port_pair_groups=[721 port_pair_group1['id']722 ]723 )724 self._wait_for_port_chain_status(port_chain, 'ACTIVE')725 self._check_connectivity(726 server1_floating_ip, server2_fixed_ip,727 [[self.router2_net1_fixed_ip]],728 username=self.ssh_user,729 private_key=self.keypair['private_key'])730 @decorators.idempotent_id('f970f6b3-6541-47ac-a9ea-f769be1e21be')731 @test.services('compute', 'network')732 def test_update_port_chain_replace_port_pair_groups(self):733 (734 server1_floating_ip, server1_port_id, 
server1_fixed_ip735 ) = self._setup_server(self.net1)736 (737 server2_floating_ip, server2_port_id, server2_fixed_ip738 ) = self._setup_server(self.net1)739 self._check_connectivity(740 server1_floating_ip, server2_fixed_ip,741 [],742 username=self.ssh_user,743 private_key=self.keypair['private_key'])744 self._check_connectivity(745 server2_floating_ip, server1_fixed_ip,746 [],747 username=self.ssh_user,748 private_key=self.keypair['private_key'])749 fc = self._create_flowclassifier(750 logical_source_port=server1_port_id,751 source_ip_prefix='%s/32' % server1_fixed_ip,752 destination_ip_prefix='%s/32' % server2_fixed_ip753 )754 port_pair1 = self._create_port_pair(755 ingress=self.router2_net1['id'],756 egress=self.router2_net1['id']757 )758 port_pair2 = self._create_port_pair(759 ingress=self.router3_net1['id'],760 egress=self.router3_net1['id']761 )762 port_pair_group1 = self._create_port_pair_group(763 port_pairs=[port_pair1['id']]764 )765 port_pair_group2 = self._create_port_pair_group(766 port_pairs=[port_pair2['id']]767 )768 port_chain = self._create_port_chain(769 port_pair_groups=[770 port_pair_group1['id']771 ],772 flow_classifiers=[fc['id']]773 )774 self._check_connectivity(775 server1_floating_ip, server2_fixed_ip,776 [[self.router2_net1_fixed_ip]],777 username=self.ssh_user,778 private_key=self.keypair['private_key'])779 self.portchain_client.update_port_chain(780 port_chain['id'],781 port_pair_groups=[782 port_pair_group2['id']783 ]784 )785 self._wait_for_port_chain_status(port_chain, 'ACTIVE')786 self._check_connectivity(787 server1_floating_ip, server2_fixed_ip,788 [[self.router3_net1_fixed_ip]],789 username=self.ssh_user,790 private_key=self.keypair['private_key'])791 @decorators.idempotent_id('f970f6b3-6541-47ac-a9ea-f769be1e21bc')792 @test.services('compute', 'network')793 def test_update_port_chain_replace_port_pair_groups_flow_classifiers(self):794 (795 server1_floating_ip, server1_port_id, server1_fixed_ip796 ) = 
self._setup_server(self.net1)797 (798 server2_floating_ip, server2_port_id, server2_fixed_ip799 ) = self._setup_server(self.net1)800 self._check_connectivity(801 server1_floating_ip, server2_fixed_ip,802 [],803 username=self.ssh_user,804 private_key=self.keypair['private_key'])805 self._check_connectivity(806 server2_floating_ip, server1_fixed_ip,807 [],808 username=self.ssh_user,809 private_key=self.keypair['private_key'])810 fc1 = self._create_flowclassifier(811 logical_source_port=server1_port_id,812 source_ip_prefix='%s/32' % server1_fixed_ip,813 destination_ip_prefix='%s/32' % server2_fixed_ip814 )815 fc2 = self._create_flowclassifier(816 logical_source_port=server2_port_id,817 source_ip_prefix='%s/32' % server2_fixed_ip,818 destination_ip_prefix='%s/32' % server1_fixed_ip819 )820 port_pair1 = self._create_port_pair(821 ingress=self.router2_net1['id'],822 egress=self.router2_net1['id']823 )824 port_pair2 = self._create_port_pair(825 ingress=self.router3_net1['id'],826 egress=self.router3_net1['id']827 )828 port_pair_group1 = self._create_port_pair_group(829 port_pairs=[port_pair1['id']]830 )831 port_pair_group2 = self._create_port_pair_group(832 port_pairs=[port_pair2['id']]833 )834 port_chain = self._create_port_chain(835 port_pair_groups=[836 port_pair_group1['id']837 ],838 flow_classifiers=[fc1['id']]839 )840 self._check_connectivity(841 server1_floating_ip, server2_fixed_ip,842 [[self.router2_net1_fixed_ip]],843 username=self.ssh_user,844 private_key=self.keypair['private_key'])845 self._check_connectivity(846 server2_floating_ip, server1_fixed_ip,847 [],848 username=self.ssh_user,849 private_key=self.keypair['private_key'])850 self.portchain_client.update_port_chain(851 port_chain['id'],852 port_pair_groups=[port_pair_group2['id']],853 flow_classifiers=[fc2['id']])854 self._wait_for_port_chain_status(port_chain, 'ACTIVE')855 self._check_connectivity(856 server1_floating_ip, server2_fixed_ip,857 [],858 username=self.ssh_user,859 
private_key=self.keypair['private_key'])860 self._check_connectivity(861 server2_floating_ip, server1_fixed_ip,862 [[self.router3_net1_fixed_ip]],863 username=self.ssh_user,864 private_key=self.keypair['private_key'])865 def _wait_for_port_pair_group_status(self, port_pair_group, status):866 time.sleep(10)867 @decorators.idempotent_id('f970f6b3-6541-47ac-a9ea-f769be1e21bb')868 @test.services('compute', 'network')869 def test_update_port_pair_group_add_port_pairs(self):870 (871 server1_floating_ip, server1_port_id, server1_fixed_ip872 ) = self._setup_server(self.net1)873 (874 server2_floating_ip, server2_port_id, server2_fixed_ip875 ) = self._setup_server(self.net1)876 self._check_connectivity(877 server1_floating_ip, server2_fixed_ip,878 [],879 username=self.ssh_user,880 private_key=self.keypair['private_key'])881 fc = self._create_flowclassifier(882 logical_source_port=server1_port_id,883 source_ip_prefix='%s/32' % server1_fixed_ip,884 destination_ip_prefix='%s/32' % server2_fixed_ip885 )886 port_pair1 = self._create_port_pair(887 ingress=self.router2_net1['id'],888 egress=self.router2_net1['id']889 )890 port_pair2 = self._create_port_pair(891 ingress=self.router3_net1['id'],892 egress=self.router3_net1['id']893 )894 port_pair_group = self._create_port_pair_group(895 port_pairs=[port_pair1['id']]896 )897 self._create_port_chain(898 port_pair_groups=[899 port_pair_group['id']900 ],901 flow_classifiers=[fc['id']]902 )903 self._check_connectivity(904 server1_floating_ip, server2_fixed_ip,905 [[self.router2_net1_fixed_ip]],906 username=self.ssh_user,907 private_key=self.keypair['private_key'])908 self.portpairgroup_client.update_port_pair_group(909 port_pair_group['id'],910 port_pairs=[911 port_pair1['id'], port_pair2['id']912 ]913 )914 self._wait_for_port_pair_group_status(port_pair_group, 'ACTIVE')915 self._check_connectivity(916 server1_floating_ip, server2_fixed_ip,917 [[self.router2_net1_fixed_ip, self.router3_net1_fixed_ip]],918 username=self.ssh_user,919 
private_key=self.keypair['private_key'])920 @decorators.idempotent_id('f970f6b3-6541-47ac-a9ea-f769be1e21ba')921 @test.services('compute', 'network')922 def test_update_port_pair_group_remove_port_pairs(self):923 (924 server1_floating_ip, server1_port_id, server1_fixed_ip925 ) = self._setup_server(self.net1)926 (927 server2_floating_ip, server2_port_id, server2_fixed_ip928 ) = self._setup_server(self.net1)929 self._check_connectivity(930 server1_floating_ip, server2_fixed_ip,931 [],932 username=self.ssh_user,933 private_key=self.keypair['private_key'])934 fc = self._create_flowclassifier(935 logical_source_port=server1_port_id,936 source_ip_prefix='%s/32' % server1_fixed_ip,937 destination_ip_prefix='%s/32' % server2_fixed_ip938 )939 port_pair1 = self._create_port_pair(940 ingress=self.router2_net1['id'],941 egress=self.router2_net1['id']942 )943 port_pair2 = self._create_port_pair(944 ingress=self.router3_net1['id'],945 egress=self.router3_net1['id']946 )947 port_pair_group = self._create_port_pair_group(948 port_pairs=[port_pair1['id'], port_pair2['id']]949 )950 self._create_port_chain(951 port_pair_groups=[952 port_pair_group['id']953 ],954 flow_classifiers=[fc['id']]955 )956 self._check_connectivity(957 server1_floating_ip, server2_fixed_ip,958 [[self.router2_net1_fixed_ip, self.router3_net1_fixed_ip]],959 username=self.ssh_user,960 private_key=self.keypair['private_key'])961 self.portpairgroup_client.update_port_pair_group(962 port_pair_group['id'],963 port_pairs=[port_pair1['id']])964 self._wait_for_port_pair_group_status(port_pair_group, 'ACTIVE')965 self._check_connectivity(966 server1_floating_ip, server2_fixed_ip,967 [[self.router2_net1_fixed_ip]],968 username=self.ssh_user,969 private_key=self.keypair['private_key'])970 @decorators.idempotent_id('f970f6b3-6541-47ac-a9ea-f769be1e21b9')971 @test.services('compute', 'network')972 def test_update_port_pair_group_replace_port_pairs(self):973 (974 server1_floating_ip, server1_port_id, server1_fixed_ip975 ) = 
self._setup_server(self.net1)976 (977 server2_floating_ip, server2_port_id, server2_fixed_ip978 ) = self._setup_server(self.net1)979 self._check_connectivity(980 server1_floating_ip, server2_fixed_ip,981 [],982 username=self.ssh_user,983 private_key=self.keypair['private_key'])984 fc = self._create_flowclassifier(985 logical_source_port=server1_port_id,986 source_ip_prefix='%s/32' % server1_fixed_ip,987 destination_ip_prefix='%s/32' % server2_fixed_ip988 )989 port_pair1 = self._create_port_pair(990 ingress=self.router2_net1['id'],991 egress=self.router2_net1['id']992 )993 port_pair2 = self._create_port_pair(994 ingress=self.router3_net1['id'],995 egress=self.router3_net1['id']996 )997 port_pair_group = self._create_port_pair_group(998 port_pairs=[port_pair1['id']]999 )1000 self._create_port_chain(1001 port_pair_groups=[1002 port_pair_group['id']1003 ],1004 flow_classifiers=[fc['id']]1005 )1006 self._check_connectivity(1007 server1_floating_ip, server2_fixed_ip,1008 [[self.router2_net1_fixed_ip]],1009 username=self.ssh_user,1010 private_key=self.keypair['private_key'])1011 self.portpairgroup_client.update_port_pair_group(1012 port_pair_group['id'],1013 port_pairs=[port_pair2['id']])1014 self._wait_for_port_pair_group_status(port_pair_group, 'ACTIVE')1015 self._check_connectivity(1016 server1_floating_ip, server2_fixed_ip,1017 [[self.router3_net1_fixed_ip]],1018 username=self.ssh_user,1019 private_key=self.keypair['private_key'])1020 @decorators.idempotent_id('f970f6b3-6541-47ac-a9ea-f769be1e21b9')1021 @test.services('compute', 'network')1022 def test_multi_port_chains_update_port_pair_group_replace_port_pairs(1023 self1024 ):1025 self.router4 = self._create_router()1026 self.router4_net1 = self._create_port(self.net1['id'])1027 self._add_router_interface(1028 self.router4['id'], self.router4_net1['id'])1029 self.router4_net1_fixed_ip = self.router4_net1[1030 'fixed_ips'][0]['ip_address']1031 self.router5 = self._create_router()1032 self.router5_net1 = 
self._create_port(self.net1['id'])1033 self._add_router_interface(1034 self.router5['id'], self.router5_net1['id'])1035 self.router5_net1_fixed_ip = self.router5_net1[1036 'fixed_ips'][0]['ip_address']1037 (1038 server1_floating_ip, server1_port_id, server1_fixed_ip1039 ) = self._setup_server(self.net1)1040 (1041 server2_floating_ip, server2_port_id, server2_fixed_ip1042 ) = self._setup_server(self.net1)1043 self._check_connectivity(1044 server1_floating_ip, server2_fixed_ip,1045 [],1046 username=self.ssh_user,1047 private_key=self.keypair['private_key'])1048 fc1 = self._create_flowclassifier(1049 logical_source_port=server1_port_id,1050 source_ip_prefix='%s/32' % server1_fixed_ip,1051 destination_ip_prefix='%s/32' % server2_fixed_ip1052 )1053 fc2 = self._create_flowclassifier(1054 logical_source_port=server2_port_id,1055 source_ip_prefix='%s/32' % server2_fixed_ip,1056 destination_ip_prefix='%s/32' % server1_fixed_ip...
test_network_advanced_server_ops.py
Source:test_network_advanced_server_ops.py
...46 def setup_credentials(cls):47 # Create no network resources for these tests.48 cls.set_network_resources()49 super(TestNetworkAdvancedServerOps, cls).setup_credentials()50 def _setup_server(self, keypair):51 security_groups = []52 if utils.is_extension_enabled('security-group', 'network'):53 security_group = self._create_security_group()54 security_groups = [{'name': security_group['name']}]55 network, _, _ = self.create_networks()56 server = self.create_server(57 networks=[{'uuid': network['id']}],58 key_name=keypair['name'],59 security_groups=security_groups)60 return server61 def _setup_network(self, server, keypair):62 public_network_id = CONF.network.public_network_id63 floating_ip = self.create_floating_ip(server, public_network_id)64 # Verify that we can indeed connect to the server before we mess with65 # it's state66 self._wait_server_status_and_check_network_connectivity(67 server, keypair, floating_ip)68 return floating_ip69 def _check_network_connectivity(self, server, keypair, floating_ip,70 should_connect=True):71 username = CONF.validation.image_ssh_user72 private_key = keypair['private_key']73 self._check_tenant_network_connectivity(74 server, username, private_key,75 should_connect=should_connect,76 servers_for_debug=[server])77 floating_ip_addr = floating_ip['floating_ip_address']78 # Check FloatingIP status before checking the connectivity79 self.check_floating_ip_status(floating_ip, 'ACTIVE')80 self.check_public_network_connectivity(floating_ip_addr, username,81 private_key, should_connect,82 servers=[server])83 def _wait_server_status_and_check_network_connectivity(self, server,84 keypair,85 floating_ip):86 waiters.wait_for_server_status(self.servers_client, server['id'],87 'ACTIVE')88 self._check_network_connectivity(server, keypair, floating_ip)89 def _get_host_for_server(self, server_id):90 body = self.admin_servers_client.show_server(server_id)['server']91 return body['OS-EXT-SRV-ATTR:host']92 
@decorators.idempotent_id('61f1aa9a-1573-410e-9054-afa557cab021')93 @decorators.attr(type='slow')94 @utils.services('compute', 'network')95 def test_server_connectivity_stop_start(self):96 keypair = self.create_keypair()97 server = self._setup_server(keypair)98 floating_ip = self._setup_network(server, keypair)99 self.servers_client.stop_server(server['id'])100 waiters.wait_for_server_status(self.servers_client, server['id'],101 'SHUTOFF')102 self._check_network_connectivity(server, keypair, floating_ip,103 should_connect=False)104 self.servers_client.start_server(server['id'])105 self._wait_server_status_and_check_network_connectivity(106 server, keypair, floating_ip)107 @decorators.idempotent_id('7b6860c2-afa3-4846-9522-adeb38dfbe08')108 @utils.services('compute', 'network')109 def test_server_connectivity_reboot(self):110 keypair = self.create_keypair()111 server = self._setup_server(keypair)112 floating_ip = self._setup_network(server, keypair)113 self.servers_client.reboot_server(server['id'], type='SOFT')114 self._wait_server_status_and_check_network_connectivity(115 server, keypair, floating_ip)116 @decorators.idempotent_id('88a529c2-1daa-4c85-9aec-d541ba3eb699')117 @decorators.attr(type='slow')118 @utils.services('compute', 'network')119 def test_server_connectivity_rebuild(self):120 keypair = self.create_keypair()121 server = self._setup_server(keypair)122 floating_ip = self._setup_network(server, keypair)123 image_ref_alt = CONF.compute.image_ref_alt124 self.servers_client.rebuild_server(server['id'],125 image_ref=image_ref_alt)126 self._wait_server_status_and_check_network_connectivity(127 server, keypair, floating_ip)128 @decorators.idempotent_id('2b2642db-6568-4b35-b812-eceed3fa20ce')129 @testtools.skipUnless(CONF.compute_feature_enabled.pause,130 'Pause is not available.')131 @decorators.attr(type='slow')132 @utils.services('compute', 'network')133 def test_server_connectivity_pause_unpause(self):134 keypair = self.create_keypair()135 server = 
self._setup_server(keypair)136 floating_ip = self._setup_network(server, keypair)137 self.servers_client.pause_server(server['id'])138 waiters.wait_for_server_status(self.servers_client, server['id'],139 'PAUSED')140 self._check_network_connectivity(server, keypair, floating_ip,141 should_connect=False)142 self.servers_client.unpause_server(server['id'])143 self._wait_server_status_and_check_network_connectivity(144 server, keypair, floating_ip)145 @decorators.idempotent_id('5cdf9499-541d-4923-804e-b9a60620a7f0')146 @testtools.skipUnless(CONF.compute_feature_enabled.suspend,147 'Suspend is not available.')148 @decorators.attr(type='slow')149 @utils.services('compute', 'network')150 def test_server_connectivity_suspend_resume(self):151 keypair = self.create_keypair()152 server = self._setup_server(keypair)153 floating_ip = self._setup_network(server, keypair)154 self.servers_client.suspend_server(server['id'])155 waiters.wait_for_server_status(self.servers_client, server['id'],156 'SUSPENDED')157 self._check_network_connectivity(server, keypair, floating_ip,158 should_connect=False)159 self.servers_client.resume_server(server['id'])160 self._wait_server_status_and_check_network_connectivity(161 server, keypair, floating_ip)162 @decorators.idempotent_id('719eb59d-2f42-4b66-b8b1-bb1254473967')163 @testtools.skipUnless(CONF.compute_feature_enabled.resize,164 'Resize is not available.')165 @decorators.attr(type='slow')166 @utils.services('compute', 'network')167 def test_server_connectivity_resize(self):168 resize_flavor = CONF.compute.flavor_ref_alt169 keypair = self.create_keypair()170 server = self._setup_server(keypair)171 floating_ip = self._setup_network(server, keypair)172 self.servers_client.resize_server(server['id'],173 flavor_ref=resize_flavor)174 waiters.wait_for_server_status(self.servers_client, server['id'],175 'VERIFY_RESIZE')176 self.servers_client.confirm_resize_server(server['id'])177 self._wait_server_status_and_check_network_connectivity(178 server, 
keypair, floating_ip)179 @decorators.idempotent_id('a4858f6c-401e-4155-9a49-d5cd053d1a2f')180 @testtools.skipUnless(CONF.compute_feature_enabled.cold_migration,181 'Cold migration is not available.')182 @testtools.skipUnless(CONF.compute.min_compute_nodes > 1,183 'Less than 2 compute nodes, skipping multinode '184 'tests.')185 @decorators.attr(type='slow')186 @utils.services('compute', 'network')187 def test_server_connectivity_cold_migration(self):188 keypair = self.create_keypair()189 server = self._setup_server(keypair)190 floating_ip = self._setup_network(server, keypair)191 src_host = self._get_host_for_server(server['id'])192 self._wait_server_status_and_check_network_connectivity(193 server, keypair, floating_ip)194 self.admin_servers_client.migrate_server(server['id'])195 waiters.wait_for_server_status(self.servers_client, server['id'],196 'VERIFY_RESIZE')197 self.servers_client.confirm_resize_server(server['id'])198 self._wait_server_status_and_check_network_connectivity(199 server, keypair, floating_ip)200 dst_host = self._get_host_for_server(server['id'])201 self.assertNotEqual(src_host, dst_host)202 @decorators.idempotent_id('25b188d7-0183-4b1e-a11d-15840c8e2fd6')203 @testtools.skipUnless(CONF.compute_feature_enabled.cold_migration,204 'Cold migration is not available.')205 @testtools.skipUnless(CONF.compute.min_compute_nodes > 1,206 'Less than 2 compute nodes, skipping multinode '207 'tests.')208 @decorators.attr(type='slow')209 @utils.services('compute', 'network')210 def test_server_connectivity_cold_migration_revert(self):211 keypair = self.create_keypair()212 server = self._setup_server(keypair)213 floating_ip = self._setup_network(server, keypair)214 src_host = self._get_host_for_server(server['id'])215 self._wait_server_status_and_check_network_connectivity(216 server, keypair, floating_ip)217 self.admin_servers_client.migrate_server(server['id'])218 waiters.wait_for_server_status(self.servers_client, server['id'],219 'VERIFY_RESIZE')220 
self.servers_client.revert_resize_server(server['id'])221 self._wait_server_status_and_check_network_connectivity(222 server, keypair, floating_ip)223 dst_host = self._get_host_for_server(server['id'])...
email.py
Source:email.py
...23 self._message.attach(MIMEText(text))24 def send(self, list_of_emails):25 joined = email.Utils.COMMASPACE.join(list_of_emails)26 self._message['To'] = joined27 self._setup_server()28 self._server.sendmail(self._user, list_of_emails, self._message.as_string())29 self._server.quit()30 def _setup_server(self):31 self._server = smtplib.SMTP()32 self._server.set_debuglevel(1)33 def _setup_message(self):34 self._message = email.MIMEMultipart.MIMEMultipart()35 self._message['From'] = self._user36 self._message['Subject'] = ''37 self._message['To'] = list()38class LocalHost(Email, object):39 def __init__(self, user):40 super(LocalHost, self).__init__(user=user)41 def _setup_server(self):42 self._server = smtplib.SMTP('localhost')43class Gmail(Email, object):44 def _setup_server(self):45 super(Gmail, self)._setup_server()46 smtp_host = 'smtp.gmail.com'47 smtp_port = 58748 self._server.connect(smtp_host, smtp_port)49 self._server.ehlo()50 self._server.starttls()...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!