Best Python code snippet using tempest_python
test_neutron_driver.py
Source:test_neutron_driver.py
# Copyright 2013 OpenStack Foundation
# All Rights Reserved
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.
#
import mock
from neutronclient.common import exceptions as n_exc
from neutronclient.neutron import v2_0 as neutronv20
from neutronclient.v2_0 import client
from six.moves import range

from nova import context
from nova import exception
from nova.network.security_group import neutron_driver
from nova import objects
from nova import test
from nova.tests import uuidsentinel as uuids


class TestNeutronDriver(test.NoDBTestCase):
    """Tests for neutron_driver.SecurityGroupAPI using a mocked client.

    ``setUp`` stubs ``nova.network.neutronv2.api.get_client`` so that every
    lookup returns ``self.mocked_client``, a ``mock.Mock`` spec'd against
    ``neutronclient.v2_0.client.Client``.
    """

    def setUp(self):
        """Create a request context and install the mocked neutron client."""
        super(TestNeutronDriver, self).setUp()
        self.context = context.RequestContext('userid', 'my_tenantid')
        # Plain assignment instead of setattr() with a constant name.
        self.context.auth_token = 'bff4a5a6b9eb4ea2a6efec6eefb77936'
        self.mocked_client = mock.Mock(spec=client.Client)
        self.stub_out('nova.network.neutronv2.api.get_client',
                      lambda context: self.mocked_client)

    def test_list_with_project(self):
        """list(project=...) queries neutron with tenant_id=<project>."""
        project_id = '0af70a4d22cf4652824ddc1f2435dd85'
        security_groups_list = {'security_groups': []}
        self.mocked_client.list_security_groups.return_value = (
            security_groups_list)
        sg_api = neutron_driver.SecurityGroupAPI()
        sg_api.list(self.context, project=project_id)
        self.mocked_client.list_security_groups.assert_called_once_with(
            tenant_id=project_id)

    def test_list_with_all_tenants_and_admin_context(self):
        """Admin context with all_tenants lists without any filter."""
        project_id = '0af70a4d22cf4652824ddc1f2435dd85'
        search_opts = {'all_tenants': 1}
        security_groups_list = {'security_groups': []}
        admin_context = context.RequestContext('user1', project_id, True)
        with mock.patch.object(
                self.mocked_client,
                'list_security_groups',
                return_value=security_groups_list) as mock_list_secgroup:
            sg_api = neutron_driver.SecurityGroupAPI()
            sg_api.list(admin_context,
                        project=project_id,
                        search_opts=search_opts)
            mock_list_secgroup.assert_called_once_with()

    def test_list_without_all_tenants_and_admin_context(self):
        """Admin context without all_tenants still filters by tenant_id."""
        project_id = '0af70a4d22cf4652824ddc1f2435dd85'
        security_groups_list = {'security_groups': []}
        admin_context = context.RequestContext('user1', project_id, True)
        with mock.patch.object(
                self.mocked_client,
                'list_security_groups',
                return_value=security_groups_list) as mock_list_secgroup:
            sg_api = neutron_driver.SecurityGroupAPI()
            sg_api.list(admin_context, project=project_id)
            mock_list_secgroup.assert_called_once_with(tenant_id=project_id)

    def test_list_with_all_tenants_sec_name_and_admin_context(self):
        """Passing names results in a name + tenant_id filtered query."""
        project_id = '0af70a4d22cf4652824ddc1f2435dd85'
        search_opts = {'all_tenants': 1}
        security_group_names = ['secgroup_ssh']
        security_groups_list = {'security_groups': []}
        admin_context = context.RequestContext('user1', project_id, True)
        with mock.patch.object(
                self.mocked_client,
                'list_security_groups',
                return_value=security_groups_list) as mock_list_secgroup:
            sg_api = neutron_driver.SecurityGroupAPI()
            sg_api.list(admin_context, project=project_id,
                        names=security_group_names,
                        search_opts=search_opts)
            mock_list_secgroup.assert_called_once_with(
                name=security_group_names,
                tenant_id=project_id)

    def test_list_with_all_tenants_sec_name_ids_and_admin_context(self):
        """Passing names and ids results in name + id + tenant_id filters."""
        project_id = '0af70a4d22cf4652824ddc1f2435dd85'
        search_opts = {'all_tenants': 1}
        security_group_names = ['secgroup_ssh']
        security_group_ids = ['id1']
        security_groups_list = {'security_groups': []}
        admin_context = context.RequestContext('user1', project_id, True)
        with mock.patch.object(
                self.mocked_client,
                'list_security_groups',
                return_value=security_groups_list) as mock_list_secgroup:
            sg_api = neutron_driver.SecurityGroupAPI()
            sg_api.list(admin_context, project=project_id,
                        names=security_group_names,
                        ids=security_group_ids,
                        search_opts=search_opts)
            mock_list_secgroup.assert_called_once_with(
                name=security_group_names,
                id=security_group_ids,
                tenant_id=project_id)

    def test_list_with_all_tenants_not_admin(self):
        """A non-admin context with all_tenants still filters by tenant."""
        search_opts = {'all_tenants': 1}
        security_groups_list = {'security_groups': []}
        with mock.patch.object(
                self.mocked_client,
                'list_security_groups',
                return_value=security_groups_list) as mock_list_secgroup:
            sg_api = neutron_driver.SecurityGroupAPI()
            sg_api.list(self.context, project=self.context.tenant,
                        search_opts=search_opts)
            mock_list_secgroup.assert_called_once_with(
                tenant_id=self.context.tenant)

    def test_get_with_name_duplicated(self):
        """get(name=...) shows the group whose id the name lookup returned."""
        sg_name = 'web_server'
        expected_sg_id = '85cc3048-abc3-43cc-89b3-377341426ac5'
        expected_sg = {'security_group': {'name': sg_name,
                                          'id': expected_sg_id,
                                          'tenant_id': self.context.tenant,
                                          'description': 'server',
                                          'rules': []}}
        self.mocked_client.show_security_group.return_value = expected_sg
        sg_api = neutron_driver.SecurityGroupAPI()
        with mock.patch.object(neutronv20, 'find_resourceid_by_name_or_id',
                               return_value=expected_sg_id):
            observed_sg = sg_api.get(self.context, name=sg_name)
        # The observed group is expected to carry 'project_id' in place of
        # neutron's 'tenant_id' key.
        expected_sg['security_group']['project_id'] = self.context.tenant
        del expected_sg['security_group']['tenant_id']
        self.assertEqual(expected_sg['security_group'], observed_sg)
        self.mocked_client.show_security_group.assert_called_once_with(
            expected_sg_id)

    def test_get_with_invalid_name(self):
        """A TypeError from show_security_group maps to SecurityGroupNotFound.
        """
        sg_name = 'invalid_name'
        expected_sg_id = '85cc3048-abc3-43cc-89b3-377341426ac5'
        self.mocked_client.show_security_group.side_effect = TypeError
        with mock.patch.object(neutronv20, 'find_resourceid_by_name_or_id',
                               return_value=expected_sg_id):
            sg_api = neutron_driver.SecurityGroupAPI()
            self.assertRaises(exception.SecurityGroupNotFound,
                              sg_api.get, self.context, name=sg_name)
        self.mocked_client.show_security_group.assert_called_once_with(
            expected_sg_id)

    def test_create_security_group_with_bad_request(self):
        """A neutron BadRequest on create is raised as exception.Invalid."""
        name = 'test-security-group'
        description = None
        body = {'security_group': {'name': name,
                                   'description': description}}
        message = "Invalid input. Reason: 'None' is not a valid string."
        self.mocked_client.create_security_group.side_effect = (
            n_exc.BadRequest(message=message))
        sg_api = neutron_driver.SecurityGroupAPI()
        self.assertRaises(exception.Invalid,
                          sg_api.create_security_group, self.context, name,
                          description)
        self.mocked_client.create_security_group.assert_called_once_with(body)

    def test_create_security_group_exceed_quota(self):
        """A 409 on create is raised as SecurityGroupLimitExceeded."""
        name = 'test-security-group'
        description = 'test-security-group'
        body = {'security_group': {'name': name,
                                   'description': description}}
        message = "Quota exceeded for resources: ['security_group']"
        self.mocked_client.create_security_group.side_effect = (
            n_exc.NeutronClientException(status_code=409, message=message))
        sg_api = neutron_driver.SecurityGroupAPI()
        self.assertRaises(exception.SecurityGroupLimitExceeded,
                          sg_api.create_security_group, self.context, name,
                          description)
        self.mocked_client.create_security_group.assert_called_once_with(body)

    def test_create_security_group_rules_exceed_quota(self):
        """A 409 on rule creation is raised as SecurityGroupLimitExceeded."""
        vals = {'protocol': 'tcp', 'cidr': '0.0.0.0/0',
                'parent_group_id': '7ae75663-277e-4a0e-8f87-56ea4e70cb47',
                'group_id': None, 'from_port': 1025, 'to_port': 1025}
        body = {'security_group_rules': [{
            'remote_group_id': None,
            'direction': 'ingress', 'protocol': 'tcp', 'ethertype': 'IPv4',
            'port_range_max': 1025, 'port_range_min': 1025,
            'security_group_id': '7ae75663-277e-4a0e-8f87-56ea4e70cb47',
            'remote_ip_prefix': '0.0.0.0/0'}]}
        name = 'test-security-group'
        message = "Quota exceeded for resources: ['security_group_rule']"
        self.mocked_client.create_security_group_rule.side_effect = (
            n_exc.NeutronClientException(status_code=409, message=message))
        sg_api = neutron_driver.SecurityGroupAPI()
        self.assertRaises(exception.SecurityGroupLimitExceeded,
                          sg_api.add_rules, self.context, None, name, [vals])
        self.mocked_client.create_security_group_rule.assert_called_once_with(
            body)

    def test_create_security_group_rules_bad_request(self):
        """A 400 on rule creation is raised as exception.Invalid."""
        vals = {'protocol': 'icmp', 'cidr': '0.0.0.0/0',
                'parent_group_id': '7ae75663-277e-4a0e-8f87-56ea4e70cb47',
                'group_id': None, 'to_port': 255}
        body = {'security_group_rules': [{
            'remote_group_id': None,
            'direction': 'ingress', 'protocol': 'icmp',
            'ethertype': 'IPv4', 'port_range_max': 255,
            'security_group_id': '7ae75663-277e-4a0e-8f87-56ea4e70cb47',
            'remote_ip_prefix': '0.0.0.0/0'}]}
        name = 'test-security-group'
        # Implicit concatenation inside parentheses replaces the backslash
        # continuation; the resulting string is unchanged.
        message = ("ICMP code (port-range-max) 255 is provided but ICMP type"
                   " (port-range-min) is missing")
        self.mocked_client.create_security_group_rule.side_effect = (
            n_exc.NeutronClientException(status_code=400, message=message))
        sg_api = neutron_driver.SecurityGroupAPI()
        self.assertRaises(exception.Invalid, sg_api.add_rules,
                          self.context, None, name, [vals])
        self.mocked_client.create_security_group_rule.assert_called_once_with(
            body)

    def test_list_security_group_with_no_port_range_and_not_tcp_udp_icmp(self):
        """A rule with no port range or prefix lists as -1 / 0.0.0.0/0."""
        sg1 = {'description': 'default',
               'id': '07f1362f-34f6-4136-819a-2dcde112269e',
               'name': 'default',
               'tenant_id': 'c166d9316f814891bcb66b96c4c891d6',
               'security_group_rules':
                   [{'direction': 'ingress',
                     'ethertype': 'IPv4',
                     'id': '0a4647f1-e1aa-488d-90e1-97a7d0293beb',
                     'port_range_max': None,
                     'port_range_min': None,
                     'protocol': '51',
                     'remote_group_id': None,
                     'remote_ip_prefix': None,
                     'security_group_id':
                         '07f1362f-34f6-4136-819a-2dcde112269e',
                     'tenant_id': 'c166d9316f814891bcb66b96c4c891d6'}]}
        self.mocked_client.list_security_groups.return_value = (
            {'security_groups': [sg1]})
        sg_api = neutron_driver.SecurityGroupAPI()
        result = sg_api.list(self.context)
        expected = [{'rules':
                     [{'from_port': -1, 'protocol': '51', 'to_port': -1,
                       'parent_group_id':
                           '07f1362f-34f6-4136-819a-2dcde112269e',
                       'cidr': '0.0.0.0/0', 'group_id': None,
                       'id': '0a4647f1-e1aa-488d-90e1-97a7d0293beb'}],
                     'project_id': 'c166d9316f814891bcb66b96c4c891d6',
                     'id': '07f1362f-34f6-4136-819a-2dcde112269e',
                     'name': 'default', 'description': 'default'}]
        self.assertEqual(expected, result)
        self.mocked_client.list_security_groups.assert_called_once_with()

    def test_instances_security_group_bindings(self):
        """Two ports on one server yield both group names for that server."""
        server_id = 'c5a20e8d-c4b0-47cf-9dca-ebe4f758acb1'
        port1_id = '4c505aec-09aa-47bc-bcc0-940477e84dc0'
        port2_id = 'b3b31a53-6e29-479f-ae5c-00b7b71a6d44'
        sg1_id = '2f7ce969-1a73-4ef9-bbd6-c9a91780ecd4'
        sg2_id = '20c89ce5-9388-4046-896e-64ffbd3eb584'
        servers = [{'id': server_id}]
        ports = [{'id': port1_id, 'device_id': server_id,
                  'security_groups': [sg1_id]},
                 {'id': port2_id, 'device_id': server_id,
                  'security_groups': [sg2_id]}]
        port_list = {'ports': ports}
        sg1 = {'id': sg1_id, 'name': 'wol'}
        sg2 = {'id': sg2_id, 'name': 'eor'}
        security_groups_list = {'security_groups': [sg1, sg2]}
        sg_bindings = {server_id: [{'name': 'wol'}, {'name': 'eor'}]}
        self.mocked_client.list_ports.return_value = port_list
        self.mocked_client.list_security_groups.return_value = (
            security_groups_list)
        sg_api = neutron_driver.SecurityGroupAPI()
        result = sg_api.get_instances_security_groups_bindings(
            self.context, servers)
        self.assertEqual(sg_bindings, result)
        self.mocked_client.list_ports.assert_called_once_with(
            device_id=[server_id])
        # The id filter is compared after sorting, so its order is not
        # part of the expectation.
        self.mocked_client.list_security_groups.assert_called_once_with(
            id=mock.ANY)
        self.assertEqual(
            sorted([sg1_id, sg2_id]),
            sorted(self.mocked_client.list_security_groups.call_args[1]['id']))

    def _test_instances_security_group_bindings_scale(self, num_servers):
        """Check bindings for ``num_servers`` servers with one port each.

        The expected ``list_ports`` calls are built in slices of
        ``max_query`` (150) device ids, and the recorded call list must
        match them exactly.

        :param num_servers: number of fake servers/ports to generate
        """
        max_query = 150
        sg1_id = '2f7ce969-1a73-4ef9-bbd6-c9a91780ecd4'
        sg2_id = '20c89ce5-9388-4046-896e-64ffbd3eb584'
        sg1 = {'id': sg1_id, 'name': 'wol'}
        sg2 = {'id': sg2_id, 'name': 'eor'}
        security_groups_list = {'security_groups': [sg1, sg2]}
        servers = []
        device_ids = []
        ports = []
        sg_bindings = {}
        for i in range(num_servers):
            server_id = "server-%d" % i
            port_id = "port-%d" % i
            servers.append({'id': server_id})
            device_ids.append(server_id)
            ports.append({'id': port_id,
                          'device_id': server_id,
                          'security_groups': [sg1_id, sg2_id]})
            sg_bindings[server_id] = [{'name': 'wol'}, {'name': 'eor'}]

        expected_args = []
        return_values = []
        for x in range(0, num_servers, max_query):
            expected_args.append(
                mock.call(device_id=device_ids[x:x + max_query]))
            return_values.append({'ports': ports[x:x + max_query]})

        self.mocked_client.list_security_groups.return_value = (
            security_groups_list)
        # One canned response per expected list_ports call.
        self.mocked_client.list_ports.side_effect = return_values
        sg_api = neutron_driver.SecurityGroupAPI()
        result = sg_api.get_instances_security_groups_bindings(
            self.context, servers)
        self.assertEqual(sg_bindings, result)
        self.mocked_client.list_security_groups.assert_called_once_with(
            id=mock.ANY)
        self.assertEqual(
            sorted([sg1_id, sg2_id]),
            sorted(self.mocked_client.list_security_groups.call_args[1]['id']))
        self.assertEqual(expected_args,
                         self.mocked_client.list_ports.call_args_list)

    def test_instances_security_group_bindings_less_than_max(self):
        """Run the scale check with 100 servers."""
        self._test_instances_security_group_bindings_scale(100)

    def test_instances_security_group_bindings_max(self):
        """Run the scale check with 150 servers."""
        self._test_instances_security_group_bindings_scale(150)

    def test_instances_security_group_bindings_more_then_max(self):
        """Run the scale check with 300 servers."""
        self._test_instances_security_group_bindings_scale(300)

    def test_instances_security_group_bindings_with_hidden_sg(self):
        """Groups missing from the neutron listing are left out of bindings.
        """
        servers = [{'id': 'server_1'}]
        ports = [{'id': '1', 'device_id': 'dev_1', 'security_groups': ['1']},
                 {'id': '2', 'device_id': 'dev_1', 'security_groups': ['2']}]
        port_list = {'ports': ports}
        sg1 = {'id': '1', 'name': 'wol'}
        # User doesn't have access to sg2
        security_groups_list = {'security_groups': [sg1]}
        sg_bindings = {'dev_1': [{'name': 'wol'}]}
        self.mocked_client.list_ports.return_value = port_list
        self.mocked_client.list_security_groups.return_value = (
            security_groups_list)
        sg_api = neutron_driver.SecurityGroupAPI()
        result = sg_api.get_instances_security_groups_bindings(
            self.context, servers)
        self.assertEqual(sg_bindings, result)
        self.mocked_client.list_ports.assert_called_once_with(
            device_id=['server_1'])
        self.mocked_client.list_security_groups.assert_called_once_with(
            id=mock.ANY)
        self.assertEqual(
            ['1', '2'],
            sorted(self.mocked_client.list_security_groups.call_args[1]['id']))

    def test_instance_empty_security_groups(self):
        """A port with no security groups yields an empty result list."""
        port_list = {'ports': [{'id': 1, 'device_id': uuids.instance,
                                'security_groups': []}]}
        self.mocked_client.list_ports.return_value = port_list
        sg_api = neutron_driver.SecurityGroupAPI()
        result = sg_api.get_instance_security_groups(
            self.context, objects.Instance(uuid=uuids.instance))
        self.assertEqual([], result)
        self.mocked_client.list_ports.assert_called_once_with(
            device_id=[uuids.instance])


class TestNeutronDriverWithoutMock(test.NoDBTestCase):
    """SecurityGroupAPI tests that do not install a mocked neutron client."""

    def test_validate_property(self):
        """'foo' and '' are accepted; a 256-char name and None raise Invalid.
        """
        sg_api = neutron_driver.SecurityGroupAPI()
        sg_api.validate_property('foo', 'name', None)
        sg_api.validate_property('', 'name', None)
        self.assertRaises(exception.Invalid, sg_api.validate_property,
                          'a' * 256, 'name', None)
        self.assertRaises(exception.Invalid, sg_api.validate_property,
                          None, 'name', None)

    def test_populate_security_groups(self):
        """populate_security_groups returns an objects.SecurityGroupList."""
        sg_api = neutron_driver.SecurityGroupAPI()
        r = sg_api.populate_security_groups('ignore')
        self.assertIsInstance(r, objects.SecurityGroupList)
Learn to execute automation testing from scratch with LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!