Best Python code snippet using tempest_python
test_plugin.py
Source:test_plugin.py
1# Copyright 2019 Ericsson2#3# Licensed under the Apache License, Version 2.0 (the "License");4# you may not use this file except in compliance with the License.5# You may obtain a copy of the License at6#7# http://www.apache.org/licenses/LICENSE-2.08#9# Unless required by applicable law or agreed to in writing, software10# distributed under the License is distributed on an "AS IS" BASIS,11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.12# See the License for the specific language governing permissions and13# limitations under the License.14from unittest import mock15from keystoneauth1 import exceptions as ks_exc16from neutron_lib.agent import constants as agent_const17from oslo_log import log as logging18from neutron.services.placement_report import plugin19from neutron.tests.unit.plugins.ml2.drivers import mechanism_test20from neutron.tests.unit.plugins.ml2 import test_plugin21LOG = logging.getLogger(__name__)22class PlacementReportPluginTestCases(test_plugin.Ml2PluginV2TestCase):23 _mechanism_drivers = ['logger', 'test_with_agent']24 def setUp(self):25 super(PlacementReportPluginTestCases, self).setUp()26 self.service_plugin = plugin.PlacementReportPlugin()27 def test__get_rp_by_name_found(self):28 with mock.patch.object(29 self.service_plugin._placement_client,30 'list_resource_providers',31 return_value={'resource_providers': ['fake_rp']}):32 rp = self.service_plugin._get_rp_by_name('whatever')33 self.assertEqual('fake_rp', rp)34 def test__get_rp_by_name_not_found(self):35 with mock.patch.object(36 self.service_plugin._placement_client,37 'list_resource_providers',38 return_value={'resource_providers': []}):39 self.assertRaises(40 IndexError, self.service_plugin._get_rp_by_name, 'no_such_rp')41 def test_no_sync_for_rp_name_not_found(self):42 # looking all good43 agent = {44 'agent_type': 'test_mechanism_driver_agent',45 'configurations': {46 'resource_provider_bandwidths': {'some iface': ''},47 },48 'host': 'fake host',49 }50 agent_db = mock.Mock()51 with mock.patch.object(52 self.service_plugin._placement_client,53 'list_resource_providers',54 return_value={'resource_providers': []}), \55 mock.patch.object(56 self.service_plugin._batch_notifier,57 'queue_event') as mock_queue_event:58 self.service_plugin._sync_placement_state(agent, agent_db)59 self.assertFalse(agent_db.resources_synced)60 agent_db.update.assert_called_with()61 mock_queue_event.assert_not_called()62 def test_no_sync_for_placement_gone(self):63 # looking all good64 agent = {65 'agent_type': 'test_mechanism_driver_agent',66 'configurations': {67 'resource_provider_bandwidths': {'some iface': ''}},68 'host': 'fake host',69 }70 agent_db = mock.Mock()71 with mock.patch.object(72 self.service_plugin._placement_client,73 'list_resource_providers',74 side_effect=ks_exc.HttpError), \75 mock.patch.object(76 self.service_plugin._batch_notifier,77 'queue_event') as mock_queue_event:78 self.service_plugin._sync_placement_state(agent, agent_db)79 self.assertFalse(agent_db.resources_synced)80 agent_db.update.assert_called_with()81 mock_queue_event.assert_not_called()82 def test_no_sync_for_unsupported_agent_type(self):83 payload = mock.Mock(84 # looking all good, but agent type not supported85 desired_state={86 'agent_type': 'unsupported agent type',87 'configurations': {'resource_provider_bandwidths': {}},88 'host': 'fake host',89 })90 with mock.patch.object(self.service_plugin._core_plugin,91 '_get_agent_by_type_and_host') as mock_get_agent, \92 mock.patch.object(self.service_plugin,93 '_sync_placement_state') as mock_sync:94 self.service_plugin.handle_placement_config(95 mock.ANY, mock.ANY, mock.ANY, payload)96 mock_get_agent.assert_not_called()97 mock_sync.assert_not_called()98 def test_no_sync_without_resource_info(self):99 payload = mock.Mock(100 # looking all good, but 'configurations' has no101 # 'resource_provider_bandwidths'102 desired_state={103 'agent_type': 'test_mechanism_driver_agent',104 'configurations': {},105 'host': 'fake host',106 })107 with mock.patch.object(self.service_plugin._core_plugin,108 '_get_agent_by_type_and_host') as mock_get_agent, \109 mock.patch.object(self.service_plugin,110 '_sync_placement_state') as mock_sync:111 self.service_plugin.handle_placement_config(112 mock.ANY, mock.ANY, mock.ANY, payload)113 mock_get_agent.assert_not_called()114 mock_sync.assert_not_called()115 def test_sync_if_agent_is_new(self):116 payload = mock.Mock(117 desired_state={118 'agent_type': 'test_mechanism_driver_agent',119 'configurations': {'resource_provider_bandwidths': {}},120 'host': 'fake host',121 },122 metadata={123 'status': agent_const.AGENT_NEW,124 },125 )126 with mock.patch.object(self.service_plugin._core_plugin,127 '_get_agent_by_type_and_host') as mock_get_agent, \128 mock.patch.object(self.service_plugin,129 '_sync_placement_state') as mock_sync:130 self.service_plugin.handle_placement_config(131 mock.ANY, mock.ANY, mock.ANY, payload)132 self.assertEqual(1, mock_get_agent.call_count)133 self.assertEqual(1, mock_sync.call_count)134 def test_sync_if_agent_is_restarted(self):135 payload = mock.Mock(136 desired_state={137 'agent_type': 'test_mechanism_driver_agent',138 'configurations': {'resource_provider_bandwidths': {}},139 'host': 'fake host',140 'start_flag': True,141 },142 )143 with mock.patch.object(self.service_plugin._core_plugin,144 '_get_agent_by_type_and_host') as mock_get_agent, \145 mock.patch.object(self.service_plugin,146 '_sync_placement_state') as mock_sync:147 self.service_plugin.handle_placement_config(148 mock.ANY, mock.ANY, mock.ANY, payload)149 self.assertEqual(1, mock_get_agent.call_count)150 self.assertEqual(1, mock_sync.call_count)151 def test_sync_after_transient_error(self):152 payload = mock.Mock(153 desired_state={154 'agent_type': 'test_mechanism_driver_agent',155 'configurations': {'resource_provider_bandwidths': {}},156 'host': 'fake host',157 },158 )159 with mock.patch.object(self.service_plugin._core_plugin,160 '_get_agent_by_type_and_host',161 return_value={'resources_synced': False}) as mock_get_agent, \162 mock.patch.object(self.service_plugin,163 '_sync_placement_state') as mock_sync:164 self.service_plugin.handle_placement_config(165 mock.ANY, mock.ANY, mock.ANY, payload)166 self.assertEqual(1, mock_get_agent.call_count)167 self.assertEqual(1, mock_sync.call_count)168 def test__sync_placement_state_legacy(self):169 agent = {170 'agent_type': 'test_mechanism_driver_agent',171 'configurations': {172 'resource_provider_bandwidths': {},173 'resource_provider_inventory_defaults': {},174 },175 'host': 'fake host',176 }177 agent_db = mock.Mock()178 with mock.patch.object(self.service_plugin._batch_notifier,179 'queue_event') as mock_queue_event, \180 mock.patch.object(self.service_plugin._placement_client,181 'list_resource_providers',182 return_value={'resource_providers': [{'uuid': 'fake uuid'}]}):183 self.service_plugin._sync_placement_state(agent, agent_db)184 self.assertEqual(1, mock_queue_event.call_count)185 def test__sync_placement_state_rp_hypervisors(self):186 agent = {187 'agent_type': 'test_mechanism_driver_agent',188 'configurations': {189 'resource_provider_bandwidths': {},190 'resource_provider_inventory_defaults': {},191 'resource_provider_hypervisors': {'eth0': 'hypervisor0'},192 },193 'host': 'fake host',194 }195 agent_db = mock.Mock()196 with mock.patch.object(self.service_plugin._batch_notifier,197 'queue_event') as mock_queue_event, \198 mock.patch.object(self.service_plugin._placement_client,199 'list_resource_providers',200 return_value={'resource_providers': [201 {'uuid': 'fake uuid'}]}) as mock_list_rps:202 self.service_plugin._sync_placement_state(agent, agent_db)203 self.assertEqual(1, mock_queue_event.call_count)204 mock_list_rps.assert_called_once_with(name='hypervisor0')205class PlacementReporterAgentsTestCases(test_plugin.Ml2PluginV2TestCase):206 _mechanism_drivers = ['logger', 'test_with_agent']207 def test_supported_agent_types(self):208 self.agents = plugin.PlacementReporterAgents(ml2_plugin=self.plugin)209 self.assertEqual(210 ['test_mechanism_driver_agent'],211 self.agents.supported_agent_types)212 def test_mechanism_driver_by_agent_type_found(self):213 self.agents = plugin.PlacementReporterAgents(ml2_plugin=self.plugin)214 mech_driver = self.agents.mechanism_driver_by_agent_type(215 'test_mechanism_driver_agent')216 self.assertIsInstance(mech_driver, mechanism_test.TestMechanismDriver)217 def test_mechanism_driver_by_agent_type_not_found(self):218 self.agents = plugin.PlacementReporterAgents(ml2_plugin=self.plugin)219 self.assertRaises(220 Exception, # noqa221 self.agents.mechanism_driver_by_agent_type,...
test_resource_provider.py
Source:test_resource_provider.py
...48 self.assertRaises(exception.ResourceProviderNotFound,49 dbapi.get_resource_provider,50 self.context,51 uuidutils.generate_uuid())52 def test_list_resource_providers(self):53 uuids = []54 for i in range(1, 6):55 provider = utils.create_test_resource_provider(56 uuid=uuidutils.generate_uuid(),57 context=self.context,58 name='provider'+str(i))59 uuids.append(six.text_type(provider['uuid']))60 res = dbapi.list_resource_providers(self.context)61 res_uuids = [r.uuid for r in res]62 self.assertEqual(sorted(uuids), sorted(res_uuids))63 def test_list_resource_providers_sorted(self):64 uuids = []65 for i in range(5):66 provider = utils.create_test_resource_provider(67 uuid=uuidutils.generate_uuid(),68 context=self.context,69 name='provider'+str(i))70 uuids.append(six.text_type(provider.uuid))71 res = dbapi.list_resource_providers(self.context, sort_key='uuid')72 res_uuids = [r.uuid for r in res]73 self.assertEqual(sorted(uuids), res_uuids)74 self.assertRaises(exception.InvalidParameterValue,75 dbapi.list_resource_providers,76 self.context,77 sort_key='foo')78 def test_list_resource_providers_with_filters(self):79 provider1 = utils.create_test_resource_provider(80 name='provider-one',81 uuid=uuidutils.generate_uuid(),82 context=self.context)83 provider2 = utils.create_test_resource_provider(84 name='provider-two',85 uuid=uuidutils.generate_uuid(),86 context=self.context)87 res = dbapi.list_resource_providers(88 self.context, filters={'name': 'provider-one'})89 self.assertEqual([provider1.id], [r.id for r in res])90 res = dbapi.list_resource_providers(91 self.context, filters={'name': 'provider-two'})92 self.assertEqual([provider2.id], [r.id for r in res])93 res = dbapi.list_resource_providers(94 self.context, filters={'name': 'bad-provider'})95 self.assertEqual([], [r.id for r in res])96 res = dbapi.list_resource_providers(97 self.context,98 filters={'name': provider1.name})99 self.assertEqual([provider1.id], [r.id for r in res])100 def test_destroy_resource_provider(self):101 provider = utils.create_test_resource_provider(context=self.context)102 dbapi.destroy_resource_provider(self.context, provider.id)103 self.assertRaises(exception.ResourceProviderNotFound,104 dbapi.get_resource_provider,105 self.context, provider.uuid)106 def test_destroy_resource_provider_by_uuid(self):107 provider = utils.create_test_resource_provider(context=self.context)108 dbapi.destroy_resource_provider(self.context, provider.uuid)109 self.assertRaises(exception.ResourceProviderNotFound,110 dbapi.get_resource_provider,...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!