Best Python code snippet using tempest_python
test_cinder_workflow.py
Source:test_cinder_workflow.py
1# Licensed under the Apache License, Version 2.0 (the "License"); you may2# not use this file except in compliance with the License. You may obtain3# a copy of the License at4#5# http://www.apache.org/licenses/LICENSE-2.06#7# Unless required by applicable law or agreed to in writing, software8# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT9# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the10# License for the specific language governing permissions and limitations11# under the License.12from unittest import mock13from cinderclient import exceptions as cinder_exception14from os_brick import exception as os_brick_exception15from oslo_serialization import jsonutils16import zun.conf17from zun.tests import base18from zun.volume import cinder_workflow19CONF = zun.conf.CONF20class CinderWorkflowTestCase(base.TestCase):21 def setUp(self):22 super(CinderWorkflowTestCase, self).setUp()23 self.fake_volume_id = 'fake-volume-id-1'24 self.fake_conn_prprts = {25 'ip': '10.3.4.5',26 'host': 'fakehost1'27 }28 self.fake_device_info = {29 'path': '/foo'30 }31 self.fake_conn_info = {32 'driver_volume_type': 'fake',33 'data': {},34 }35 @mock.patch('zun.volume.cinder_workflow.get_volume_connector')36 @mock.patch('zun.volume.cinder_workflow.get_volume_connector_properties')37 @mock.patch('zun.volume.cinder_api.CinderAPI')38 def test_attach_volume(self,39 mock_cinder_api_cls,40 mock_get_connector_prprts,41 mock_get_volume_connector):42 mock_cinder_api, mock_connector = self._test_attach_volume(43 mock_cinder_api_cls, mock_get_connector_prprts,44 mock_get_volume_connector)45 mock_cinder_api.reserve_volume.assert_called_once_with(46 self.fake_volume_id)47 mock_cinder_api.initialize_connection.assert_called_once_with(48 self.fake_volume_id, self.fake_conn_prprts)49 mock_connector.connect_volume.assert_called_once_with(50 self.fake_conn_info['data'])51 mock_cinder_api.attach.assert_called_once_with(52 volume_id=self.fake_volume_id,53 
mountpoint=self.fake_device_info['path'],54 hostname=CONF.host,55 container_uuid='123')56 mock_connector.disconnect_volume.assert_not_called()57 mock_cinder_api.terminate_connection.assert_not_called()58 mock_cinder_api.detach.assert_not_called()59 mock_cinder_api.unreserve_volume.assert_not_called()60 @mock.patch('zun.volume.cinder_workflow.get_volume_connector')61 @mock.patch('zun.volume.cinder_workflow.get_volume_connector_properties')62 @mock.patch('zun.volume.cinder_api.CinderAPI')63 def test_attach_volume_fail_reserve_volume(64 self, mock_cinder_api_cls, mock_get_connector_prprts,65 mock_get_volume_connector):66 mock_cinder_api, mock_connector = self._test_attach_volume(67 mock_cinder_api_cls, mock_get_connector_prprts,68 mock_get_volume_connector, fail_reserve=True)69 mock_cinder_api.reserve_volume.assert_called_once_with(70 self.fake_volume_id)71 mock_cinder_api.initialize_connection.assert_not_called()72 mock_connector.connect_volume.assert_not_called()73 mock_cinder_api.attach.assert_not_called()74 mock_connector.disconnect_volume.assert_not_called()75 mock_cinder_api.terminate_connection.assert_not_called()76 mock_cinder_api.detach.assert_not_called()77 mock_cinder_api.unreserve_volume.assert_called_once_with(78 self.fake_volume_id)79 @mock.patch('zun.volume.cinder_workflow.get_volume_connector')80 @mock.patch('zun.volume.cinder_workflow.get_volume_connector_properties')81 @mock.patch('zun.volume.cinder_api.CinderAPI')82 def test_attach_volume_fail_initialize_connection(83 self, mock_cinder_api_cls, mock_get_connector_prprts,84 mock_get_volume_connector):85 mock_cinder_api, mock_connector = self._test_attach_volume(86 mock_cinder_api_cls, mock_get_connector_prprts,87 mock_get_volume_connector, fail_init=True)88 mock_cinder_api.reserve_volume.assert_called_once_with(89 self.fake_volume_id)90 mock_cinder_api.initialize_connection.assert_called_once_with(91 self.fake_volume_id, self.fake_conn_prprts)92 mock_connector.connect_volume.assert_not_called()93 
mock_cinder_api.attach.assert_not_called()94 mock_connector.disconnect_volume.assert_not_called()95 mock_cinder_api.terminate_connection.assert_not_called()96 mock_cinder_api.detach.assert_not_called()97 mock_cinder_api.unreserve_volume.assert_called_once_with(98 self.fake_volume_id)99 @mock.patch('zun.volume.cinder_workflow.get_volume_connector')100 @mock.patch('zun.volume.cinder_workflow.get_volume_connector_properties')101 @mock.patch('zun.volume.cinder_api.CinderAPI')102 def test_attach_volume_fail_connect_volume(103 self, mock_cinder_api_cls, mock_get_connector_prprts,104 mock_get_volume_connector):105 mock_cinder_api, mock_connector = self._test_attach_volume(106 mock_cinder_api_cls, mock_get_connector_prprts,107 mock_get_volume_connector, fail_connect=True)108 mock_cinder_api.reserve_volume.assert_called_once_with(109 self.fake_volume_id)110 mock_cinder_api.initialize_connection.assert_called_once_with(111 self.fake_volume_id, self.fake_conn_prprts)112 mock_connector.connect_volume.assert_called_once_with(113 self.fake_conn_info['data'])114 mock_cinder_api.attach.assert_not_called()115 mock_connector.disconnect_volume.assert_not_called()116 mock_cinder_api.terminate_connection.assert_called_once_with(117 self.fake_volume_id, self.fake_conn_prprts)118 mock_cinder_api.detach.assert_not_called()119 mock_cinder_api.unreserve_volume.assert_called_once_with(120 self.fake_volume_id)121 @mock.patch('zun.volume.cinder_workflow.get_volume_connector')122 @mock.patch('zun.volume.cinder_workflow.get_volume_connector_properties')123 @mock.patch('zun.volume.cinder_api.CinderAPI')124 def test_attach_volume_fail_attach(self,125 mock_cinder_api_cls,126 mock_get_connector_prprts,127 mock_get_volume_connector):128 mock_cinder_api, mock_connector = self._test_attach_volume(129 mock_cinder_api_cls, mock_get_connector_prprts,130 mock_get_volume_connector, fail_attach=True)131 mock_cinder_api.reserve_volume.assert_called_once_with(132 self.fake_volume_id)133 
mock_cinder_api.initialize_connection.assert_called_once_with(134 self.fake_volume_id, self.fake_conn_prprts)135 mock_connector.connect_volume.assert_called_once_with(136 self.fake_conn_info['data'])137 mock_cinder_api.attach.assert_called_once_with(138 volume_id=self.fake_volume_id,139 mountpoint=self.fake_device_info['path'],140 hostname=CONF.host,141 container_uuid='123')142 mock_connector.disconnect_volume.assert_called_once_with(143 self.fake_conn_info['data'], None)144 mock_cinder_api.terminate_connection.assert_called_once_with(145 self.fake_volume_id, self.fake_conn_prprts)146 mock_cinder_api.detach.assert_called_once_with(147 mock.ANY)148 mock_cinder_api.unreserve_volume.assert_called_once_with(149 self.fake_volume_id)150 def _test_attach_volume(self,151 mock_cinder_api_cls,152 mock_get_connector_prprts,153 mock_get_volume_connector,154 fail_reserve=False, fail_init=False,155 fail_connect=False, fail_attach=False):156 volume = mock.MagicMock()157 volume.cinder_volume_id = self.fake_volume_id158 volume.container_uuid = '123'159 mock_cinder_api = mock.MagicMock()160 mock_cinder_api_cls.return_value = mock_cinder_api161 mock_connector = mock.MagicMock()162 mock_get_connector_prprts.return_value = self.fake_conn_prprts163 mock_get_volume_connector.return_value = mock_connector164 mock_cinder_api.initialize_connection.return_value = \165 self.fake_conn_info166 mock_connector.connect_volume.return_value = self.fake_device_info167 cinder = cinder_workflow.CinderWorkflow(self.context)168 if fail_reserve:169 mock_cinder_api.reserve_volume.side_effect = \170 cinder_exception.ClientException(400)171 self.assertRaises(cinder_exception.ClientException,172 cinder.attach_volume, volume)173 elif fail_init:174 mock_cinder_api.initialize_connection.side_effect = \175 cinder_exception.ClientException(400)176 self.assertRaises(cinder_exception.ClientException,177 cinder.attach_volume, volume)178 elif fail_connect:179 mock_connector.connect_volume.side_effect = \180 
os_brick_exception.BrickException()181 self.assertRaises(os_brick_exception.BrickException,182 cinder.attach_volume, volume)183 elif fail_attach:184 mock_cinder_api.attach.side_effect = \185 cinder_exception.ClientException(400)186 self.assertRaises(cinder_exception.ClientException,187 cinder.attach_volume, volume)188 else:189 device_path = cinder.attach_volume(volume)190 self.assertEqual('/foo', device_path)191 return mock_cinder_api, mock_connector192 @mock.patch('zun.volume.cinder_workflow.CinderWorkflow.'193 '_volume_connection_keep')194 @mock.patch('zun.volume.cinder_workflow.get_volume_connector')195 @mock.patch('zun.volume.cinder_workflow.get_volume_connector_properties')196 @mock.patch('zun.volume.cinder_api.CinderAPI')197 def test_detach_volume(self,198 mock_cinder_api_cls,199 mock_get_connector_prprts,200 mock_get_volume_connector,201 mock_connection_keep):202 volume = mock.MagicMock()203 volume.cinder_volume_id = self.fake_volume_id204 volume.connection_info = jsonutils.dumps(self.fake_conn_info)205 mock_cinder_api = mock.MagicMock()206 mock_cinder_api_cls.return_value = mock_cinder_api207 mock_connector = mock.MagicMock()208 mock_get_connector_prprts.return_value = self.fake_conn_prprts209 mock_get_volume_connector.return_value = mock_connector210 mock_connection_keep.return_value = False211 cinder = cinder_workflow.CinderWorkflow(self.context)212 cinder.detach_volume(self.context, volume)213 mock_cinder_api.begin_detaching.assert_called_once_with(214 self.fake_volume_id)215 mock_connector.disconnect_volume.assert_called_once_with(216 self.fake_conn_info['data'], None)217 mock_cinder_api.terminate_connection.assert_called_once_with(218 self.fake_volume_id, self.fake_conn_prprts)219 mock_cinder_api.detach.assert_called_once_with(volume)220 mock_cinder_api.roll_detaching.assert_not_called()221 @mock.patch('zun.volume.cinder_workflow.CinderWorkflow.'222 '_volume_connection_keep')223 @mock.patch('zun.volume.cinder_workflow.get_volume_connector')224 
@mock.patch('zun.volume.cinder_workflow.get_volume_connector_properties')225 @mock.patch('zun.volume.cinder_api.CinderAPI')226 def test_detach_volume_fail_disconnect(227 self, mock_cinder_api_cls, mock_get_connector_prprts,228 mock_get_volume_connector, mock_connection_keep):229 volume = mock.MagicMock()230 volume.cinder_volume_id = self.fake_volume_id231 volume.connection_info = jsonutils.dumps(self.fake_conn_info)232 mock_cinder_api = mock.MagicMock()233 mock_cinder_api_cls.return_value = mock_cinder_api234 mock_connector = mock.MagicMock()235 mock_get_connector_prprts.return_value = self.fake_conn_prprts236 mock_get_volume_connector.return_value = mock_connector237 mock_connection_keep.return_value = False238 mock_connector.disconnect_volume.side_effect = \239 os_brick_exception.BrickException()240 cinder = cinder_workflow.CinderWorkflow(self.context)241 self.assertRaises(os_brick_exception.BrickException,242 cinder.detach_volume, self.context, volume)243 mock_cinder_api.begin_detaching.assert_called_once_with(244 self.fake_volume_id)245 mock_connector.disconnect_volume.assert_called_once_with(246 self.fake_conn_info['data'], None)247 mock_cinder_api.terminate_connection.assert_not_called()248 mock_cinder_api.detach.assert_not_called()249 mock_cinder_api.roll_detaching.assert_called_once_with(250 self.fake_volume_id)251 @mock.patch('zun.volume.cinder_api.CinderAPI')252 def test_delete_volume(self,253 mock_cinder_api_cls):254 volume = mock.MagicMock()255 volume.cinder_volume_id = self.fake_volume_id256 volume.connection_info = jsonutils.dumps(self.fake_conn_info)257 mock_cinder_api = mock.MagicMock()258 mock_cinder_api_cls.return_value = mock_cinder_api259 cinder = cinder_workflow.CinderWorkflow(self.context)260 cinder.delete_volume(volume)261 mock_cinder_api.delete_volume.assert_called_once_with(...
test_volume_actions_rbac.py
Source:test_volume_actions_rbac.py
...124 @rbac_rule_validation.action(service="cinder",125 rule="volume:unreserve_volume")126 def test_volume_unreserve(self):127 self.rbac_utils.switch_role(self, toggle_rbac_role=True)128 self.volumes_client.unreserve_volume(self.volume['id'])129 @decorators.idempotent_id('c015c82f-7010-48cc-bd71-4ef542046f20')130 @rbac_rule_validation.action(service="cinder",131 rule="volume:retype")132 def test_volume_retype(self):133 volume = self.create_volume()134 vol_type = self.create_volume_type()['name']135 self.rbac_utils.switch_role(self, toggle_rbac_role=True)136 self.volumes_client.retype_volume(volume['id'], new_type=vol_type)137 @rbac_rule_validation.action(138 service="cinder",139 rule="volume_extension:volume_admin_actions:reset_status")140 @decorators.idempotent_id('4b3dad7d-0e73-4839-8781-796dd3d7af1d')141 def test_volume_reset_status(self):142 volume = self.create_volume()...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub. It takes you from setting up the prerequisites and running your first automation test, through following best practices, to diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!