Best Python code snippet using tempest_python
basic.py
Source:basic.py
# Copyright 2013: Mirantis Inc.
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

from rally.common import logging
from rally.task import validation

from rally_openstack.common.services.identity import identity
from rally_openstack.task import scenario


class KeystoneBasic(scenario.OpenStackScenario):
    """Base class for Keystone scenarios with initialized service object."""

    def __init__(self, context=None, admin_clients=None, clients=None):
        """Build the identity service wrappers used by the scenarios.

        ``admin_keystone`` / ``keystone`` are only set when the parent
        initializer left ``_admin_clients`` / ``_clients`` on the instance.
        """
        super(KeystoneBasic, self).__init__(context, admin_clients, clients)
        if hasattr(self, "_admin_clients"):
            self.admin_keystone = identity.Identity(
                self._admin_clients, name_generator=self.generate_random_name,
                atomic_inst=self.atomic_actions())
        if hasattr(self, "_clients"):
            self.keystone = identity.Identity(
                self._clients, name_generator=self.generate_random_name,
                atomic_inst=self.atomic_actions())


@validation.add("required_platform", platform="openstack", admin=True)
@scenario.configure(context={"admin_cleanup@openstack": ["keystone"]},
                    name="KeystoneBasic.create_user",
                    platform="openstack")
class CreateUser(KeystoneBasic):

    @logging.log_deprecated_args(
        "The 'name_length' argument to create_user is ignored",
        "0.1.2", ["name_length"], once=True)
    def run(self, name_length=10, **kwargs):
        """Create a keystone user with random name.

        :param name_length: deprecated, ignored
        :param kwargs: Other optional parameters to create users like
                       "tenant_id", "enabled".
        """
        self.admin_keystone.create_user(**kwargs)


@validation.add("required_platform", platform="openstack", admin=True)
@scenario.configure(context={"admin_cleanup@openstack": ["keystone"]},
                    name="KeystoneBasic.create_delete_user",
                    platform="openstack")
class CreateDeleteUser(KeystoneBasic):

    @logging.log_deprecated_args(
        "The 'name_length' argument to create_delete_user is ignored",
        "0.1.2", ["name_length"], once=True)
    def run(self, name_length=10, **kwargs):
        """Create a keystone user with random name and then delete it.

        :param name_length: deprecated, ignored
        :param kwargs: Other optional parameters to create users like
                       "tenant_id", "enabled".
        """
        user = self.admin_keystone.create_user(**kwargs)
        self.admin_keystone.delete_user(user.id)


@validation.add("required_platform", platform="openstack", admin=True)
@scenario.configure(context={"admin_cleanup@openstack": ["keystone"]},
                    name="KeystoneBasic.create_user_set_enabled_and_delete",
                    platform="openstack")
class CreateUserSetEnabledAndDelete(KeystoneBasic):

    def run(self, enabled=True, **kwargs):
        """Create a keystone user, enable or disable it, and delete it.

        :param enabled: Initial state of user 'enabled' flag. The user
                        will be created with 'enabled' set to this
                        value, and then it will be toggled.
        :param kwargs: Other optional parameters to create user.
        """
        user = self.admin_keystone.create_user(enabled=enabled, **kwargs)
        self.admin_keystone.update_user(user.id, enabled=(not enabled))
        self.admin_keystone.delete_user(user.id)


@validation.add("required_platform", platform="openstack", admin=True)
@scenario.configure(context={"admin_cleanup@openstack": ["keystone"]},
                    name="KeystoneBasic.create_tenant",
                    platform="openstack")
class CreateTenant(KeystoneBasic):

    @logging.log_deprecated_args(
        "The 'name_length' argument to create_tenant is ignored",
        "0.1.2", ["name_length"], once=True)
    def run(self, name_length=10, **kwargs):
        """Create a keystone tenant with random name.

        :param name_length: deprecated, ignored
        :param kwargs: Other optional parameters
        """
        self.admin_keystone.create_project(**kwargs)


@validation.add("required_platform", platform="openstack", admin=True)
@scenario.configure(context={"admin_cleanup@openstack": ["keystone"]},
                    name="KeystoneBasic.authenticate_user_and_validate_token",
                    platform="openstack")
class AuthenticateUserAndValidateToken(KeystoneBasic):

    def run(self):
        """Authenticate and validate a keystone token."""
        token = self.admin_keystone.fetch_token()
        self.admin_keystone.validate_token(token)


@validation.add("number", param_name="users_per_tenant", minval=1)
@validation.add("required_platform", platform="openstack", admin=True)
@scenario.configure(context={"admin_cleanup@openstack": ["keystone"]},
                    name="KeystoneBasic.create_tenant_with_users",
                    platform="openstack")
class CreateTenantWithUsers(KeystoneBasic):

    @logging.log_deprecated_args(
        "The 'name_length' argument to create_tenant_with_users is ignored",
        "0.1.2", ["name_length"], once=True)
    def run(self, users_per_tenant, name_length=10, **kwargs):
        """Create a keystone tenant and several users belonging to it.

        :param users_per_tenant: number of users to create for the tenant
        :param name_length: deprecated, ignored
        :param kwargs: Other optional parameters for tenant creation
        """
        tenant = self.admin_keystone.create_project(**kwargs)
        self.admin_keystone.create_users(tenant.id,
                                         number_of_users=users_per_tenant)


@validation.add("required_platform", platform="openstack", admin=True)
@scenario.configure(context={"admin_cleanup@openstack": ["keystone"]},
                    name="KeystoneBasic.create_and_list_users",
                    platform="openstack")
class CreateAndListUsers(KeystoneBasic):

    @logging.log_deprecated_args(
        "The 'name_length' argument to create_and_list_users is ignored",
        "0.1.2", ["name_length"], once=True)
    def run(self, name_length=10, **kwargs):
        """Create a keystone user with random name and list all users.

        :param name_length: deprecated, ignored
        :param kwargs: Other optional parameters to create users like
                       "tenant_id", "enabled".
        """
        # A caller-supplied "name" is dropped before the user is created.
        kwargs.pop("name", None)
        self.admin_keystone.create_user(**kwargs)
        self.admin_keystone.list_users()


@validation.add("required_platform", platform="openstack", admin=True)
@scenario.configure(context={"admin_cleanup@openstack": ["keystone"]},
                    name="KeystoneBasic.create_and_list_tenants",
                    platform="openstack")
class CreateAndListTenants(KeystoneBasic):

    @logging.log_deprecated_args(
        "The 'name_length' argument to create_and_list_tenants is ignored",
        "0.1.2", ["name_length"], once=True)
    def run(self, name_length=10, **kwargs):
        """Create a keystone tenant with random name and list all tenants.

        :param name_length: deprecated, ignored
        :param kwargs: Other optional parameters
        """
        self.admin_keystone.create_project(**kwargs)
        self.admin_keystone.list_projects()


@validation.add("required_platform", platform="openstack",
                admin=True, users=True)
@scenario.configure(context={"admin_cleanup@openstack": ["keystone"]},
                    name="KeystoneBasic.add_and_remove_user_role",
                    platform="openstack")
class AddAndRemoveUserRole(KeystoneBasic):

    def run(self):
        """Create a user role add to a user and disassociate."""
        tenant_id = self.context["tenant"]["id"]
        user_id = self.context["user"]["id"]
        role = self.admin_keystone.create_role()
        self.admin_keystone.add_role(role_id=role.id, user_id=user_id,
                                     project_id=tenant_id)
        self.admin_keystone.revoke_role(role.id, user_id=user_id,
                                        project_id=tenant_id)


@validation.add("required_platform", platform="openstack", admin=True)
@scenario.configure(context={"admin_cleanup@openstack": ["keystone"]},
                    name="KeystoneBasic.create_and_delete_role",
                    platform="openstack")
class CreateAndDeleteRole(KeystoneBasic):

    def run(self):
        """Create a user role and delete it."""
        role = self.admin_keystone.create_role()
        self.admin_keystone.delete_role(role.id)


@validation.add("required_platform", platform="openstack",
                admin=True, users=True)
@scenario.configure(context={"admin_cleanup@openstack": ["keystone"]},
                    name="KeystoneBasic.create_add_and_list_user_roles",
                    platform="openstack")
class CreateAddAndListUserRoles(KeystoneBasic):

    def run(self):
        """Create user role, add it and list user roles for given user."""
        tenant_id = self.context["tenant"]["id"]
        user_id = self.context["user"]["id"]
        role = self.admin_keystone.create_role()
        self.admin_keystone.add_role(user_id=user_id, role_id=role.id,
                                     project_id=tenant_id)
        self.admin_keystone.list_roles(user_id=user_id, project_id=tenant_id)


@validation.add("required_platform", platform="openstack", admin=True)
@scenario.configure(context={"admin_cleanup@openstack": ["keystone"]},
                    name="KeystoneBasic.get_entities",
                    platform="openstack")
class GetEntities(KeystoneBasic):

    def run(self, service_name="keystone"):
        """Get instance of a tenant, user, role and service by id's.

        An ephemeral tenant, user, and role are each created. By
        default, fetches the 'keystone' service. This can be
        overridden (for instance, to get the 'Identity Service'
        service on older OpenStack), or None can be passed explicitly
        to service_name to create a new service and then query it by
        ID.

        :param service_name: The name of the service to get by ID; or
                             None, to create an ephemeral service and
                             get it by ID.
        """
        project = self.admin_keystone.create_project()
        user = self.admin_keystone.create_user(project_id=project.id)
        role = self.admin_keystone.create_role()
        self.admin_keystone.get_project(project.id)
        self.admin_keystone.get_user(user.id)
        self.admin_keystone.get_role(role.id)
        if service_name is None:
            service = self.admin_keystone.create_service()
        else:
            service = self.admin_keystone.get_service_by_name(service_name)
        self.admin_keystone.get_service(service.id)


@validation.add("required_platform", platform="openstack", admin=True)
@scenario.configure(context={"admin_cleanup@openstack": ["keystone"]},
                    name="KeystoneBasic.create_and_delete_service",
                    platform="openstack")
class CreateAndDeleteService(KeystoneBasic):

    @logging.log_deprecated_args(
        "The 'name' argument to create_and_delete_service will be ignored",
        "0.0.5", ["name"])
    def run(self, name=None, service_type=None, description=None):
        """Create and delete service.

        :param name: deprecated, ignored
        :param service_type: type of the service
        :param description: description of the service
        """
        service = self.admin_keystone.create_service(service_type=service_type,
                                                     description=description)
        self.admin_keystone.delete_service(service.id)


@validation.add("required_platform", platform="openstack", admin=True)
@scenario.configure(context={"admin_cleanup@openstack": ["keystone"]},
                    name="KeystoneBasic.create_update_and_delete_tenant",
                    platform="openstack")
class CreateUpdateAndDeleteTenant(KeystoneBasic):

    @logging.log_deprecated_args(
        "The 'name_length' argument to create_update_and_delete_tenant is "
        "ignored", "0.1.2", ["name_length"], once=True)
    def run(self, name_length=None, **kwargs):
        """Create, update and delete tenant.

        :param name_length: deprecated, ignored
        :param kwargs: Other optional parameters for tenant creation
        """
        project = self.admin_keystone.create_project(**kwargs)
        new_name = self.generate_random_name()
        new_description = self.generate_random_name()
        self.admin_keystone.update_project(project.id, name=new_name,
                                           description=new_description)
        self.admin_keystone.delete_project(project.id)


@validation.add("required_platform", platform="openstack", admin=True)
@scenario.configure(context={"admin_cleanup@openstack": ["keystone"]},
                    name="KeystoneBasic.create_user_update_password",
                    platform="openstack")
class CreateUserUpdatePassword(KeystoneBasic):

    @logging.log_deprecated_args(
        "The 'name_length' and 'password_length' arguments to "
        "create_user_update_password are ignored",
        "0.1.2", ["name_length", "password_length"], once=True)
    def run(self, name_length=None, password_length=None):
        """Create user and update password for that user.

        :param name_length: deprecated, ignored
        :param password_length: deprecated, ignored
        """
        user = self.admin_keystone.create_user()
        password = self.generate_random_name()
        self.admin_keystone.update_user(user.id, password=password)


@validation.add("required_platform", platform="openstack", admin=True)
@scenario.configure(context={"admin_cleanup@openstack": ["keystone"]},
                    name="KeystoneBasic.create_and_list_services",
                    platform="openstack")
class CreateAndListServices(KeystoneBasic):

    @logging.log_deprecated_args(
        "The 'name' argument to create_and_list_services will be ignored",
        "0.0.5", ["name"])
    def run(self, name=None, service_type=None, description=None):
        """Create and list services.

        :param name: deprecated, ignored
        :param service_type: type of the service
        :param description: description of the service
        """
        self.admin_keystone.create_service(service_type=service_type,
                                           description=description)
        self.admin_keystone.list_services()


@validation.add("required_platform", platform="openstack", users=True)
@scenario.configure(context={"cleanup@openstack": ["keystone"]},
                    name="KeystoneBasic.create_and_list_ec2credentials",
                    platform="openstack")
class CreateAndListEc2Credentials(KeystoneBasic):

    def run(self):
        """Create and List all keystone ec2-credentials."""
        self.keystone.create_ec2credentials(
            self.context["user"]["id"],
            project_id=self.context["tenant"]["id"])
        self.keystone.list_ec2credentials(self.context["user"]["id"])


@validation.add("required_platform", platform="openstack", users=True)
@scenario.configure(context={"cleanup@openstack": ["keystone"]},
                    name="KeystoneBasic.create_and_delete_ec2credential",
                    platform="openstack")
class CreateAndDeleteEc2Credential(KeystoneBasic):

    def run(self):
        """Create and delete keystone ec2-credential."""
        creds = self.keystone.create_ec2credentials(
            self.context["user"]["id"],
            project_id=self.context["tenant"]["id"])
        self.keystone.delete_ec2credential(
            self.context["user"]["id"], access=creds.access)


@validation.add("required_platform", platform="openstack", admin=True)
@scenario.configure(context={"admin_cleanup@openstack": ["keystone"]},
                    name="KeystoneBasic.create_and_get_role",
                    platform="openstack")
class CreateAndGetRole(KeystoneBasic):

    def run(self, **kwargs):
        """Create a user role and get it detailed information.

        :param kwargs: Optional additional arguments for roles creation
        """
        role = self.admin_keystone.create_role(**kwargs)
        self.admin_keystone.get_role(role.id)


@validation.add("required_platform", platform="openstack", admin=True)
@scenario.configure(context={"admin_cleanup@openstack": ["keystone"]},
                    name="KeystoneBasic.create_and_list_roles",
                    platform="openstack")
class CreateAddListRoles(KeystoneBasic):

    def run(self, create_role_kwargs=None, list_role_kwargs=None):
        """Create a role, then list all roles.

        :param create_role_kwargs: Optional additional arguments for
                                   roles create
        :param list_role_kwargs: Optional additional arguments for roles list
        """
        create_role_kwargs = create_role_kwargs or {}
        list_role_kwargs = list_role_kwargs or {}

        role = self.admin_keystone.create_role(**create_role_kwargs)
        msg = "Role isn't created"
        self.assertTrue(role, err_msg=msg)

        all_roles = self.admin_keystone.list_roles(**list_role_kwargs)
        msg = ("Created role is not in the"
               " list of all available roles")
        self.assertIn(role, all_roles, err_msg=msg)


@validation.add("required_platform", platform="openstack", admin=True)
@scenario.configure(context={"admin_cleanup@openstack": ["keystone"]},
                    name="KeystoneBasic.create_and_update_user",
                    platform="openstack")
class CreateAndUpdateUser(KeystoneBasic):

    def run(self, create_user_kwargs=None, update_user_kwargs=None):
        """Create user and update the user.

        :param create_user_kwargs: Optional additional arguments for user
                                   creation
        :param update_user_kwargs: Optional additional arguments for user
                                   updation
        """
        create_user_kwargs = create_user_kwargs or {}
        # Normalise the default: unpacking or iterating None below would
        # raise TypeError when the caller omits update_user_kwargs.
        update_user_kwargs = update_user_kwargs or {}

        user = self.admin_keystone.create_user(**create_user_kwargs)
        self.admin_keystone.update_user(user.id, **update_user_kwargs)
        user_data = self.admin_clients("keystone").users.get(user.id)

        for args in update_user_kwargs:
            msg = ("%s isn't updated" % args)
            # NOTE(review): SOURCE is cut off here, in the middle of:
            #     self.assertEqual(getattr(user_data, str(args)),...
            # The remaining arguments are not visible, so the assertion is
            # not reconstructed. TODO restore it from the upstream file.
test_utils.py
Source:test_utils.py
# Copyright 2013: Mirantis Inc.
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import mock

from rally.plugins.openstack.scenarios.keystone import utils
from tests.unit import fakes
from tests.unit import test

# Dotted prefix used to build mock.patch targets inside the utils module.
UTILS = "rally.plugins.openstack.scenarios.keystone.utils."


class KeystoneUtilsTestCase(test.TestCase):
    """Tests for the module-level helpers of the keystone utils module."""

    @mock.patch("rally.common.utils.name_matches_object")
    def test_is_temporary(self, mock_name_matches_object):
        resource = mock.Mock()
        self.assertEqual(utils.is_temporary(resource),
                         mock_name_matches_object.return_value)
        mock_name_matches_object.assert_called_once_with(
            resource.name, utils.KeystoneScenario)


class KeystoneScenarioTestCase(test.ScenarioTestCase):
    """Tests for the atomic actions of ``utils.KeystoneScenario``.

    Each test calls one helper, checks the keystone client call it makes
    and checks the atomic action timer name it records.
    """

    @mock.patch("uuid.uuid4", return_value="pwd")
    def test_user_create(self, mock_uuid4):
        scenario = utils.KeystoneScenario(self.context)
        scenario.generate_random_name = mock.Mock(return_value="foobarov")

        result = scenario._user_create()

        self.assertEqual(
            self.admin_clients("keystone").users.create.return_value, result)
        self.admin_clients("keystone").users.create.assert_called_once_with(
            "foobarov",
            password=mock_uuid4.return_value,
            email="foobarov@rally.me")
        mock_uuid4.assert_called_with()
        self._test_atomic_action_timer(scenario.atomic_actions(),
                                       "keystone.create_user")

    def test_update_user_enabled(self):
        user = mock.Mock()
        enabled = mock.Mock()
        scenario = utils.KeystoneScenario(self.context)

        scenario._update_user_enabled(user, enabled)
        self.admin_clients(
            "keystone").users.update_enabled.assert_called_once_with(user,
                                                                     enabled)

        self._test_atomic_action_timer(scenario.atomic_actions(),
                                       "keystone.update_user_enabled")

    def test_role_create(self):
        scenario = utils.KeystoneScenario(self.context)
        scenario.generate_random_name = mock.Mock()

        result = scenario._role_create()

        self.assertEqual(
            self.admin_clients("keystone").roles.create.return_value, result)
        self.admin_clients("keystone").roles.create.assert_called_once_with(
            scenario.generate_random_name.return_value)
        self._test_atomic_action_timer(scenario.atomic_actions(),
                                       "keystone.create_role")

    def test_list_roles_for_user(self):
        user = mock.MagicMock()
        tenant = mock.MagicMock()
        scenario = utils.KeystoneScenario(self.context)

        scenario._list_roles_for_user(user, tenant)

        self.admin_clients(
            "keystone").roles.roles_for_user.assert_called_once_with(user,
                                                                     tenant)
        self._test_atomic_action_timer(scenario.atomic_actions(),
                                       "keystone.list_roles")

    def test_role_add(self):
        user = mock.MagicMock()
        role = mock.MagicMock()
        tenant = mock.MagicMock()
        scenario = utils.KeystoneScenario(self.context)

        scenario._role_add(user=user.id, role=role.id, tenant=tenant.id)

        self.admin_clients(
            "keystone").roles.add_user_role.assert_called_once_with(user.id,
                                                                    role.id,
                                                                    tenant.id)
        self._test_atomic_action_timer(scenario.atomic_actions(),
                                       "keystone.add_role")

    def test_user_delete(self):
        # Exercises _resource_delete; the timer name is derived from the
        # lower-cased class name of the deleted resource.
        resource = fakes.FakeResource()
        resource.delete = mock.MagicMock()
        scenario = utils.KeystoneScenario(self.context)

        scenario._resource_delete(resource)

        resource.delete.assert_called_once_with()
        r = "keystone.delete_%s" % resource.__class__.__name__.lower()
        self._test_atomic_action_timer(scenario.atomic_actions(), r)

    def test_role_remove(self):
        user = mock.MagicMock()
        role = mock.MagicMock()
        tenant = mock.MagicMock()
        scenario = utils.KeystoneScenario(self.context)

        scenario._role_remove(user=user, role=role, tenant=tenant)

        self.admin_clients(
            "keystone").roles.remove_user_role.assert_called_once_with(user,
                                                                       role,
                                                                       tenant)
        self._test_atomic_action_timer(scenario.atomic_actions(),
                                       "keystone.remove_role")

    def test_tenant_create(self):
        scenario = utils.KeystoneScenario(self.context)
        scenario.generate_random_name = mock.Mock()

        result = scenario._tenant_create()

        self.assertEqual(
            self.admin_clients("keystone").tenants.create.return_value, result)
        self.admin_clients("keystone").tenants.create.assert_called_once_with(
            scenario.generate_random_name.return_value)
        self._test_atomic_action_timer(scenario.atomic_actions(),
                                       "keystone.create_tenant")

    def test_service_create(self):
        service_type = "service_type"
        description = "description"
        scenario = utils.KeystoneScenario(self.context)
        scenario.generate_random_name = mock.Mock()

        result = scenario._service_create(service_type=service_type,
                                          description=description)

        self.assertEqual(
            self.admin_clients("keystone").services.create.return_value,
            result)
        self.admin_clients("keystone").services.create.assert_called_once_with(
            scenario.generate_random_name.return_value,
            service_type, description)
        self._test_atomic_action_timer(scenario.atomic_actions(),
                                       "keystone.create_service")

    def test_tenant_create_with_users(self):
        tenant = mock.MagicMock()
        scenario = utils.KeystoneScenario(self.context)
        scenario.generate_random_name = mock.Mock(return_value="foobarov")

        scenario._users_create(tenant, users_per_tenant=1)

        self.admin_clients("keystone").users.create.assert_called_once_with(
            "foobarov", password="foobarov", email="foobarov@rally.me",
            tenant_id=tenant.id)
        self._test_atomic_action_timer(scenario.atomic_actions(),
                                       "keystone.create_users")

    def test_list_users(self):
        scenario = utils.KeystoneScenario(self.context)
        scenario._list_users()
        self.admin_clients("keystone").users.list.assert_called_once_with()
        self._test_atomic_action_timer(scenario.atomic_actions(),
                                       "keystone.list_users")

    def test_list_tenants(self):
        scenario = utils.KeystoneScenario(self.context)
        scenario._list_tenants()
        self.admin_clients("keystone").tenants.list.assert_called_once_with()
        self._test_atomic_action_timer(scenario.atomic_actions(),
                                       "keystone.list_tenants")

    def test_list_services(self):
        scenario = utils.KeystoneScenario(self.context)
        scenario._list_services()

        self.admin_clients("keystone").services.list.assert_called_once_with()
        self._test_atomic_action_timer(scenario.atomic_actions(),
                                       "keystone.service_list")

    def test_delete_service(self):
        service = mock.MagicMock()
        scenario = utils.KeystoneScenario(self.context)
        scenario._delete_service(service_id=service.id)

        self.admin_clients("keystone").services.delete.assert_called_once_with(
            service.id)
        self._test_atomic_action_timer(scenario.atomic_actions(),
                                       "keystone.delete_service")

    def test_get_tenant(self):
        tenant = mock.MagicMock()
        scenario = utils.KeystoneScenario(self.context)
        scenario._get_tenant(tenant_id=tenant.id)

        self.admin_clients("keystone").tenants.get.assert_called_once_with(
            tenant.id)
        self._test_atomic_action_timer(scenario.atomic_actions(),
                                       "keystone.get_tenant")

    def test_get_user(self):
        user = mock.MagicMock()
        scenario = utils.KeystoneScenario(self.context)
        scenario._get_user(user_id=user.id)

        self.admin_clients("keystone").users.get.assert_called_once_with(
            user.id)
        self._test_atomic_action_timer(scenario.atomic_actions(),
                                       "keystone.get_user")

    def test_get_role(self):
        role = mock.MagicMock()
        scenario = utils.KeystoneScenario(self.context)
        scenario._get_role(role_id=role.id)

        self.admin_clients("keystone").roles.get.assert_called_once_with(
            role.id)
        self._test_atomic_action_timer(scenario.atomic_actions(),
                                       "keystone.get_role")

    def test_get_service(self):
        service = mock.MagicMock()
        scenario = utils.KeystoneScenario(self.context)
        scenario._get_service(service_id=service.id)

        self.admin_clients("keystone").services.get.assert_called_once_with(
            service.id)
        self._test_atomic_action_timer(scenario.atomic_actions(),
                                       "keystone.get_service")

    def test_update_tenant(self):
        tenant = mock.MagicMock()
        description = "new description"

        scenario = utils.KeystoneScenario(self.context)
        scenario.generate_random_name = mock.Mock()
        scenario._update_tenant(tenant=tenant, description=description)

        self.admin_clients("keystone").tenants.update.assert_called_once_with(
            tenant.id, scenario.generate_random_name.return_value,
            description)
        self._test_atomic_action_timer(scenario.atomic_actions(),
                                       "keystone.update_tenant")

    def test_update_user_password(self):
        password = "pswd"
        user = mock.MagicMock()
        scenario = utils.KeystoneScenario(self.context)

        scenario._update_user_password(password=password, user_id=user.id)

        self.admin_clients(
            "keystone").users.update_password.assert_called_once_with(user.id,
                                                                      password)
        self._test_atomic_action_timer(scenario.atomic_actions(),
                                       "keystone.update_user_password")

    @mock.patch("rally.plugins.openstack.scenario.OpenStackScenario."
                "admin_clients")
    def test_update_user_password_v3(self,
                                     mock_open_stack_scenario_admin_clients):
        password = "pswd"
        user = mock.MagicMock()
        scenario = utils.KeystoneScenario()

        # Make the patched client report version "v3" so that the
        # users.update() branch of _update_user_password is taken.
        type(mock_open_stack_scenario_admin_clients.return_value).version = (
            mock.PropertyMock(return_value="v3"))
        scenario._update_user_password(password=password, user_id=user.id)

        mock_open_stack_scenario_admin_clients(
            "keystone").users.update.assert_called_once_with(
            user.id, password=password)
        self._test_atomic_action_timer(scenario.atomic_actions(),
                                       "keystone.update_user_password")

    def test_get_service_by_name(self):
        scenario = utils.KeystoneScenario(self.context)
        svc_foo, svc_bar = mock.Mock(), mock.Mock()
        scenario._list_services = mock.Mock(return_value=[svc_foo, svc_bar])
        self.assertEqual(scenario._get_service_by_name(svc_bar.name), svc_bar)
        self.assertIsNone(scenario._get_service_by_name("spam"))

    @mock.patch(UTILS + "KeystoneScenario.clients")
    def test_create_ec2credentials(self, mock_clients):
        scenario = utils.KeystoneScenario(self.context)
        creds = mock.Mock()
        mock_clients("keystone").ec2.create.return_value = creds
        create_creds = scenario._create_ec2credentials("user_id",
                                                       "tenant_id")
        self.assertEqual(create_creds, creds)
        mock_clients("keystone").ec2.create.assert_called_once_with(
            "user_id", "tenant_id")
        self._test_atomic_action_timer(scenario.atomic_actions(),
                                       "keystone.create_ec2creds")

    @mock.patch(UTILS + "KeystoneScenario.clients")
    def test_list_ec2credentials(self, mock_clients):
        scenario = utils.KeystoneScenario(self.context)
        creds_list = mock.Mock()
        mock_clients("keystone").ec2.list.return_value = creds_list
        list_creds = scenario._list_ec2credentials("user_id")
        self.assertEqual(list_creds, creds_list)
        mock_clients("keystone").ec2.list.assert_called_once_with("user_id")
        self._test_atomic_action_timer(scenario.atomic_actions(),
                                       "keystone.list_ec2creds")

    @mock.patch(UTILS + "KeystoneScenario.clients")
    def test_delete_ec2credentials(self, mock_clients):
        scenario = utils.KeystoneScenario(self.context)
        mock_clients("keystone").ec2.delete = mock.MagicMock()
        scenario._delete_ec2credential("user_id", "access")
        mock_clients("keystone").ec2.delete.assert_called_once_with("user_id",
                                                                    "access")
        # NOTE(review): SOURCE is cut off here, in the middle of:
        #     self._test_atomic_action_timer(scenario.atomic_actions(),...
        # The timer-name argument is not visible, so the call is not
        # reconstructed. TODO restore it from the upstream file.
utils.py
Source:utils.py
# Copyright 2013: Mirantis Inc.
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import uuid

from rally.common import utils
from rally.plugins.openstack import scenario
from rally.task import atomic


def is_temporary(resource):
    """Tell whether ``resource.name`` matches a KeystoneScenario name.

    :param resource: object with a ``name`` attribute
    :returns: result of ``utils.name_matches_object`` for that name
    """
    return utils.name_matches_object(resource.name, KeystoneScenario)


class KeystoneScenario(scenario.OpenStackScenario):
    """Base class for Keystone scenarios with basic atomic actions."""

    @atomic.action_timer("keystone.create_user")
    def _user_create(self, email=None, **kwargs):
        """Creates keystone user with random name.

        :param email: user email; defaults to "<random name>@rally.me"
        :param kwargs: Other optional parameters to create users like
                       "tenant_id", "enabled". A "password" key, if given,
                       is used instead of a generated password.
        :returns: keystone user instance
        """
        name = self.generate_random_name()
        # NOTE(boris-42): password and email parameters are required by
        #                 keystone client v2.0. This should be cleaned up
        #                 when we switch to v3.
        password = kwargs.pop("password", str(uuid.uuid4()))
        email = email or (name + "@rally.me")
        return self.admin_clients("keystone").users.create(
            name, password=password, email=email, **kwargs)

    @atomic.action_timer("keystone.update_user_enabled")
    def _update_user_enabled(self, user, enabled):
        """Enable or disable a user.

        :param user: The user to enable or disable
        :param enabled: Boolean indicating if the user should be
                        enabled (True) or disabled (False)
        """
        self.admin_clients("keystone").users.update_enabled(user, enabled)

    def _resource_delete(self, resource):
        """Delete keystone resource.

        The atomic action is named after the lower-cased class name of the
        resource, e.g. "keystone.delete_<classname>".

        :param resource: object exposing a ``delete()`` method
        """
        r = "keystone.delete_%s" % resource.__class__.__name__.lower()
        with atomic.ActionTimer(self, r):
            resource.delete()

    @atomic.action_timer("keystone.create_tenant")
    def _tenant_create(self, **kwargs):
        """Creates keystone tenant with random name.

        :param kwargs: Other optional parameters
        :returns: keystone tenant instance
        """
        name = self.generate_random_name()
        return self.admin_clients("keystone").tenants.create(name, **kwargs)

    @atomic.action_timer("keystone.create_service")
    def _service_create(self, service_type="rally_test_type",
                        description=None):
        """Creates keystone service with random name.

        :param service_type: type of the service
        :param description: description of the service; a random name is
                            used when it is not given
        :returns: keystone service instance
        """
        description = description or self.generate_random_name()
        return self.admin_clients("keystone").services.create(
            self.generate_random_name(),
            service_type, description)

    @atomic.action_timer("keystone.create_users")
    def _users_create(self, tenant, users_per_tenant):
        """Adds users to a tenant.

        Each user gets a random name, which is also used as the password
        and as the local part of the "@rally.me" email.

        :param tenant: tenant object
        :param users_per_tenant: number of users in per tenant
        """
        for _ in range(users_per_tenant):
            name = self.generate_random_name()
            password = name
            email = name + "@rally.me"
            self.admin_clients("keystone").users.create(
                name, password=password, email=email, tenant_id=tenant.id)

    @atomic.action_timer("keystone.create_role")
    def _role_create(self):
        """Creates keystone user role with random name.

        :returns: keystone user role instance
        """
        role = self.admin_clients("keystone").roles.create(
            self.generate_random_name())
        return role

    @atomic.action_timer("keystone.list_users")
    def _list_users(self):
        """List users."""
        return self.admin_clients("keystone").users.list()

    @atomic.action_timer("keystone.list_tenants")
    def _list_tenants(self):
        """List tenants."""
        return self.admin_clients("keystone").tenants.list()

    @atomic.action_timer("keystone.service_list")
    def _list_services(self):
        """List services."""
        return self.admin_clients("keystone").services.list()

    @atomic.action_timer("keystone.list_roles")
    def _list_roles_for_user(self, user, tenant):
        """List user roles.

        :param user: user for whom roles will be listed
        :param tenant: tenant on which user have roles
        """
        return self.admin_clients("keystone").roles.roles_for_user(
            user, tenant)

    @atomic.action_timer("keystone.add_role")
    def _role_add(self, user, role, tenant):
        """Add role to a given user on a tenant.

        :param user: user to be assigned the role to
        :param role: user role to assign with
        :param tenant: tenant on which assignation will take place
        """
        self.admin_clients("keystone").roles.add_user_role(user, role, tenant)

    @atomic.action_timer("keystone.remove_role")
    def _role_remove(self, user, role, tenant):
        """Dissociate user with role.

        :param user: user to be stripped with role
        :param role: role to be dissociated with user
        :param tenant: tenant on which assignation took place
        """
        self.admin_clients("keystone").roles.remove_user_role(user,
                                                              role, tenant)

    @atomic.action_timer("keystone.get_tenant")
    def _get_tenant(self, tenant_id):
        """Get given tenant.

        :param tenant_id: id of the tenant to get
        """
        return self.admin_clients("keystone").tenants.get(tenant_id)

    @atomic.action_timer("keystone.get_user")
    def _get_user(self, user_id):
        """Get given user.

        :param user_id: id of the user to get
        """
        return self.admin_clients("keystone").users.get(user_id)

    @atomic.action_timer("keystone.get_role")
    def _get_role(self, role_id):
        """Get given user role.

        :param role_id: id of the user role to get
        """
        return self.admin_clients("keystone").roles.get(role_id)

    @atomic.action_timer("keystone.get_service")
    def _get_service(self, service_id):
        """Get service with given service id.

        :param service_id: id for service object
        """
        return self.admin_clients("keystone").services.get(service_id)

    def _get_service_by_name(self, name):
        """Find a service by name among the listed services.

        :param name: service name to look for
        :returns: the first listed service whose ``name`` equals *name*,
                  or None when there is no such service
        """
        for i in self._list_services():
            if i.name == name:
                return i
        return None

    @atomic.action_timer("keystone.delete_service")
    def _delete_service(self, service_id):
        """Delete service.

        :param service_id: service to be deleted
        """
        self.admin_clients("keystone").services.delete(service_id)

    @atomic.action_timer("keystone.update_tenant")
    def _update_tenant(self, tenant, description=None):
        """Update tenant name and description.

        The new name is always a freshly generated random name.

        :param tenant: tenant to be updated
        :param description: tenant description to be set; a random name is
                            used when it is not given
        """
        name = self.generate_random_name()
        description = description or self.generate_random_name()
        self.admin_clients("keystone").tenants.update(tenant.id,
                                                      name, description)

    @atomic.action_timer("keystone.update_user_password")
    def _update_user_password(self, user_id, password):
        """Update user password.

        :param user_id: id of the user
        :param password: new password
        """
        admin_clients = self.admin_clients("keystone")
        # Clients reporting version "v3" are updated through users.update();
        # any other version goes through users.update_password().
        if admin_clients.version in ["v3"]:
            admin_clients.users.update(user_id, password=password)
        else:
            admin_clients.users.update_password(user_id, password)

    @atomic.action_timer("keystone.create_ec2creds")
    def _create_ec2credentials(self, user_id, tenant_id):
        """Create ec2credentials.

        :param user_id: User ID for which to create credentials
        :param tenant_id: Tenant ID for which to create credentials

        :returns: Created ec2-credentials object
        """
        return self.clients("keystone").ec2.create(user_id, tenant_id)

    @atomic.action_timer("keystone.list_ec2creds")
    def _list_ec2credentials(self, user_id):
        """List of access/secret pairs for a user_id.

        :param user_id: List all ec2-credentials for User ID

        :returns: Return ec2-credentials list
        """
        return self.clients("keystone").ec2.list(user_id)

    @atomic.action_timer("keystone.delete_ec2creds")
    def _delete_ec2credential(self, user_id, access):
        """Delete ec2credential.

        :param user_id: User ID for which to delete credential
        :param access: access key for ec2credential to delete
        """
        # NOTE(review): SOURCE is cut off right after this docstring, so the
        # method body is not visible and is not reconstructed here.
        # TODO restore the body from the upstream file.
Learn to run automation tests from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, such as Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!