Best Python code snippet using lisa_python
test_network.py
Source:test_network.py
1# Copyright 2014: Intel Inc.2# All Rights Reserved.3#4# Licensed under the Apache License, Version 2.0 (the "License"); you may5# not use this file except in compliance with the License. You may obtain6# a copy of the License at7#8# http://www.apache.org/licenses/LICENSE-2.09#10# Unless required by applicable law or agreed to in writing, software11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the13# License for the specific language governing permissions and limitations14# under the License.15import ddt16import mock17from rally import exceptions as rally_exceptions18from rally.plugins.openstack.scenarios.neutron import network19from tests.unit import test20BASE = "rally.plugins.openstack.scenarios.neutron.network"21@ddt.ddt22class NeutronNetworksTestCase(test.ScenarioTestCase):23 @ddt.data(24 {"network_create_args": {}},25 {"network_create_args": {"name": "given-name"}},26 {"network_create_args": {"provider:network_type": "vxlan"}}27 )28 @ddt.unpack29 @mock.patch("%s.CreateAndListNetworks._list_networks" % BASE)30 @mock.patch("%s.CreateAndListNetworks._create_network" % BASE)31 def test_create_and_list_networks(self,32 mock__create_network,33 mock__list_networks,34 network_create_args):35 scenario = network.CreateAndListNetworks(self.context)36 scenario.run(network_create_args=network_create_args)37 mock__create_network.assert_called_once_with(network_create_args)38 mock__list_networks.assert_called_once_with()39 mock__create_network.reset_mock()40 mock__list_networks.reset_mock()41 @ddt.data(42 {"network_create_args": {}},43 {"network_create_args": {"name": "given-name"}},44 )45 @ddt.unpack46 @mock.patch("%s.CreateAndShowNetwork._show_network" % BASE)47 @mock.patch("%s.CreateAndShowNetwork._create_network" % BASE)48 def test_create_and_show_network(self,49 mock__create_network,50 mock__show_network,51 network_create_args):52 scenario = network.CreateAndShowNetwork(self.context)53 mock_net = mock.Mock()54 mock__create_network.return_value = mock_net55 scenario.run(network_create_args=network_create_args)56 mock__create_network.assert_called_once_with(network_create_args)57 mock__show_network.assert_called_once_with(mock_net)58 mock__create_network.reset_mock()59 mock__show_network.reset_mock()60 @mock.patch("%s.CreateAndUpdateNetworks._update_network" % BASE)61 @mock.patch("%s.CreateAndUpdateNetworks._create_network" % BASE,62 return_value={63 "network": {64 "id": "network-id",65 "name": "network-name",66 "admin_state_up": False67 }68 })69 def test_create_and_update_networks(self,70 mock__create_network,71 mock__update_network):72 scenario = network.CreateAndUpdateNetworks(self.context)73 network_update_args = {"name": "_updated", "admin_state_up": True}74 # Default options75 scenario.run(network_update_args=network_update_args)76 mock__create_network.assert_called_once_with({})77 mock__update_network.assert_has_calls(78 [mock.call(79 mock__create_network.return_value, network_update_args80 )])81 mock__create_network.reset_mock()82 mock__update_network.reset_mock()83 # Explicit network name is specified84 network_create_args = {85 "name": "network-name",86 "admin_state_up": False87 }88 scenario.run(network_create_args=network_create_args,89 network_update_args=network_update_args)90 mock__create_network.assert_called_once_with(network_create_args)91 mock__update_network.assert_has_calls(92 [mock.call(mock__create_network.return_value,93 network_update_args)])94 @mock.patch("%s.CreateAndDeleteNetworks._delete_network" % BASE)95 @mock.patch("%s.CreateAndDeleteNetworks._create_network" % BASE)96 def test_create_and_delete_networks(self,97 mock__create_network,98 mock__delete_network):99 scenario = network.CreateAndDeleteNetworks(self.context)100 # Default options101 network_create_args = {}102 scenario.run()103 mock__create_network.assert_called_once_with(network_create_args)104 self.assertTrue(mock__delete_network.call_count)105 mock__create_network.reset_mock()106 mock__delete_network.reset_mock()107 # Explicit network name is specified108 network_create_args = {"name": "given-name"}109 scenario.run(network_create_args=network_create_args)110 mock__create_network.assert_called_once_with(network_create_args)111 self.assertTrue(mock__delete_network.call_count)112 def test_create_and_list_subnets(self):113 network_create_args = {"router:external": True}114 subnet_create_args = {"allocation_pools": []}115 subnet_cidr_start = "default_cidr"116 subnets_per_network = 5117 net = mock.MagicMock()118 scenario = network.CreateAndListSubnets(self.context)119 scenario._create_network = mock.Mock(return_value=net)120 scenario._create_subnets = mock.Mock()121 scenario._list_subnets = mock.Mock()122 scenario.run(network_create_args=network_create_args,123 subnet_create_args=subnet_create_args,124 subnet_cidr_start=subnet_cidr_start,125 subnets_per_network=subnets_per_network)126 scenario._create_network.assert_called_once_with(127 network_create_args)128 scenario._create_subnets.assert_called_once_with(129 net, subnet_create_args, subnet_cidr_start, subnets_per_network)130 scenario._list_subnets.assert_called_once_with()131 def test_create_and_show_subnets(self):132 network_create_args = {"router:external": True}133 subnet_create_args = {"allocation_pools": []}134 subnet_cidr_start = "1.1.0.0/30"135 subnets_per_network = 5136 net = mock.MagicMock()137 scenario = network.CreateAndShowSubnets(self.context)138 scenario._get_or_create_network = mock.Mock(return_value=net)139 scenario._create_subnets = mock.MagicMock()140 scenario._show_subnet = mock.Mock()141 scenario.run(network_create_args=network_create_args,142 subnet_create_args=subnet_create_args,143 subnet_cidr_start=subnet_cidr_start,144 subnets_per_network=subnets_per_network)145 scenario._get_or_create_network.assert_called_once_with(146 network_create_args)147 scenario._create_subnets.assert_called_once_with(148 net, subnet_create_args, subnet_cidr_start, subnets_per_network)149 for subnet in scenario._create_subnets.return_value:150 scenario._show_subnet.assert_called_with(subnet)151 def test_set_and_clear_router_gateway(self):152 network_create_args = {"router:external": True}153 router_create_args = {"admin_state_up": True}154 enable_snat = True155 ext_net = mock.MagicMock()156 router = mock.MagicMock()157 scenario = network.SetAndClearRouterGateway(self.context)158 scenario._create_network = mock.Mock(return_value=ext_net)159 scenario._create_router = mock.Mock(return_value=router)160 scenario._add_gateway_router = mock.Mock()161 scenario._remove_gateway_router = mock.Mock()162 scenario.run(enable_snat, network_create_args, router_create_args)163 scenario._create_network.assert_called_once_with(164 network_create_args)165 scenario._create_router.assert_called_once_with(router_create_args)166 scenario._add_gateway_router.assert_called_once_with(router, ext_net,167 enable_snat)168 scenario._remove_gateway_router.assert_called_once_with(router)169 def test_create_and_update_subnets(self):170 network_create_args = {"router:external": True}171 subnet_create_args = {"allocation_pools": []}172 subnet_update_args = {"enabled_dhcp": True}173 subnet_cidr_start = "default_cidr"174 subnets_per_network = 5175 net = mock.MagicMock()176 subnets = [mock.MagicMock() for _ in range(subnets_per_network)]177 scenario = network.CreateAndUpdateSubnets(self.context)178 scenario._create_network = mock.Mock(return_value=net)179 scenario._create_subnets = mock.Mock(return_value=subnets)180 scenario._update_subnet = mock.Mock()181 scenario.run(subnet_update_args,182 network_create_args=network_create_args,183 subnet_create_args=subnet_create_args,184 subnet_cidr_start=subnet_cidr_start,185 subnets_per_network=subnets_per_network)186 scenario._create_network.assert_called_once_with(187 network_create_args)188 scenario._create_subnets.assert_called_once_with(189 net, subnet_create_args, subnet_cidr_start, subnets_per_network)190 scenario._update_subnet.assert_has_calls(191 [mock.call(s, subnet_update_args) for s in subnets])192 def test_create_and_delete_subnets(self):193 network_create_args = {"router:external": True}194 subnet_create_args = {"allocation_pools": []}195 subnet_cidr_start = "default_cidr"196 subnets_per_network = 5197 net = mock.MagicMock()198 subnets = [mock.MagicMock() for _ in range(subnets_per_network)]199 scenario = network.CreateAndDeleteSubnets(self.context)200 scenario._get_or_create_network = mock.Mock(return_value=net)201 scenario._create_subnets = mock.Mock(return_value=subnets)202 scenario._delete_subnet = mock.Mock()203 scenario.run(network_create_args=network_create_args,204 subnet_create_args=subnet_create_args,205 subnet_cidr_start=subnet_cidr_start,206 subnets_per_network=subnets_per_network)207 scenario._get_or_create_network.assert_called_once_with(208 network_create_args)209 scenario._create_subnets.assert_called_once_with(210 net, subnet_create_args, subnet_cidr_start, subnets_per_network)211 scenario._delete_subnet.assert_has_calls(212 [mock.call(s) for s in subnets])213 def test_create_and_list_routers(self):214 network_create_args = {"router:external": True}215 subnet_create_args = {"allocation_pools": []}216 subnet_cidr_start = "default_cidr"217 subnets_per_network = 5218 router_create_args = {"admin_state_up": True}219 scenario = network.CreateAndListRouters(self.context)220 scenario._create_network_structure = mock.Mock()221 scenario._list_routers = mock.Mock()222 scenario.run(network_create_args=network_create_args,223 subnet_create_args=subnet_create_args,224 subnet_cidr_start=subnet_cidr_start,225 subnets_per_network=subnets_per_network,226 router_create_args=router_create_args)227 scenario._create_network_structure.assert_called_once_with(228 network_create_args, subnet_create_args, subnet_cidr_start,229 subnets_per_network, router_create_args)230 scenario._list_routers.assert_called_once_with()231 def test_list_agents(self):232 agent_args = {233 "F": "id",234 "sort-dir": "asc"235 }236 scenario = network.ListAgents(self.context)237 scenario._list_agents = mock.Mock()238 scenario.run(agent_args=agent_args)239 scenario._list_agents.assert_called_once_with(**agent_args)240 def test_create_and_update_routers(self):241 router_update_args = {"admin_state_up": False}242 network_create_args = {"router:external": True}243 subnet_create_args = {"allocation_pools": []}244 subnet_cidr_start = "default_cidr"245 subnets_per_network = 5246 router_create_args = {"admin_state_up": True}247 net = mock.MagicMock()248 subnets = [mock.MagicMock() for i in range(subnets_per_network)]249 routers = [mock.MagicMock() for i in range(subnets_per_network)]250 scenario = network.CreateAndUpdateRouters(self.context)251 scenario._create_network_structure = mock.Mock(252 return_value=(net, subnets, routers))253 scenario._update_router = mock.Mock()254 scenario.run(router_update_args,255 network_create_args=network_create_args,256 subnet_create_args=subnet_create_args,257 subnet_cidr_start=subnet_cidr_start,258 subnets_per_network=subnets_per_network,259 router_create_args=router_create_args)260 scenario._create_network_structure.assert_called_once_with(261 network_create_args, subnet_create_args, subnet_cidr_start,262 subnets_per_network, router_create_args)263 update_calls = [mock.call(router, router_update_args)264 for router in routers]265 scenario._update_router.assert_has_calls(update_calls)266 def test_create_and_delete_routers(self):267 network_create_args = {"router:external": True}268 subnet_create_args = {"allocation_pools": []}269 subnet_cidr_start = "default_cidr"270 subnets_per_network = 5271 router_create_args = {"admin_state_up": True}272 net = mock.MagicMock()273 subnets = [mock.MagicMock() for i in range(subnets_per_network)]274 routers = [mock.MagicMock() for i in range(subnets_per_network)]275 scenario = network.CreateAndDeleteRouters(self.context)276 scenario._create_network_structure = mock.Mock(277 return_value=(net, subnets, routers))278 scenario._remove_interface_router = mock.Mock()279 scenario._delete_router = mock.Mock()280 scenario.run(network_create_args=network_create_args,281 subnet_create_args=subnet_create_args,282 subnet_cidr_start=subnet_cidr_start,283 subnets_per_network=subnets_per_network,284 router_create_args=router_create_args)285 scenario._create_network_structure.assert_called_once_with(286 network_create_args, subnet_create_args, subnet_cidr_start,287 subnets_per_network, router_create_args)288 scenario._remove_interface_router.assert_has_calls([289 mock.call(subnets[i]["subnet"], routers[i]["router"])290 for i in range(subnets_per_network)])291 scenario._delete_router.assert_has_calls(292 [mock.call(router) for router in routers])293 def test_create_and_show_routers(self):294 network_create_args = {"router:external": True}295 subnet_create_args = {"allocation_pools": []}296 subnet_cidr_start = "default_cidr"297 subnets_per_network = 5298 router_create_args = {"admin_state_up": True}299 net = mock.MagicMock()300 subnets = [mock.MagicMock() for i in range(subnets_per_network)]301 routers = [mock.MagicMock() for i in range(subnets_per_network)]302 scenario = network.CreateAndShowRouters(self.context)303 scenario._create_network_structure = mock.Mock(304 return_value=(net, subnets, routers))305 scenario._show_router = mock.Mock()306 scenario.run(network_create_args=network_create_args,307 subnet_create_args=subnet_create_args,308 subnet_cidr_start=subnet_cidr_start,309 subnets_per_network=subnets_per_network,310 router_create_args=router_create_args)311 scenario._create_network_structure.assert_called_once_with(312 network_create_args, subnet_create_args, subnet_cidr_start,313 subnets_per_network, router_create_args)314 scenario._show_router.assert_has_calls(315 [mock.call(router) for router in routers])316 def test_create_and_list_ports(self):317 port_create_args = {"allocation_pools": []}318 ports_per_network = 10319 network_create_args = {"router:external": True}320 net = mock.MagicMock()321 scenario = network.CreateAndListPorts(self.context)322 scenario._get_or_create_network = mock.Mock(return_value=net)323 scenario._create_port = mock.MagicMock()324 scenario._list_ports = mock.Mock()325 scenario.run(network_create_args=network_create_args,326 port_create_args=port_create_args,327 ports_per_network=ports_per_network)328 scenario._get_or_create_network.assert_called_once_with(329 network_create_args)330 scenario._create_port.assert_has_calls(331 [mock.call(net, port_create_args)332 for _ in range(ports_per_network)])333 scenario._list_ports.assert_called_once_with()334 def test_create_and_update_ports(self):335 port_update_args = {"admin_state_up": False},336 port_create_args = {"allocation_pools": []}337 ports_per_network = 10338 network_create_args = {"router:external": True}339 net = mock.MagicMock()340 ports = [mock.MagicMock() for _ in range(ports_per_network)]341 scenario = network.CreateAndUpdatePorts(self.context)342 scenario._get_or_create_network = mock.Mock(return_value=net)343 scenario._create_port = mock.Mock(side_effect=ports)344 scenario._update_port = mock.Mock()345 scenario.run(port_update_args,346 network_create_args=network_create_args,347 port_create_args=port_create_args,348 ports_per_network=ports_per_network)349 scenario._get_or_create_network.assert_called_once_with(350 network_create_args)351 scenario._create_port.assert_has_calls(352 [mock.call(net, port_create_args)353 for _ in range(ports_per_network)])354 scenario._update_port.assert_has_calls(355 [mock.call(p, port_update_args) for p in ports])356 def test_create_and_show_ports_positive(self):357 port_create_args = {"allocation_pools": []}358 ports_per_network = 1359 network_create_args = {"router:external": True}360 net = mock.MagicMock()361 scenario = network.CreateAndShowPorts(self.context)362 scenario._get_or_create_network = mock.MagicMock(return_value=net)363 scenario._create_port = mock.MagicMock()364 scenario._show_port = mock.MagicMock()365 port = {"port": {"id": 1, "name": "f"}}366 port_info = {"port": {"id": 1, "name": "f", "status": "ACTIVE"}}367 scenario._show_port.return_value = port_info368 # Positive case:369 scenario._create_port.return_value = port370 scenario.run(network_create_args=network_create_args,371 port_create_args=port_create_args,372 ports_per_network=ports_per_network)373 scenario._get_or_create_network.assert_called_once_with(374 network_create_args)375 scenario._create_port.assert_called_with(net, port_create_args)376 scenario._show_port.assert_called_with(port)377 def test_create_and_show_ports_negative(self):378 port_create_args = {"allocation_pools": []}379 ports_per_network = 1380 network_create_args = {"router:external": True}381 net = mock.MagicMock()382 scenario = network.CreateAndShowPorts(self.context)383 scenario._get_or_create_network = mock.MagicMock(return_value=net)384 scenario._create_port = mock.MagicMock()385 scenario._show_port = mock.MagicMock()386 # Negative case1: port isn't created387 scenario._create_port.return_value = None388 self.assertRaises(rally_exceptions.RallyAssertionError,389 scenario.run,390 network_create_args,391 port_create_args,392 ports_per_network)393 scenario._get_or_create_network.assert_called_once_with(394 network_create_args)395 scenario._create_port.assert_called_once_with(net, port_create_args)396 # Negative case2: port isn't show397 port = {"port": {"id": 1, "name": "f1"}}398 port_info = {"port": {"id": 2, "name": "f2", "status": "ACTIVE"}}399 scenario._show_port.return_value = port_info400 scenario._create_port.return_value = port401 self.assertRaises(rally_exceptions.RallyAssertionError,402 scenario.run,403 network_create_args,404 port_create_args,405 ports_per_network)406 scenario._get_or_create_network.assert_called_with(407 network_create_args)408 scenario._create_port.assert_called_with(net, port_create_args)409 scenario._show_port.assert_called_with(port)410 def test_create_and_delete_ports(self):411 port_create_args = {"allocation_pools": []}412 ports_per_network = 10413 network_create_args = {"router:external": True}414 net = mock.MagicMock()415 ports = [mock.MagicMock() for _ in range(ports_per_network)]416 scenario = network.CreateAndDeletePorts(self.context)417 scenario._get_or_create_network = mock.Mock(return_value=net)418 scenario._create_port = mock.Mock(side_effect=ports)419 scenario._delete_port = mock.Mock()420 scenario.run(network_create_args=network_create_args,421 port_create_args=port_create_args,422 ports_per_network=ports_per_network)423 scenario._get_or_create_network.assert_called_once_with(424 network_create_args)425 scenario._create_port.assert_has_calls(426 [mock.call(net, port_create_args)427 for _ in range(ports_per_network)])428 scenario._delete_port.assert_has_calls(429 [mock.call(p) for p in ports])430 @ddt.data(431 {"floating_network": "ext-net"},432 {"floating_network": "ext-net",433 "floating_ip_args": {"floating_ip_address": "1.1.1.1"}},434 )435 @ddt.unpack436 def test_create_and_list_floating_ips(self, floating_network=None,437 floating_ip_args=None):438 scenario = network.CreateAndListFloatingIps(self.context)439 floating_ip_args = floating_ip_args or {}440 scenario._create_floatingip = mock.Mock()441 scenario._list_floating_ips = mock.Mock()442 scenario.run(floating_network=floating_network,443 floating_ip_args=floating_ip_args)444 scenario._create_floatingip.assert_called_once_with(445 floating_network, **floating_ip_args)446 scenario._list_floating_ips.assert_called_once_with()447 @ddt.data(448 {"floating_network": "ext-net"},449 {"floating_network": "ext-net",450 "floating_ip_args": {"floating_ip_address": "1.1.1.1"}},451 )452 @ddt.unpack453 def test_create_and_delete_floating_ips(self, floating_network=None,454 floating_ip_args=None):455 scenario = network.CreateAndDeleteFloatingIps(self.context)456 floating_ip_args = floating_ip_args or {}457 fip = {"floatingip": {"id": "floating-ip-id"}}458 scenario._create_floatingip = mock.Mock(return_value=fip)459 scenario._delete_floating_ip = mock.Mock()460 scenario.run(floating_network=floating_network,461 floating_ip_args=floating_ip_args)462 scenario._create_floatingip.assert_called_once_with(463 floating_network, **floating_ip_args)464 scenario._delete_floating_ip.assert_called_once_with(...
network_plugin.py
Source:network_plugin.py
...53 :param subnet_cidr_start: str, start value for subnets CIDR54 :param subnets_per_network: int, number of subnets for one network55 """56 network = self._get_or_create_network(network_create_args)57 self._create_subnets(network, subnet_create_args, subnet_cidr_start,58 subnets_per_network)59@validation.add("number", param_name="subnets_per_network",60 minval=1, integer_only=True)61@validation.add("required_services", services=[consts.Service.NEUTRON])62@validation.add("required_platform", platform="openstack", users=True)63@scenario.configure(context={"cleanup@openstack": ["neutron"]},64 name="NetworkPlugin.create_subnets_routers_interfaces")65class CreateSubnetsRoutersInterfaces(utils.NeutronScenario):66 def run(self,67 network_create_args=None,68 subnet_create_args=None,69 subnet_cidr_start=None,70 subnets_per_network=None,71 router_create_args=None):72 """Create a network, a given number of subnets and routers.73 :param network_create_args: dict, POST /v2.0/networks request74 options. Deprecated.75 :param subnet_create_args: dict, POST /v2.0/subnets request options76 :param subnet_cidr_start: str, start value for subnets CIDR77 :param subnets_per_network: int, number of subnets for one network78 :param router_create_args: dict, POST /v2.0/routers request options79 """80 self._create_network_structure(network_create_args, subnet_create_args,81 subnet_cidr_start, subnets_per_network,82 router_create_args)83@validation.add("number", param_name="routers_per_subnet",84 minval=1, integer_only=True)85@validation.add("required_services", services=[consts.Service.NEUTRON])86@validation.add("required_platform", platform="openstack", users=True)87@scenario.configure(context={"cleanup@openstack": ["neutron"]},88 name="NetworkPlugin.create_routers")89class CreateRouters(utils.NeutronScenario):90 def run(self,91 network_create_args=None,92 routers_per_subnet=None,93 router_create_args=None,94 port_create_args=None):95 """Create a given number of ports and routers.96 :param network_create_args: dict, POST /v2.0/networks request97 options. Deprecated.98 :param routers_per_subnet: int, number of routers for one subnet99 :param router_create_args: dict, POST /v2.0/routers request options100 :param port_create_args: dict, POST /v2.0/ports request options101 """102 network = self._get_or_create_network(network_create_args)103 for i in range(routers_per_subnet):104 router = self._create_router(router_create_args or {})105 port = self._create_port(network, port_create_args or {})106 self._add_interface_router_port(router, port)107 @atomic.action_timer("neutron.add_interface_router_port")108 def _add_interface_router_port(self, router, port):109 self.clients("neutron").add_interface_router(110 router["router"]["id"], {"port_id": port["port"]["id"]})111@validation.add("number", param_name="ports_per_network",112 minval=1, integer_only=True)113@validation.add("required_services", services=[consts.Service.NEUTRON])114@validation.add("required_platform", platform="openstack", users=True)115@scenario.configure(context={"cleanup@openstack": ["neutron"]},116 name="NetworkPlugin.create_ports")117class CreatePorts(utils.NeutronScenario):118 def run(self,119 network_create_args=None,120 port_create_args=None,121 ports_per_network=None):122 """Create a given number of ports.123 :param network_create_args: dict, POST /v2.0/networks request124 options. Deprecated.125 :param port_create_args: dict, POST /v2.0/ports request options126 :param ports_per_network: int, number of ports for one network127 """128 network = self._get_or_create_network(network_create_args)129 for i in range(ports_per_network):130 self._create_port(network, port_create_args or {})131@validation.add("required_services",132 services=[consts.Service.NEUTRON])133@validation.add("required_platform", platform="openstack", users=True)134@scenario.configure(context={"cleanup@openstack": ["neutron"]},135 name="NetworkPlugin.create_and_list_dualstack_subnets",136 platform="openstack")137class CreateAndListDualStackSubnets(utils.NeutronScenario):138 SUBNET_IP_VERSION = 4139 SUBNET_IP_VERSION_v4 = 4140 SUBNET_IP_VERSION_v6 = 6141 @atomic.action_timer("neutron.create_subnet")142 def _create_subnet(self, network, subnet_create_args, start_cidr=None,143 ip_version=4):144 """Create neutron subnet.145 :param network: neutron network dict146 :param subnet_create_args: POST /v2.0/subnets request options147 :returns: neutron subnet dict148 """149 network_id = network["network"]["id"]150 if not subnet_create_args.get("cidr"):151 if ip_version == 4:152 start_cidr = start_cidr or "10.2.0.0/24"153 subnet_create_args["cidr"] = (154 network_wrapper.generate_cidr(start_cidr=start_cidr))155 subnet_create_args.setdefault("ip_version",156 self.SUBNET_IP_VERSION_v4)157 elif ip_version == 6:158 start_cidr = (start_cidr or159 "1504:40db:0000:0000:0000:0000:0000:0001/64")160 subnet_create_args["cidr"] = (161 network_wrapper.generate_cidr(start_cidr=start_cidr))162 subnet_create_args.setdefault("ip_version",163 self.SUBNET_IP_VERSION_v6)164 subnet_create_args["network_id"] = network_id165 subnet_create_args["name"] = self.generate_random_name()166 return self.clients("neutron").create_subnet(167 {"subnet": subnet_create_args})168 def _create_subnets(self, network,169 subnet_create_args=None,170 subnet_cidr_start=None,171 subnets_per_network=1,172 ip_version=4):173 """Create <count> new subnets in the given network.174 :param network: network to create subnets in175 :param subnet_create_args: dict, POST /v2.0/subnets request options176 :param subnet_cidr_start: str, start value for subnets CIDR177 :param subnets_per_network: int, number of subnets for one network178 :returns: List of subnet dicts179 """180 return [self._create_subnet(network, subnet_create_args or {},181 subnet_cidr_start, ip_version)182 for i in range(subnets_per_network)]183 def run(self, network_create_args=None, subnet_create_args=None,184 subnetv4_cidr_start=None, subnetv6_cidr_start=None,185 subnets_per_network=1):186 """Create and a given number of subnets and list all subnets.187 The scenario creates a network, a given number of subnets and then188 lists subnets.189 :param network_create_args: dict, POST /v2.0/networks request190 options. Deprecated191 :param subnet_create_args: dict, POST /v2.0/subnets request options192 :param subnet_cidr_start: str, start value for subnets CIDR193 :param subnets_per_network: int, number of subnets for one network194 """195 network = self._create_network(network_create_args or {})196 ip_version = 6197 self._create_subnets(network, subnet_create_args, subnetv6_cidr_start,198 subnets_per_network, ip_version)199 ip_version = 4200 self._create_subnets(network, subnet_create_args, subnetv4_cidr_start,201 subnets_per_network, ip_version)202 self._list_subnets()203@validation.add("required_services",204 services=[consts.Service.NEUTRON])205@validation.add("required_platform", platform="openstack", users=True)206@scenario.configure(context={"cleanup@openstack": ["neutron"]},207 name="NetworkPlugin.create_and_delete_dualstack_subnets",208 platform="openstack")209class CreateAndDeleteDualStackSubnets(utils.NeutronScenario):210 SUBNET_IP_VERSION = 4211 SUBNET_IP_VERSION_v4 = 4212 SUBNET_IP_VERSION_v6 = 6213 @atomic.action_timer("neutron.create_subnet")214 def _create_subnet(self, network, subnet_create_args, start_cidr=None,215 ip_version=4):216 """Create neutron subnet.217 :param network: neutron network dict218 :param subnet_create_args: POST /v2.0/subnets request options219 :returns: neutron subnet dict220 """221 network_id = network["network"]["id"]222 if not subnet_create_args.get("cidr"):223 if ip_version == 4:224 start_cidr = start_cidr or "10.2.0.0/24"225 subnet_create_args["cidr"] = (226 network_wrapper.generate_cidr(start_cidr=start_cidr))227 subnet_create_args.setdefault("ip_version",228 self.SUBNET_IP_VERSION_v4)229 elif ip_version == 6:230 start_cidr = (start_cidr or231 "1504:40db:0000:0000:0000:0000:0000:0001/64")232 subnet_create_args["cidr"] = (233 network_wrapper.generate_cidr(start_cidr=start_cidr))234 subnet_create_args.setdefault("ip_version",235 self.SUBNET_IP_VERSION_v6)236 subnet_create_args["network_id"] = network_id237 subnet_create_args["name"] = self.generate_random_name()238 return self.clients("neutron").create_subnet(239 {"subnet": subnet_create_args})240 def _create_subnets(self, network,241 subnet_create_args=None,242 subnet_cidr_start=None,243 subnets_per_network=1,244 ip_version=4):245 """Create <count> new subnets in the given network.246 :param network: network to create subnets in247 :param subnet_create_args: dict, POST /v2.0/subnets request options248 :param subnet_cidr_start: str, start value for subnets CIDR249 :param subnets_per_network: int, number of subnets for one network250 :returns: List of subnet dicts251 """252 return [self._create_subnet(network, subnet_create_args or {},253 subnet_cidr_start, ip_version)254 for i in range(subnets_per_network)]255 def run(self, network_create_args=None, subnet_create_args=None,256 subnetv4_cidr_start=None, subnetv6_cidr_start=None,257 subnets_per_network=1):258 """Create and a given number of subnets and list all subnets.259 The scenario creates a network, a given number of subnets and then260 lists subnets.261 :param network_create_args: dict, POST /v2.0/networks request262 options. Deprecated263 :param subnet_create_args: dict, POST /v2.0/subnets request options264 :param subnet_cidr_start: str, start value for subnets CIDR265 :param subnets_per_network: int, number of subnets for one network266 """267 network = self._create_network(network_create_args or {})268 ip_version = 6269 subnetsv4 = self._create_subnets(network, subnet_create_args,270 subnetv6_cidr_start,271 subnets_per_network,272 ip_version)273 ip_version = 4274 subnetsv6 = self._create_subnets(network, subnet_create_args,275 subnetv4_cidr_start,276 subnets_per_network,277 ip_version)278 for subnet in subnetsv4:279 self._delete_subnet(subnet)280 for subnet in subnetsv6:...
network_stack.py
Source:network_stack.py
...15 self.num_azs = int(self.vpc_config.get('zones', 2))16 # number of distinct layers within the network17 # (default = 2)18 layers = int(self.vpc_config.get('layers', 2))19 self.subnet_configurations = self._create_subnets(layers)20 self.vpc = ec2.Vpc(21 self, "VPC",22 max_azs=self.num_azs,23 cidr=self.cidr,24 subnet_configuration=self.subnet_configurations,25 )26 27 # A wrapper to create a list of subnet configurations28 def _create_subnets(self, layers):29 subnets = []30 for layer in range(1,layers+1):31 subnets.append(self._create_subnet(layer))32 return subnets33 34 # Algorithm will create a SubnetConfiguration using the layers35 # Each config will be half the size of the previous in the order;36 # private, public, isolated, isolated...37 def _create_subnet(self, layer):38 mask = int(self.cidr[-2:]) + layer + self.num_azs39 if layer == 1:40 return ec2.SubnetConfiguration(41 name="Private",42 subnet_type=ec2.SubnetType.PRIVATE,...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!