Best Python code snippet using localstack_python
cluster_manager.py
Source:cluster_manager.py
1"""This is a script to create k8s clusters using Platform9 Qbert and Packet Baremetal."""2import openstack3import qbert4import uuid5import re6import os7import json8from async_tasks import create_terraform_stack, delete_terraform_stack, authorize_cluster9def delete_cluster(endpoint, user, pw, tenant, region, cluster_id):10 token, catalog, project_id = qbert.get_token_v3(endpoint, user, pw, tenant)11 qbert_url = "{0}/{1}".format(qbert.get_service_url('qbert', catalog, region), project_id)12 delete_cluster = qbert.delete_request(qbert_url, token, "clusters/{}".format(cluster_id))13 return delete_cluster14def delete_user(conn, user):15 os_user = conn.get_user(user)16 if os_user:17 conn.delete_user(os_user, domain_id="default")18 else:19 print("User doesn't exist. Skipping deletion...")20 return os_user21def delete_project(conn, endpoint, user, pw, tenant, region):22 os_admin = conn.get_user(user)23 os_project = conn.get_project(tenant)24 if os_project:25 params = dict(project=os_project)26 role_mappings = conn.list_role_assignments(filters=params)27 for mapping in role_mappings:28 if mapping['user'] != os_admin['id']:29 print("Project still has users. Skipping project deletion...")30 return False31 token, catalog, project_id = qbert.get_token_v3(endpoint, user, pw, tenant)32 qbert_url = "{0}/{1}".format(qbert.get_service_url('qbert', catalog, region), project_id)33 clusters = qbert.get_request(qbert_url, token, "clusters")34 if len(clusters) != 0:35 print("Cluster(s) still exist. Skipping project deletion...")36 return False37 conn.delete_project(os_project, domain_id="default")38 return True39 else:40 print("Project doesn't exists. Skipping deletion...")41 return False42def create_cluster(endpoint, user, pw, tenant, region, cluster_name, dnz_zone_name, privileged_mode_enabled=True,43 app_catalog_enabled=False, runtime_config='', allow_workloads_on_master=False,44 networkPlugin='calico', container_cidr='172.30.0.0/16', services_cidr='172.31.0.0/16',45 debug_flag=True):46 token, catalog, project_id = qbert.get_token_v3(endpoint, user, pw, tenant)47 qbert_url = "{0}/{1}".format(qbert.get_service_url('qbert', catalog, region), project_id)48 node_pool_uuid = qbert.get_node_pool(qbert_url, token)49 new_cluster = qbert.create_cluster(qbert_url, token, cluster_name, container_cidr, services_cidr,50 "", privileged_mode_enabled, app_catalog_enabled,51 allow_workloads_on_master, runtime_config, node_pool_uuid,52 networkPlugin, debug_flag)53 put_body = {"externalDnsName": "{}-api.{}".format(new_cluster, dnz_zone_name)}54 qbert.put_request(qbert_url, token, "clusters/{}".format(new_cluster), put_body)55 return new_cluster, node_pool_uuid56def create_project(conn, project_name, os_admin_username):57 os_project = conn.get_project(project_name)58 if os_project:59 print("Project already exists. Skipping creation...")60 else:61 os_project = conn.create_project(project_name, domain_id="default")62 os_admin = conn.get_user(os_admin_username)63 conn.grant_role('admin', user=os_admin, project=os_project)64 return os_project65def create_user(conn, os_project, user_email):66 os_user = conn.get_user(user_email)67 if os_user:68 print("User already exists. Resetting password...")69 user_password = str(uuid.uuid4())70 os_user = conn.update_user(os_user, password=user_password, domain_id="default")71 else:72 user_password = str(uuid.uuid4())73 os_user = conn.create_user(name=user_email, password=user_password, email=user_email, domain_id="default")74 conn.grant_role('_member_', user=os_user, project=os_project)75 return os_user, user_password76def create_tf_vars_file(state_path, tf_vars):77 tf_vars_file = "{}/vars.tf".format(state_path)78 f = open(tf_vars_file, "w")79 for key, value in tf_vars.items():80 f.write('{} = "{}"\n'.format(key, value))81 return tf_vars_file82def do_delete_stack(secrets):83 account_endpoint = re.search("(?:http.*://)?(?P<host>[^:/ ]+)", secrets['OS_AUTH_URL']).group('host')84 conn = openstack.connect(cloud='cloud')85 cluster_id = secrets['CLUSTER_ID']86 """ TODO: Get list of nodes that are attached to this cluster from qbert. Then execute a DELETE in ResMgr for these hosts87 curl 'https://{DU_FQDN}/resmgr/v1/hosts/{HOST_ID}' -X DELETE -H 'Accept: application/json' -H 'X-Auth-Token: {TOKEN}'88 """89 delete_cluster(account_endpoint, secrets['OS_USERNAME'], secrets['OS_PASSWORD'], secrets['PACKET_PROJECT_ID'],90 secrets['OS_REGION_NAME'], cluster_id)91 # TODO: We need to grab all users that are in the project that have <cluster_id> in their username and92 # delete them all93 project_deleted = delete_project(conn, account_endpoint, secrets['OS_USERNAME'], secrets['OS_PASSWORD'],94 secrets['PACKET_PROJECT_ID'], secrets['OS_REGION_NAME'])95 dir_path = "{}/{}".format(os.path.dirname(os.path.realpath(__file__)), "terraform")96 state_path = "{}/states/{}/{}".format(dir_path, secrets['PACKET_PROJECT_ID'], cluster_id)97 celery_task = delete_terraform_stack.delay(cluster_id, secrets['PACKET_PROJECT_ID'], dir_path, state_path,98 project_deleted)99 return ({'cluster_id': cluster_id, 'task_status': celery_task.status,100 'task_id': celery_task.id})101def do_create_stack(secrets):102 account_endpoint = re.search("(?:http.*://)?(?P<host>[^:/ ]+)", secrets['OS_AUTH_URL']).group('host')103 conn = openstack.connect(cloud='cloud')104 os_project = create_project(conn, secrets['PACKET_PROJECT_ID'], secrets['OS_USERNAME'])105 cluster_id, node_pool_uuid = create_cluster(account_endpoint, secrets['OS_USERNAME'], secrets['OS_PASSWORD'],106 os_project['name'], secrets['OS_REGION_NAME'], secrets['CLUSTER_NAME'],107 secrets['R53_ZONE_NAME'][:-1])108 user_email = "admin@{}.{}.tikube".format(cluster_id, os_project['name'])109 os_user, user_password = create_user(conn, os_project, user_email)110 dir_path = "{}/{}".format(os.path.dirname(os.path.realpath(__file__)), "terraform")111 state_path = "{}/states/{}/{}".format(dir_path, os_project['name'], cluster_id)112 os.makedirs(state_path, exist_ok=True)113 with open("{}/admin_creds.json".format(state_path), 'w') as outfile:114 json.dump({"username": os_user['name'], "password": user_password}, outfile)115 # tags = ["cluster_name={}".format(secrets['CLUSTER_NAME']), "cluster_id={}".format(cluster_id)]116 tf_vars = {117 'auth_token': secrets['AUTH_TOKEN'],118 'project_id': os_project['name'],119 'master_size': secrets['MASTER_SIZE'],120 'worker_size': secrets['WORKER_SIZE'],121 'facility': secrets['FACILITY'],122 'master_count': secrets['MASTER_COUNT'],123 'worker_count': secrets['WORKER_COUNT'],124 'du_fqdn': account_endpoint,125 'keystone_user': secrets['OS_USERNAME'],126 'keystone_password': secrets['OS_PASSWORD'],127 'cluster_uuid': cluster_id,128 'node_pool_uuid': node_pool_uuid,129 'zone_name': secrets['R53_ZONE_NAME'],130 'aws_access_key': secrets['AWS_ACCESS_KEY'],131 'aws_secret_key': secrets['AWS_SECRET_KEY'],132 'aws_region': secrets['AWS_REGION']133 }134 celery_task = create_terraform_stack.delay(secrets['CLUSTER_NAME'], tf_vars, dir_path, state_path)135 return ({'cluster_id': cluster_id, 'admin': os_user['name'], 'task_status': celery_task.status,136 'task_id': celery_task.id})137def do_get_kubeconfig(secrets):138 endpoint = re.search("(?:http.*://)?(?P<host>[^:/ ]+)", secrets['OS_AUTH_URL']).group('host')139 token, catalog, project_id = qbert.get_token_v3(endpoint, secrets['OS_USERNAME'], secrets['OS_PASSWORD'],140 secrets['PACKET_PROJECT_ID'])141 qbert_url = "{0}/{1}".format(qbert.get_service_url('qbert', catalog, secrets['OS_REGION_NAME']), project_id)142 dir_path = "{}/{}".format(os.path.dirname(os.path.realpath(__file__)), "terraform")143 state_path = "{}/states/{}/{}".format(dir_path, secrets['PACKET_PROJECT_ID'], secrets['CLUSTER_ID'])144 conn = openstack.connect(cloud='cloud')145 os_user = conn.get_user(secrets['user_id'])146 if not os_user:147 return {"Error": "User id: {} not found.".format(secrets['user_id'])}148 username = os_user['name']149 if username == 'admin@{}.{}.tikube'.format(secrets['CLUSTER_ID'], secrets['PACKET_PROJECT_ID']):150 with open("{}/admin_creds.json".format(state_path)) as f:151 user_creds = json.load(f)152 password = user_creds['password']153 authorize_cluster.delay(qbert_url, token, secrets['CLUSTER_ID'], username)154 else:155 conn = openstack.connect(cloud='cloud')156 os_project = conn.get_project(secrets['PACKET_PROJECT_ID'])157 os_user, password = create_user(conn, os_project, username)158 kubeconfig = qbert.get_kube_config(qbert_url, token, endpoint, secrets['CLUSTER_ID'], secrets['PACKET_PROJECT_ID'],159 username, password)160 return kubeconfig161def do_delete_user(secrets):162 conn = openstack.connect(cloud='cloud')163 os_user = conn.get_user(secrets['user_id'])164 if not os_user:165 return {'Error': 'User with ID: {} doesn\'t exist!'.format(secrets['user_id'])}166 if os_user['name'] == "admin@{}.{}.tikube".format(secrets['CLUSTER_ID'],167 secrets['PACKET_PROJECT_ID']):168 return {'Error', 'You can not delete the admin user. It will be deleted when the cluster is deleted.'}169 delete_user(conn, secrets['user_id'])170 return {'OK': 'User: {} has been deleted.'.format(os_user['name'])}171def do_get_users(secrets, user_id=None):172 conn = openstack.connect(cloud='cloud')173 if user_id:174 os_user = conn.get_user(user_id)175 if not os_user:176 return {'Error': 'User with ID: {} doesn\'t exist!'.format(secrets['user_id'])}177 os_admin = conn.get_user(secrets['OS_USERNAME'])178 os_project = conn.get_project(secrets['PACKET_PROJECT_ID'])179 if os_project:180 params = dict(project=os_project)181 role_mappings = conn.list_role_assignments(filters=params)182 users = []183 for mapping in role_mappings:184 # TODO: This is only checking for a single admin user... Is this even needed anymore with the checks below185 # for exact syntax of username?186 if mapping['user'] != os_admin['id']:187 os_user = conn.get_user(mapping['user'])188 if os_user['name'].endswith("@{}.{}.tikube".format(secrets['CLUSTER_ID'],189 secrets['PACKET_PROJECT_ID'])):190 if os_user['name'] == "admin@{}.{}.tikube".format(secrets['CLUSTER_ID'],191 secrets['PACKET_PROJECT_ID']):192 is_admin = True193 else:194 is_admin = False195 if os_user['id'] == user_id:196 return {"id": os_user['id'], "username": os_user['name'], "is_admin": is_admin}197 users.append({"id": os_user['id'], "username": os_user['name'],198 "is_admin": is_admin})199 return users200def do_create_user(secrets):201 if not secrets['username'].endswith("@{}.{}.tikube".format(secrets['CLUSTER_ID'],202 secrets['PACKET_PROJECT_ID'])):203 return {"Error", "Username must be in format: <username>@<cluster_id>.<project_id>.tikube"}204 conn = openstack.connect(cloud='cloud')205 os_project = conn.get_project(secrets['PACKET_PROJECT_ID'])206 os_user, _ = create_user(conn, os_project, secrets['username'])...
test_sync_principal.py
Source:test_sync_principal.py
1# Copyright 2017-present Open Networking Foundation2#3# Licensed under the Apache License, Version 2.0 (the "License");4# you may not use this file except in compliance with the License.5# You may obtain a copy of the License at6#7# http://www.apache.org/licenses/LICENSE-2.08#9# Unless required by applicable law or agreed to in writing, software10# distributed under the License is distributed on an "AS IS" BASIS,11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.12# See the License for the specific language governing permissions and13# limitations under the License.14import json15import os16import sys17import unittest18from mock import patch, PropertyMock, ANY, MagicMock19from unit_test_common import setup_sync_unit_test20def fake_connect_openstack_admin(self, service, required_version=None):21 return MagicMock()22class TestSyncPrincipal(unittest.TestCase):23 def setUp(self):24 self.unittest_setup = setup_sync_unit_test(os.path.abspath(os.path.dirname(os.path.realpath(__file__))),25 globals(),26 [("openstack", "openstack.xproto")] )27 sys.path.append(os.path.join(os.path.abspath(os.path.dirname(os.path.realpath(__file__))), "../steps"))28 self.model_accessor = self.unittest_setup["model_accessor"]29 from sync_principal import SyncPrincipal30 self.step_class = SyncPrincipal31 self.service = OpenStackService()32 self.trust_domain = TrustDomain(owner=self.service, name="test-trust")33 def tearDown(self):34 sys.path = self.unittest_setup["sys_path_save"]35 def test_sync_record_create_noexist(self):36 fakeconn = MagicMock()37 with patch.object(self.step_class, "connect_openstack_admin") as fake_connect_openstack_admin:38 fake_connect_openstack_admin.return_value = fakeconn39 trust_domain_id = 567840 xos_principal = Principal(name="test-principal", trust_domain=self.trust_domain)41 step = self.step_class(model_accessor=self.model_accessor)42 fakeconn.identity.find_user.return_value = None43 fakeconn.identity.find_domain.return_value = MagicMock(id=trust_domain_id)44 os_user = MagicMock()45 os_user.id = "1234"46 fakeconn.identity.create_user.return_value = os_user47 step.sync_record(xos_principal)48 fakeconn.identity.create_user.assert_called_with(name=xos_principal.name, domain_id=trust_domain_id)49 self.assertEqual(xos_principal.backend_handle, "1234")50 def test_sync_record_create_exists(self):51 fakeconn = MagicMock()52 with patch.object(self.step_class, "connect_openstack_admin") as fake_connect_openstack_admin:53 fake_connect_openstack_admin.return_value = fakeconn54 xos_principal = Principal(name="test-principal", trust_domain=self.trust_domain)55 os_user = MagicMock()56 os_user.id = "1234"57 step = self.step_class(model_accessor=self.model_accessor)58 fakeconn.identity.find_user.return_value = os_user59 fakeconn.identity.create_user.return_value = None60 step.sync_record(xos_principal)61 fakeconn.identity.create_user.assert_not_called()62 self.assertEqual(xos_principal.backend_handle, "1234")63 def test_delete_record(self):64 fakeconn = MagicMock()65 with patch.object(self.step_class, "connect_openstack_admin") as fake_connect_openstack_admin:66 fake_connect_openstack_admin.return_value = fakeconn67 xos_principal = Principal(name="test-principal", trust_domain=self.trust_domain)68 step = self.step_class(model_accessor=self.model_accessor)69 os_user = MagicMock()70 os_user.id = "1234"71 fakeconn.identity.find_user.return_value = os_user72 fakeconn.identity.delete_user.return_value = None73 step.delete_record(xos_principal)74 fakeconn.identity.delete_user.assert_called_with("1234")75if __name__ == '__main__':...
sync_principal.py
Source:sync_principal.py
1# Copyright 2017-present Open Networking Foundation2#3# Licensed under the Apache License, Version 2.0 (the "License");4# you may not use this file except in compliance with the License.5# You may obtain a copy of the License at6#7# http://www.apache.org/licenses/LICENSE-2.08#9# Unless required by applicable law or agreed to in writing, software10# distributed under the License is distributed on an "AS IS" BASIS,11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.12# See the License for the specific language governing permissions and13# limitations under the License.14from xossynchronizer.modelaccessor import TrustDomain, Principal15from newopenstacksyncstep import NewOpenStackSyncStep16from xosconfig import Config17from multistructlog import create_logger18log = create_logger(Config().get('logging'))19class SyncPrincipal(NewOpenStackSyncStep):20 provides=[Principal]21 requested_interval=022 observes=Principal23 def fetch_pending(self, deleted):24 """ Figure out which Principals are interesting to the OpenStack synchronizer. It's necessary to filter as we're25 synchronizing a core model, and we only want to synchronize trust domains that will exist within26 OpenStack.27 """28 objs = super(SyncPrincipal, self).fetch_pending(deleted)29 for obj in objs[:]:30 # If the Principal isn't in a TrustDomain, then the OpenStack synchronizer can't do anything with it31 if not obj.trust_domain:32 objs.remove(obj)33 continue34 # If the TrustDomain isn't part of the OpenStack service, then it's someone else's trust domain35 if "OpenStackService" not in obj.trust_domain.owner.leaf_model.class_names:36 objs.remove(obj)37 return objs38 def sync_record(self, principal):39 service = principal.trust_domain.owner.leaf_model40 conn = self.connect_openstack_admin(service)41 os_domain = conn.identity.find_domain(principal.trust_domain.name)42 os_user = conn.identity.find_user(principal.name, domain_id=os_domain.id)43 if (os_user):44 log.info("Principal already exists in openstack", principal=principal)45 else:46 log.info("Creating Principal", principal=principal)47 os_user = conn.identity.create_user(name=principal.name, domain_id=os_domain.id)48 if os_user.id != principal.backend_handle:49 principal.backend_handle = os_user.id50 principal.save(update_fields=["backend_handle"])51 def delete_record(self, principal):52 service = principal.trust_domain.owner.leaf_model53 conn = self.connect_openstack_admin(service)54 os_domain = conn.identity.find_domain(principal.trust_domain.name)55 os_user = conn.identity.find_user(principal.name, domain_id=os_domain.id)56 if (not os_user):57 log.info("Principal already does not exist in openstack", principal=principal)58 else:59 log.info("Deleting Principal", principal=principal, os_id=os_domain.id)...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!