Best Python code snippet using tempest_python
database.py
Source:database.py
import argparse
import asyncio
import os
import sqlite3
from contextlib import contextmanager

import yaml
from aiohttp import web
from prettytable import PrettyTable

from constants import DATABASE_NAME

parser = argparse.ArgumentParser(description="Cluster Launcher User Manager - A system to view, edit and manage the user database associated with the Cluster Launcher.")
subparsers = parser.add_subparsers(dest='subparser')

#Parser for User interactions
parser_user = subparsers.add_parser('user', help="Manipulate a User in the User Table")
user_subparsers = parser_user.add_subparsers(dest='options')

#Add Users to the User Table
parser_add_user = user_subparsers.add_parser('add', help="Add a User in the User Table")
parser_add_user.add_argument('user', nargs=1, help="Username of the user to add")

#Remove Users from the User Table
parser_remove_user = user_subparsers.add_parser('remove', help="Remove a User in the User Table")
parser_remove_user.add_argument('user', nargs=1, help="Username of the user to remove")

#Parser for the Volume Table
parser_volume = subparsers.add_parser('volume', help="Manipulate a Volume in the Volume Table")
volume_subparsers = parser_volume.add_subparsers(dest='options')

#Add Volume to Volume Table
parser_add_volume = volume_subparsers.add_parser('add')
parser_add_volume.add_argument('user', nargs=1)
parser_add_volume.add_argument('tenant_name', nargs=1)
parser_add_volume.add_argument('volume_name', nargs=1)
# type=int lets argparse reject a non-numeric size; the column is INTEGER.
parser_add_volume.add_argument('-volSize', nargs=1, type=int)

#Remove Volume to Volume Table
parser_remove_volume = volume_subparsers.add_parser('remove')
parser_remove_volume.add_argument('user', nargs=1)
parser_remove_volume.add_argument('tenant_name', nargs=1)

#Parser for Tenant Interactions
parser_tenant = subparsers.add_parser('tenant', help="Manipulate a Tenant in the Tenant Table")
tenant_subparsers = parser_tenant.add_subparsers(dest='options')

#Populate the Tenant Table
parser_add_tenant = tenant_subparsers.add_parser('populate', help="Populate the Tenant's Table from tenants_conf.yml")
parser_remove_tenant = tenant_subparsers.add_parser('depopulate', help="Remove all rows in the Tenant's Table")
parser_search_tenant = tenant_subparsers.add_parser('search', help="Find a Tenant's ID")
parser_search_tenant.add_argument('tenant_name', nargs=1, help="Name of Tenant")

#Add users to the Tenant Table
parser_associate_user = subparsers.add_parser('link', help="Add a User to the Tenant's Database")
parser_associate_user.add_argument('user', nargs=1, help="Username of the user to add")
parser_associate_user.add_argument('tenant_name', nargs=1, help="Name of the tenant to add the user to")

#Remove users from the tenants database
parser_deassociate_user = subparsers.add_parser('delink', help="Remove a User from the Tenant's Database.")
parser_deassociate_user.add_argument('user', help="Username of the user to remove")
parser_deassociate_user.add_argument('tenant_name', help="Name of the tenant to remove the user from")

parser_cluster = subparsers.add_parser('cluster', help="Manage a Cluster in the database in the event of errors")
cluster_subparsers = parser_cluster.add_subparsers(dest='options')
parser_remove_cluster = cluster_subparsers.add_parser('remove', help="Remove a Cluster from the table")
parser_remove_cluster.add_argument('user', help="Username of the user to remove")

parser_network = subparsers.add_parser('network', help="Manage a Network in the database in the event of errors")
network_subparsers = parser_network.add_subparsers(dest='options')
parser_remove_network = network_subparsers.add_parser('remove', help="Remove a Network from the table")
parser_remove_network.add_argument('user', help="Username of the user to remove")
parser_remove_network.add_argument('tenant_name', help="Name of the tenant")

parser_search = subparsers.add_parser('search', help="Search to find a user-tenant mapping")
parser_search.add_argument('user', help="Username of the user to search for")
parser_search.add_argument('tenant_name', help="Name of the tenant to search in")

#Parser for outputting information to users
parser_list = subparsers.add_parser('list', help="List information stored in tables")
list_subparsers = parser_list.add_subparsers(dest='options')
#Output tenant table information
parser_list_tenants = list_subparsers.add_parser('tenant')
#Output user table information
parser_list_users = list_subparsers.add_parser('user')
#Output mapping table information
parser_list_mappings = list_subparsers.add_parser('mapping')
#Output clusters table information
clusters_list_mappings = list_subparsers.add_parser('clusters')
#Output networks table information
networks_list_mappings = list_subparsers.add_parser('networks')
#Output volumes table information
volumes_list_mappings = list_subparsers.add_parser('volumes')


# One CREATE TABLE statement per table managed by this module.
_SCHEMA = (
    '''CREATE TABLE IF NOT EXISTS users(
        username TEXT PRIMARY KEY NOT NULL)
    ''',
    '''CREATE TABLE IF NOT EXISTS tenants(
        tenants_name TEXT PRIMARY KEY NOT NULL,
        tenants_id TEXT NOT NULL,
        lustre_description TEXT NOT NULL)
    ''',
    '''CREATE TABLE IF NOT EXISTS user_tenant_mapping(
        username TEXT NOT NULL REFERENCES users(username),
        tenant_name TEXT NOT NULL REFERENCES tenants(tenants_name),
        PRIMARY KEY (username, tenant_name))
    ''',
    '''CREATE TABLE IF NOT EXISTS volumes(
        username TEXT NOT NULL REFERENCES users(username),
        tenant_name TEXT NOT NULL REFERENCES tenants(tenants_name),
        volume_name TEXT NOT NULL,
        volume_size INTEGER,
        PRIMARY KEY (username, tenant_name))
    ''',
    '''CREATE TABLE IF NOT EXISTS clusters(
        username TEXT PRIMARY KEY NOT NULL,
        tenant TEXT NOT NULL,
        cluster_ip TEXT NOT NULL,
        num_workers TEXT NOT NULL)
    ''',
    '''
    CREATE TABLE IF NOT EXISTS networking(
        user_name TEXT NOT NULL,
        network_id TEXT NOT NULL,
        subnet_id TEXT NOT NULL,
        router_id TEXT NOT NULL,
        tenant_name TEXT NOT NULL,
        PRIMARY KEY (user_name, tenant_name)
    )
    ''',
)


def initialise_database():
    """Open the database, creating the file and all tables if missing.

    Returns:
        A ``(connection, cursor)`` pair. The caller is responsible for
        closing the connection.
    """
    if not os.path.exists(DATABASE_NAME):
        with open(DATABASE_NAME, 'w'):
            pass
    #Initialise the database by creating the SQL tables if not present already
    db = sqlite3.connect(DATABASE_NAME)
    cursor = db.cursor()
    for statement in _SCHEMA:
        cursor.execute(statement)
    db.commit()
    return db, cursor


@contextmanager
def _database():
    """Yield ``(db, cursor)`` and close the connection on exit, even on error."""
    db, cursor = initialise_database()
    try:
        yield db, cursor
    finally:
        db.close()


def _unwrap(value):
    """Return the sole item of a one-element list/tuple, otherwise *value*.

    argparse arguments declared with ``nargs=1`` arrive as one-item lists;
    this keeps functions usable with both a plain string and such a list.
    """
    if isinstance(value, (list, tuple)) and len(value) == 1:
        return value[0]
    return value


def add_user(user):
    """Insert *user* into the users table.

    Prints a message instead of raising when the username already exists.
    """
    with _database() as (db, cursor):
        try:
            # (value,) is a one-element tuple; a bare (value) would make
            # sqlite3 treat each character of a string as a parameter.
            cursor.execute("INSERT INTO users (username) VALUES (?)", (_unwrap(user),))
            db.commit()
        except sqlite3.IntegrityError:
            print("Username is already registered in the Database")


def remove_user(user):
    """Delete *user* from the users table."""
    with _database() as (db, cursor):
        cursor.execute("DELETE from users where username = ?", (_unwrap(user),))
        db.commit()


def add_volume(user, tenant_name, volume_name, volume_size=None):
    """Register a volume for the (user, tenant) pair.

    Args:
        user: Username owning the volume.
        tenant_name: Tenant the volume belongs to.
        volume_name: Name of the volume.
        volume_size: Optional size stored in the ``volume_size`` column;
            ``None`` stores NULL.

    Prints a message instead of raising when the pair already has a volume.
    """
    with _database() as (db, cursor):
        try:
            cursor.execute(
                "INSERT INTO volumes (username, tenant_name, volume_name, volume_size) VALUES (?,?,?,?)",
                (user, tenant_name, volume_name, volume_size),
            )
            db.commit()
        except sqlite3.IntegrityError:
            print("Username already has a volume registered to that tenant")


def remove_volume(user, tenant_name):
    """Delete the volume row for the (user, tenant) pair."""
    with _database() as (db, cursor):
        cursor.execute("DELETE from volumes where username = ? AND tenant_name = ?", (user, tenant_name))
        db.commit()


def add_cluster(user, tenant_name, cluster_ip, num_workers):
    """Register a cluster for *user*.

    Prints a message instead of raising when the user already has one.
    """
    with _database() as (db, cursor):
        try:
            cursor.execute('''INSERT INTO clusters (username, tenant, cluster_ip, num_workers)
                              VALUES (?, ?, ?, ?)''', (user, tenant_name, cluster_ip, num_workers))
        except sqlite3.IntegrityError:
            print("This user already has a cluster registered. An error has occured")
        db.commit()


def remove_cluster(user):
    """Delete the cluster row registered for *user*."""
    with _database() as (db, cursor):
        cursor.execute("DELETE from clusters where username = ?", (user,))
        db.commit()


def add_network(username, network_id, subnet_id, router_id, tenant_name):
    """Insert a networking row; a duplicate (user, tenant) raises IntegrityError."""
    with _database() as (db, cursor):
        cursor.execute('''INSERT INTO
                          networking(user_name, network_id, subnet_id, router_id, tenant_name)
                          VALUES(?, ?, ?, ?, ?)''',
                       (username, network_id, subnet_id, router_id, tenant_name))
        db.commit()


def remove_network(user, tenant_name):
    """Delete the networking row for the (user, tenant) pair."""
    with _database() as (db, cursor):
        cursor.execute("DELETE from networking where user_name = ? AND tenant_name = ?", (user, tenant_name))
        db.commit()


def populate_tenants():
    """Load ``tenants_conf.yml`` and insert each tenant not already present.

    For every entry under the top-level ``tenants`` key, its ``id`` and
    ``network`` values are stored as the tenant ID and lustre description.
    Rows that violate a constraint (e.g. already present) are skipped.
    """
    with _database() as (db, cursor):
        with open('tenants_conf.yml', 'r') as tenant_file:
            # NOTE(review): yaml.Loader can construct arbitrary Python objects.
            # If this file can ever come from an untrusted source, switch to
            # yaml.safe_load.
            data = yaml.load(tenant_file, Loader=yaml.Loader)
        for tenant, settings in data['tenants'].items():
            tenant_id = settings.get('id')
            lustre_desc = settings.get('network')
            try:
                cursor.execute(
                    "INSERT INTO tenants (tenants_name, tenants_id, lustre_description) VALUES (?, ?, ?)",
                    (tenant, tenant_id, lustre_desc),
                )
                db.commit()
            except sqlite3.IntegrityError:
                # Deliberate best-effort: existing tenants are left untouched.
                pass


def depopulate_tenants():
    """Delete every row of the tenants table and print how many were removed."""
    with _database() as (db, cursor):
        cursor.execute("DELETE FROM tenants")
        print(str(cursor.rowcount) + " rows have been deleted from the tenant table")
        db.commit()


def link_tables(user, tenant):
    """Map *user* to *tenant* (lower-cased) with foreign keys enforced."""
    with _database() as (db, cursor):
        cursor.execute('pragma foreign_keys=ON')
        tenant = tenant.lower()
        cursor.execute("INSERT INTO user_tenant_mapping (username, tenant_name) VALUES (?, ?)", (user, tenant))
        db.commit()


def delink_tables(user, tenant):
    """Remove the mapping between *user* and *tenant* (lower-cased)."""
    with _database() as (db, cursor):
        tenant = tenant.lower()
        cursor.execute("DELETE from user_tenant_mapping where username = ? AND tenant_name = ?;", (user, tenant))
        db.commit()


def _print_table(query, field_names, left_aligned=()):
    """Run *query* and print its rows as a PrettyTable with *field_names*.

    Only the first ``len(field_names)`` columns of each row are shown.
    """
    with _database() as (_, cursor):
        cursor.execute(query)
        results = cursor.fetchall()
    table = PrettyTable()
    table.field_names = field_names
    for column in left_aligned:
        table.align[column] = "l"
    width = len(field_names)
    for record in results:
        table.add_row([str(value) for value in record[:width]])
    print(table)


def display_tenants():
    """Print the tenants table ordered by tenant name."""
    _print_table(
        "SELECT * from tenants ORDER BY tenants_name",
        ["Tenant Name", "Tenant ID", "Lustre Description"],
        left_aligned=("Tenant Name",),
    )


def display_users():
    """Print the users table ordered by username."""
    _print_table("SELECT * from users ORDER BY username", ["Username"])


def display_mapping():
    """Print the user-tenant mapping table ordered by username."""
    _print_table(
        "SELECT * from user_tenant_mapping ORDER BY username",
        ["Username", "Tenant Name"],
    )


def display_clusters():
    """Print the clusters table ordered by username."""
    _print_table(
        "SELECT * from clusters ORDER BY username",
        ["Username", "Tenant Name", "Cluster IP", "Number of Workers"],
    )


def display_networks():
    """Print the networking table ordered by user name."""
    _print_table(
        "SELECT * from networking ORDER BY user_name",
        ["Username", "Network ID", "Subnet ID", "Router ID", "Tenant"],
    )


def display_volumes():
    """Print the volumes table ordered by username."""
    _print_table(
        "SELECT * from volumes ORDER BY username",
        ["Username", "Tenant Name", "Volume Name", "Volume Size"],
    )


def request_mappings(request):
    """Return every user-tenant mapping as a JSON response.

    *request* is accepted but not inspected.
    """
    with _database() as (_, cursor):
        cursor.execute("SELECT * from user_tenant_mapping ORDER BY username")
        results = cursor.fetchall()
    return web.json_response(results)


def search_user(user, tenant_name):
    """Return True when *user* is mapped to *tenant_name*, else False."""
    with _database() as (_, cursor):
        cursor.execute(
            "SELECT * from user_tenant_mapping WHERE username = ? AND tenant_name = ?",
            (user, tenant_name),
        )
        result = cursor.fetchall()
    return bool(result)


def tenant_finder(user):
    """Return the tenant of the cluster registered for *user*.

    Raises:
        IndexError: if *user* has no cluster row.
    """
    with _database() as (_, cursor):
        cursor.execute('''SELECT tenant FROM clusters WHERE username = ?''', (user,))
        tenant = cursor.fetchall()
    return tenant[0][0]


def fetch_id(tenant_name):
    """Return the tenant ID stored for *tenant_name*.

    Raises:
        TypeError: if no such tenant exists (the query yields no row).
    """
    with _database() as (_, cursor):
        cursor.execute("SELECT tenants_id from tenants WHERE tenants_name = ?", [tenant_name])
        row = cursor.fetchone()
    return row[0]


def lustre_status(tenant_name):
    """Return the lustre description stored for *tenant_name*.

    Raises:
        TypeError: if no such tenant exists (the query yields no row).
    """
    with _database() as (_, cursor):
        cursor.execute("SELECT lustre_description from tenants WHERE tenants_name = ?", [tenant_name])
        row = cursor.fetchone()
    return row[0]


def checkVolumes(username, tenant_name):
    """Return the volume name for the (user, tenant) pair, or None if absent."""
    with _database() as (_, cursor):
        cursor.execute(
            "SELECT volume_name from volumes WHERE username = ? AND tenant_name = ?",
            [username, tenant_name],
        )
        rows = cursor.fetchall()
    return rows[0][0] if rows else None


def checkMappings(request):
    """Return a JSON object mapping each of the user's tenants to a volume name.

    The username is read from the ``X-Forwarded-User`` request header; when
    the header is missing a fixed test username is used. Tenants without a
    volume map to null.
    """
    if 'X-Forwarded-User' in request.headers:
        username = request.headers['X-Forwarded-User']
    else:
        #For testing purposes
        username = "non-swarm-test"
    with _database() as (_, cursor):
        cursor.execute("SELECT tenant_name from user_tenant_mapping WHERE username = ?", [username])
        list_of_tenant_names = cursor.fetchall()
    volumes_by_tenant = {}
    for (tenant_name,) in list_of_tenant_names:
        volumes_by_tenant[tenant_name] = checkVolumes(username, tenant_name)
    return web.json_response(volumes_by_tenant)


if __name__ == '__main__':
    args = parser.parse_args()
    #Manage Users Table
    if args.subparser == "user":
        #Add a user to the user's table
        if args.options == "add":
            add_user(args.user[0])
        #Remove a user from the user's table
        if args.options == "remove":
            remove_user(args.user[0])
    if args.subparser == "volume":
        if args.options == "add":
            volume_size = args.volSize[0] if args.volSize else None
            add_volume(args.user[0], args.tenant_name[0], args.volume_name[0], volume_size)
        if args.options == "remove":
            remove_volume(args.user[0], args.tenant_name[0])
    if args.subparser == "cluster":
        if args.options == "remove":
            remove_cluster(args.user)
    if args.subparser == "network":
        if args.options == "remove":
            remove_network(args.user, args.tenant_name)
    #Populate the Tenant's Table
    if args.subparser == "tenant":
        if args.options == "populate":
            populate_tenants()
        if args.options == "depopulate":
            depopulate_tenants()
        if args.options == "search":
            # The lookup result used to be discarded, so the command
            # printed nothing; show it to the operator.
            try:
                print(fetch_id(args.tenant_name[0]))
            except TypeError:
                print("Tenant not found")
    #Add links in the User-Tenant Mapping Table
    if args.subparser == "link":
        link_tables(args.user[0], args.tenant_name[0])
    #Delink in the User-Tenant Mapping Table
    if args.subparser == "delink":
        delink_tables(args.user, args.tenant_name)
    #List the tenants currently stored
    if args.subparser == "list":
        if args.options == "tenant":
            display_tenants()
        if args.options == "user":
            display_users()
        if args.options == "mapping":
            display_mapping()
        if args.options == "clusters":
            display_clusters()
        if args.options == "networks":
            display_networks()
        if args.options == "volumes":
            display_volumes()
    if args.subparser == "search":
        ...  # the source excerpt is truncated here; the handler body is not visible
tenant_policy.py
Source:tenant_policy.py
import logging
from typing import List

from sqlalchemy.sql import and_

from .. import exceptions, permissions, paginate
from .. import models
from ..api.permission import Permission
from ..api.tenant import Tenant
from ..models.Permission import PermissionType

logger = logging.getLogger(__name__)

TENANT_NAME = 'dataall'


class TenantPolicy:
    """Static helpers for reading and managing tenant-level group policies."""

    @staticmethod
    def is_tenant_admin(groups: List[str]):
        """Return True when 'DAAdministrators' is among *groups*."""
        if not groups:
            return False
        return 'DAAdministrators' in groups

    @staticmethod
    def check_user_tenant_permission(
        session, username: str, groups: List[str], tenant_name: str, permission_name: str
    ):
        """Return True for admins, else the matching policy.

        Raises:
            exceptions.TenantUnauthorized: when no policy grants the
                permission to any of the user's groups.
        """
        if TenantPolicy.is_tenant_admin(groups):
            return True

        tenant_policy = TenantPolicy.has_user_tenant_permission(
            session=session,
            username=username,
            groups=groups,
            permission_name=permission_name,
            tenant_name=tenant_name,
        )
        if not tenant_policy:
            raise exceptions.TenantUnauthorized(
                username=username,
                action=permission_name,
                tenant_name=tenant_name,
            )
        return tenant_policy

    @staticmethod
    def _policy_permission_query(session):
        """Build the TenantPolicy query joined to its permissions and tenant."""
        return (
            session.query(models.TenantPolicy)
            .join(
                models.TenantPolicyPermission,
                models.TenantPolicy.sid == models.TenantPolicyPermission.sid,
            )
            .join(
                models.Tenant,
                models.Tenant.tenantUri == models.TenantPolicy.tenantUri,
            )
            .join(
                models.Permission,
                models.Permission.permissionUri
                == models.TenantPolicyPermission.permissionUri,
            )
        )

    @staticmethod
    def has_user_tenant_permission(
        session, username: str, groups: List[str], tenant_name: str, permission_name: str
    ):
        """Return the first policy granting *permission_name* to any of *groups*.

        Returns False when *username*, *permission_name* or *groups* is
        empty, and None when no policy matches.
        """
        if not username or not permission_name:
            return False
        # No groups means nothing can match; also avoids passing None to in_().
        if not groups:
            return False
        tenant_policy: models.TenantPolicy = (
            TenantPolicy._policy_permission_query(session)
            .filter(
                models.TenantPolicy.principalId.in_(groups),
                models.Permission.name == permission_name,
                models.Tenant.name == tenant_name,
            )
            .first()
        )
        return tenant_policy

    @staticmethod
    def has_group_tenant_permission(
        session, group_uri: str, tenant_name: str, permission_name: str
    ):
        """Return the policy granting *permission_name* to *group_uri*, else False."""
        if not group_uri or not permission_name:
            return False
        tenant_policy: models.TenantPolicy = (
            TenantPolicy._policy_permission_query(session)
            .filter(
                and_(
                    models.TenantPolicy.principalId == group_uri,
                    models.Permission.name == permission_name,
                    models.Tenant.name == tenant_name,
                )
            )
            .first()
        )
        return tenant_policy or False

    @staticmethod
    def find_tenant_policy(session, group_uri: str, tenant_name: str):
        """Return the policy of *group_uri* on *tenant_name*, or None.

        Raises:
            exceptions.RequiredParameter: when either argument is empty.
        """
        TenantPolicy.validate_find_tenant_policy(group_uri, tenant_name)
        tenant_policy = (
            session.query(models.TenantPolicy)
            .join(
                models.Tenant, models.Tenant.tenantUri == models.TenantPolicy.tenantUri
            )
            .filter(
                and_(
                    models.TenantPolicy.principalId == group_uri,
                    models.Tenant.name == tenant_name,
                )
            )
            .first()
        )
        return tenant_policy

    @staticmethod
    def validate_find_tenant_policy(group_uri, tenant_name):
        """Raise RequiredParameter when *group_uri* or *tenant_name* is empty."""
        if not group_uri:
            raise exceptions.RequiredParameter(param_name='group_uri')
        if not tenant_name:
            raise exceptions.RequiredParameter(param_name='tenant_name')

    @staticmethod
    def attach_group_tenant_policy(
        session,
        group: str,
        permissions: List[str],
        tenant_name: str,
    ) -> models.TenantPolicy:
        """Ensure *group* has a policy on the tenant carrying *permissions*.

        Returns the (existing or newly created) policy.
        """
        TenantPolicy.validate_attach_tenant_policy(group, permissions, tenant_name)

        policy = TenantPolicy.save_group_tenant_policy(session, group, tenant_name)

        TenantPolicy.add_permission_to_group_tenant_policy(
            session, group, permissions, tenant_name, policy
        )
        return policy

    @staticmethod
    def validate_attach_tenant_policy(group, permissions, tenant_name):
        """Raise RequiredParameter for the first empty argument."""
        if not group:
            raise exceptions.RequiredParameter(param_name='group')
        if not permissions:
            raise exceptions.RequiredParameter(param_name='permissions')
        if not tenant_name:
            raise exceptions.RequiredParameter(param_name='tenant_name')

    @staticmethod
    def save_group_tenant_policy(session, group, tenant_name):
        """Return the group's policy on the tenant, creating it if absent."""
        TenantPolicy.validate_save_tenant_policy(group, tenant_name)

        policy = TenantPolicy.find_tenant_policy(session, group, tenant_name)
        if not policy:
            policy = models.TenantPolicy(
                principalId=group,
                principalType='GROUP',
                tenant=Tenant.get_tenant_by_name(session, tenant_name),
            )
            session.add(policy)
            session.commit()
        return policy

    @staticmethod
    def validate_save_tenant_policy(group, tenant_name):
        """Raise RequiredParameter when *group* or *tenant_name* is empty."""
        if not group:
            raise exceptions.RequiredParameter(param_name='group')
        if not tenant_name:
            raise exceptions.RequiredParameter(param_name='tenant_name')

    @staticmethod
    def add_permission_to_group_tenant_policy(
        session, group, permissions, tenant_name, policy
    ):
        """Associate each permission the group does not already hold with *policy*."""
        TenantPolicy.validate_add_permission_to_tenant_policy_params(
            group, permissions, policy, tenant_name
        )

        for permission in permissions:
            if not TenantPolicy.has_group_tenant_permission(
                session,
                group_uri=group,
                permission_name=permission,
                tenant_name=tenant_name,
            ):
                TenantPolicy.associate_permission_to_tenant_policy(
                    session, policy, permission
                )

    @staticmethod
    def validate_add_permission_to_tenant_policy_params(
        group, permissions, policy, tenant_name
    ):
        """Raise RequiredParameter for the first empty argument."""
        if not group:
            raise exceptions.RequiredParameter(param_name='group')
        TenantPolicy.validate_add_permissions_params(permissions, policy, tenant_name)

    @staticmethod
    def validate_add_permissions_params(permissions, policy, tenant_name):
        """Raise RequiredParameter for the first empty argument."""
        if not permissions:
            raise exceptions.RequiredParameter(param_name='permissions')
        if not tenant_name:
            raise exceptions.RequiredParameter(param_name='tenant_name')
        if not policy:
            raise exceptions.RequiredParameter(param_name='policy')

    @staticmethod
    def associate_permission_to_tenant_policy(session, policy, permission):
        """Persist a TenantPolicyPermission linking *policy* to *permission*."""
        policy_permission = models.TenantPolicyPermission(
            sid=policy.sid,
            permissionUri=Permission.get_permission_by_name(
                session, permission, PermissionType.TENANT.name
            ).permissionUri,
        )
        session.add(policy_permission)
        session.commit()

    @staticmethod
    def get_tenant_policy_permissions(session, group_uri, tenant_name):
        """Return the permissions attached to the group's tenant policy.

        Returns an empty list when the group has no policy on the tenant.

        Raises:
            exceptions.RequiredParameter: when either argument is empty.
        """
        if not group_uri:
            raise exceptions.RequiredParameter(param_name='group_uri')
        if not tenant_name:
            raise exceptions.RequiredParameter(param_name='tenant_name')
        policy = TenantPolicy.find_tenant_policy(
            session=session,
            group_uri=group_uri,
            tenant_name=tenant_name,
        )
        # find_tenant_policy returns None for a group without a policy;
        # dereferencing it would raise AttributeError.
        if not policy:
            return []
        return [p.permission for p in policy.permissions]

    @staticmethod
    def delete_tenant_policy(
        session,
        group: str,
        tenant_name: str,
    ) -> bool:
        """Delete the group's tenant policy and its permissions, if any.

        Always returns True, whether or not a policy existed.
        """
        policy = TenantPolicy.find_tenant_policy(
            session, group_uri=group, tenant_name=tenant_name
        )
        if policy:
            for permission in policy.permissions:
                session.delete(permission)
            session.delete(policy)
            session.commit()

        return True

    @staticmethod
    def list_group_tenant_permissions(
        session, username, groups, uri, data=None, check_perm=None
    ):
        """Return the tenant permissions of group *uri*; admin-only.

        Raises:
            exceptions.RequiredParameter: when *groups* or *uri* is empty.
            exceptions.UnauthorizedOperation: when the caller is not an admin.
        """
        if not groups:
            raise exceptions.RequiredParameter('groups')
        if not uri:
            raise exceptions.RequiredParameter('groupUri')

        if not TenantPolicy.is_tenant_admin(groups):
            raise exceptions.UnauthorizedOperation(
                action='LIST_TENANT_TEAM_PERMISSIONS',
                message=f'User: {username} is not allowed to manage tenant permissions',
            )

        return TenantPolicy.get_tenant_policy_permissions(
            session=session,
            group_uri=uri,
            tenant_name=TENANT_NAME,
        )

    @staticmethod
    def list_tenant_groups(session, username, groups, uri, data=None, check_perm=None):
        """Return a paginated dict of groups holding a tenant policy; admin-only.

        *data* may carry ``term`` (substring filter on the group name),
        ``page`` (default 1) and ``pageSize`` (default 10).

        Raises:
            exceptions.RequiredParameter: when *groups* is empty.
            exceptions.UnauthorizedOperation: when the caller is not an admin.
        """
        if not groups:
            raise exceptions.RequiredParameter('groups')

        if not TenantPolicy.is_tenant_admin(groups):
            raise exceptions.UnauthorizedOperation(
                action='LIST_TENANT_TEAMS',
                message=f'User: {username} is not allowed to manage tenant permissions',
            )

        # data defaults to None; normalise so the .get() calls below are safe.
        data = data or {}

        query = session.query(
            models.TenantPolicy.principalId.label('name'),
            models.TenantPolicy.principalId.label('groupUri'),
        ).filter(
            and_(
                models.TenantPolicy.principalType == 'GROUP',
                models.TenantPolicy.principalId != 'DAAdministrators',
            )
        )

        if data.get('term'):
            query = query.filter(
                models.TenantPolicy.principalId.ilike('%' + data.get('term') + '%')
            )

        return paginate(
            query=query,
            page=data.get('page', 1),
            page_size=data.get('pageSize', 10),
        ).to_dict()

    @staticmethod
    def list_tenant_permissions(session, username, groups):
        """Return the Permission for every name in permissions.TENANT_ALL; admin-only.

        Raises:
            exceptions.UnauthorizedOperation: when the caller is not an admin.
        """
        if not TenantPolicy.is_tenant_admin(groups):
            raise exceptions.UnauthorizedOperation(
                action='LIST_TENANT_TEAM_PERMISSIONS',
                message=f'User: {username} is not allowed to manage tenant permissions',
            )
        return [
            Permission.find_permission_by_name(
                session=session,
                permission_name=p,
                permission_type=PermissionType.TENANT.name,
            )
            for p in permissions.TENANT_ALL
        ]

    @staticmethod
    def update_group_permissions(
        session, username, groups, uri, data=None, check_perm=None
    ):
        """Replace the tenant permissions of group *uri* with data['permissions'].

        Raises:
            exceptions.RequiredParameter: when *data* is empty.
            exceptions.UnauthorizedOperation: when the caller is not an admin.
            exceptions.TenantPermissionUnauthorized: when a requested
                permission is not a tenant permission.
        """
        TenantPolicy.validate_params(data)

        if not TenantPolicy.is_tenant_admin(groups):
            # The exception was previously constructed but never raised,
            # which let non-admin callers through.
            raise exceptions.UnauthorizedOperation(
                action='UPDATE_TENANT_TEAM_PERMISSIONS',
                message=f'User: {username} is not allowed to manage tenant permissions',
            )

        TenantPolicy.validate_permissions(
            session, TENANT_NAME, data['permissions'], uri
        )

        TenantPolicy.delete_tenant_policy(
            session=session, group=uri, tenant_name=TENANT_NAME
        )
        TenantPolicy.attach_group_tenant_policy(
            session=session,
            group=uri,
            permissions=data['permissions'],
            tenant_name=TENANT_NAME,
        )

        return True

    @staticmethod
    def validate_permissions(session, tenant_name, g_permissions, group):
        """Check *g_permissions* against permissions.TENANT_ALL and resolve them.

        Returns the Permission found for each distinct requested name.

        Raises:
            exceptions.TenantPermissionUnauthorized: when any requested
                name is not in permissions.TENANT_ALL.
        """
        g_permissions = list(set(g_permissions))

        # Check each name individually: testing the whole list for
        # membership in TENANT_ALL never matches a list of names.
        if any(p not in permissions.TENANT_ALL for p in g_permissions):
            raise exceptions.TenantPermissionUnauthorized(
                action='UPDATE_TENANT_TEAM_PERMISSIONS',
                group_name=group,
                tenant_name=tenant_name,
            )

        return [
            Permission.find_permission_by_name(
                session=session,
                permission_name=p,
                permission_type=PermissionType.TENANT.name,
            )
            for p in g_permissions
        ]

    @staticmethod
    def validate_params(data):
        """Raise RequiredParameter when *data* is empty."""
        if not data:
            raise exceptions.RequiredParameter('data')
        if not data.get('permissions'):
            ...  # the source excerpt is truncated here; the branch body is not visible
multi_db_management.py
Source:multi_db_management.py
from models import Tenant
from models import db
from caching import simple_cache
from flask import current_app

# NOTE(review): credentials are embedded in this URI template.
MYSQL_URI = 'mysql+pymysql://user:pwd@localhost/{}?charset=utf8'


@simple_cache
def get_known_tenants():
    """Return the names of all Tenant rows (wrapped by ``simple_cache``)."""
    return [tenant.name for tenant in Tenant.query.all()]


def prepare_bind(tenant_name):
    """Return the bind URI for *tenant_name*, registering it if missing.

    A missing entry is added to ``SQLALCHEMY_BINDS`` in the current app's
    config by formatting ``MYSQL_URI`` with the tenant name.
    """
    binds = current_app.config['SQLALCHEMY_BINDS']
    if tenant_name not in binds:
        binds[tenant_name] = MYSQL_URI.format(tenant_name)
    return binds[tenant_name]


def get_tenant_session(tenant_name):
    """Create a session bound to the tenant's engine.

    Returns None when *tenant_name* is not a known tenant.
    """
    if tenant_name not in get_known_tenants():
        return None
    prepare_bind(tenant_name)
    tenant_engine = db.get_engine(current_app, bind=tenant_name)
    session_maker = db.sessionmaker()
    session_maker.configure(bind=tenant_engine)
    session = session_maker()
    # ... the source excerpt is truncated here; the remainder of this
    # function is not visible.
Learn to execute automation testing from scratch with LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!