Best Python code snippet using localstack_python
database_connector.py
Source:database_connector.py
...13 self.c = self.conn.cursor()14 self.up_folder = None15 def close(self):16 self.conn.close()17 def execute_statement(self, statement, *args):18 self.c.execute(statement, tuple(args))19 self.conn.commit()20 def execute_query(self, statement, *args):21 self.c.execute(statement, tuple(args))22 return self.c.fetchall()23 def run_script(self, filename):24 with open(filename, 'r') as f:25 query = f.read()26 self.c.executescript(query)27 self.conn.commit()28 def create_user(self, row):29 # creates a user object from a row from users table30 if(row == None):31 return None32 return user.User(row[0], row[1], row[2], row[3], row[4], row[5], row[6], row[8])33 def get_user_by_id(self, user_id):34 row= self.execute_query('select * from users where id=?', user_id)[0]35 return self.create_user(row)36 def get_user_by_email(self, user_email):37 row = self.execute_query('select * from users where email=?', user_email)[0]38 return self.create_user(row)39 def log_user_in(self, user_email, user_pword):40 print(self.filename)41 try:42 row = self.execute_query('select password from users where email=?', user_email)[0]43 except IndexError: # User doesn't exist44 return False45 return sha256_crypt.verify(user_pword, row[0])46 def register_user(self, u):47 # Sort through data into suitable order and then add the user to the database48 self.execute_statement('insert into users values (?, ?, ?, ?, ?, ?, ?, ?, NULL )',\49 u['title'], u['fname'], u['lname'], u['email1'], datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), u['institute'], u['country'], sha256_crypt.encrypt(u['pword1']))50 user_id = self.c.lastrowid51 slide_ids = [x[0] for x in self.execute_query('SELECT id FROM slides')]52 for s in slide_ids:53 self.execute_statement('INSERT INTO permissions VALUES (?, ?)', s, user_id)54 return self.get_user_by_email(u['email1'])55 def get_slide_info_by_category(self, category, usr_id):56 results = []57 rows = self.execute_query('select * from (slides s join users u on s.uploader_id = u.id) where type=?', category)58 slides_accessed = self.execute_query('select slide_id from user_to_slides where user_id=?', usr_id)59 slide_id = [x[0] for x in slides_accessed]60 print(slide_id)61 for row in rows:62 if(row[1] == "none"):63 loc = url_for('static', filename="image/thumb1.jpg")64 else:65 if(os.path.isdir(self.up_folder + "/" + row[1])):66 if(os.path.isfile(self.up_folder + "/" + row[1] + "/thumb.jpg")):67 loc = url_for('static', filename='uploads/' + row[1] + '/thumb.jpg')68 else:69 loc = url_for('static', filename='uploads/' + row[1] + '/thumb.png')70 else:71 self.delete_slide(row)72 continue73 print(row[10])74 query = self.execute_query('SELECT user_id FROM permissions WHERE slide_id=?', row[10])75 print(query)76 if(not usr_id in [x[0] for x in query]):77 print("PERMISSION DENIED")78 continue79 anno = self.execute_query('SELECT * FROM annotations WHERE slide_id=?', row[10])80 num_annotations = len(anno)81 # name0 | location1 | type2 | case_num3 | consultant4 | clinic_details5 | prov_diag6 | dateuploaded7 | viewable8 | uploader_id9 | id10 | title11 | fname12 | lname13 | email14 | date_joined15 | institute16 | country17 | password18 | id1982 r = {"name": row[0], "type": row[2], "date_uploaded": row[7], "is_uploader": row[9] == usr_id, "has_edited": row[10] in slide_id, "uploader": row[11] + " " + row[12] + " " + row[13], "location": loc, "viewable": row[8], "slide_id" : row[10], "num_anno" : num_annotations}83 results.append(r)84 return results85 def change_account_detail(self, data, usr_id):86 form = data['detail']87 d = data['data']88 print(d)89 if(form == "name_form"):90 name = str(d).split(" ")91 title, fname, lname = str(name[0]), str(name[1]), str(" ".join(name[2:]))92 self.execute_statement('UPDATE users SET title=?, fname=?, lname=? WHERE id=?', title, fname, lname, usr_id)93 elif(form == "institute_form"):94 self.execute_statement('UPDATE users SET institute=? WHERE id=?', d, usr_id)95 elif(form == "country_form"):96 self.execute_statement('UPDATE users SET country=? WHERE id=?', d, usr_id)97 elif(form == "email_form"):98 self.execute_statement('UPDATE users SET email=? WHERE id=?', d, usr_id)99 else:100 return {"success": False}101 return {"success": True}102 def change_password(self, data, usr_id):103 user_pword = self.execute_query('select password from users where id=?', usr_id)[0]104 if(sha256_crypt.verify(str(data['old_password']), user_pword[0])):105 # Correct password106 self.execute_statement('UPDATE users SET password=? WHERE id=?', sha256_crypt.encrypt(data['new_password']), usr_id)107 return {"success": True}108 return {"success": False}109 def check_folder(self, dname, folder): # Checks the file doesn't exist, and if does, returns a suitable name110 fname = dname111 i = 0112 while (os.path.isdir(os.path.join(folder, dname))):113 print("Current name, changing")114 fname = dname + "_" + str(i)115 i += 1116 return fname117 def check_file_name(self, name):118 current_names = [x[0] for x in self.execute_query('select name from slides')]119 fname = name120 i = 0121 while(fname in current_names):122 fname = name + "_" + str(i)123 i += 1124 return fname125 def add_new_slide(self, form, folder, user_id):126 print(form.data['name'])127 file = form.u_file.data128 filename, ext = file.filename.rsplit('.')129 s_name = self.check_file_name(form.data['name'])130 # Create a nice random folder name131 h = hashlib.md5()132 now_date = datetime.datetime.now().__str__()133 h.update(str.encode(filename + now_date))134 folder_name = self.check_folder(h.hexdigest()[:10], folder)135 print(folder_name)136 os.makedirs(os.path.join(folder, folder_name))137 file.save(os.path.join(os.path.join(folder, folder_name), filename + "." + ext))138 data = form.data139 self.execute_statement('INSERT INTO slides VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, NULL)', s_name, folder_name, data['type'], data['case_num'], data['consultant'], "", "", now_date, 0, user_id)140 slide_id = self.c.lastrowid141 self.execute_statement('INSERT INTO permissions VALUES (?, ?)', slide_id, user_id)142 return [{"success": True, "s_name": s_name}, SlideInfo.SlideInfo(os.path.join(folder, folder_name), file.filename, s_name)]143 def delete_slide(self, r):144 print("Couldn't find folder, therefore deleting slide " + r[0] + ", in location " + r[1])145 self.execute_statement('DELETE FROM slides WHERE name=? and location=?', r[0], r[1])146 def get_slide_data(self, name, user_id):147 query = self.execute_query('SELECT * FROM slides WHERE name=?', name)[0] # Should be only one result148 permission = self.execute_query('SELECT user_id FROM permissions WHERE slide_id=?', query[-1])149 if (user_id in [x[0] for x in permission]):150 print("PERMISSION DENIED")151 return query152 else:153 return None154 def get_slide_data_by_id(self, slide_id, user_id):155 query = self.execute_query('SELECT * FROM slides WHERE id=?', slide_id)[0]156 permission = self.execute_query('SELECT user_id FROM permissions WHERE slide_id=?', slide_id)157 if (user_id in [x[0] for x in permission]):158 #print("PERMISSION DENIED")159 return query160 else:161 return None162 def add_new_annotation(self, data, user_id):163 temp = [] # Turn the points into a list of points164 temp = ""165 for a in data['points']:166 sub = ""167 for b in a:168 for c in b:169 sub += (str(c)) + ","170 temp += sub[:-1]171 temp += "|"172 points = temp[:-1]173 print(points)174 self.execute_statement('INSERT INTO annotations VALUES(?, ?, ?, ?, ?, ?, ?, NULL)', data['name'], data['anno_description'], data['colour'], points, data['image'], user_id, data['slide_id'])175 return {"success": True, 'id' : self.c.lastrowid}176 def get_annotations(self, slide_id):177 return self.execute_query('SELECT * FROM annotations WHERE slide_id=?', slide_id)178 def confirm_upload(self, s_name):179 self.execute_statement('UPDATE slides SET viewable=1 WHERE name=?', s_name)180 def update_prov_diag(self, slide_id, info):181 self.execute_statement('UPDATE slides SET prov_diag=? WHERE id=?', info, slide_id)182 return {"success" : True}183 def update_clin_details(self, slide_id, info):184 self.execute_statement('UPDATE slides SET clinic_details=? WHERE id=?', info, slide_id)185 return {"success" : True}186 def record_slide_access(self, slide_id, user_id):187 has_accessed = self.execute_query('SELECT * FROM user_to_slides WHERE (user_id=? AND slide_id=?)', user_id, slide_id)188 if(len(has_accessed) > 0): # this user has accessed this slide before189 self.execute_statement('UPDATE user_to_slides SET accessed=? WHERE (user_id=? AND slide_id=?)', datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), user_id, slide_id)190 else:191 self.execute_statement('INSERT INTO user_to_slides VALUES(?, ?, ?)', user_id, slide_id, datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))192 def get_permitted_users(self, slideid, user_id):193 query = self.execute_query('SELECT u.title, u.fname, u.lname, u.id, u.email from (permissions p join users u on u.id=p.user_id) where p.slide_id=?', slideid)194 users = []195 for row in query:196 if(user_id != row[3]):197 result = {"name" : str(row[0]) + " " + str(row[1]) + " " + str(row[2]), "email" : row[4], "id": row[3]}198 users.append(result)199 return users200 def add_new_permissions(self, email, user_id, slide_id):201 slide = [x[0] for x in self.execute_query('SELECT uploader_id FROM slides WHERE id=?', slide_id)]202 if(not user_id in slide):203 return {"success" : 0}204 user = self.execute_query('SELECT title, fname, lname, id FROM users WHERE email=?', email)205 print(user)206 if(len(user) > 0): # the user exists207 user = user[0]208 if(len(self.execute_query('SELECT * FROM permissions WHERE (user_id=? AND slide_id=?)', user[-1], slide_id)) == 0):209 self.execute_statement('INSERT INTO permissions VALUES(?, ?)', slide_id, user[-1])210 return {"name" : str(user[0]) + " " + str(user[1]) + " " + str(user[2]), "email" : email, "id" : user[-1], "success" : 1}211 else: # User already added212 return {"success" : 2}213 return {"success" : 0}214 def remove_permissions(self, user_id, p_id, slide_id):215 slide = [x[0] for x in self.execute_query('SELECT uploader_id FROM slides WHERE id=?', slide_id)]216 if (not user_id in slide): # User doesn't have permission to be changing these permissions217 print("This user doesn't have permission to edit this")218 return {"success": False}219 self.execute_statement('DELETE FROM permissions WHERE user_id=? AND slide_id=?', p_id, slide_id)220 return {"success": True}221 def get_current_slide(self, slide_id, user_id):222 permission = self.execute_query('SELECT user_id FROM permissions WHERE slide_id=?', slide_id)223 if (not user_id in [x[0] for x in permission]):224 print("PERMISSION DENIED")225 return {"success" : False}226 slide = self.execute_query('SELECT type FROM slides WHERE id=?', slide_id)227 if(len(slide) <= 0): # the slide doesn't exist228 print("Slide doesn't exist")229 return {"success" : False}230 print(slide)231 return {"category": slide[0][0], "success" : True, "slide_id" : slide_id}232 def save_feedback(self, name, description, user_id):233 self.execute_statement('INSERT INTO feedback VALUES (?, ?, ?)', name, description, user_id)234 def delete_annotation(self, anno_id, user_id):235 annotation = self.execute_query('SELECT * FROM annotations WHERE annotator_id=? AND id=?', user_id, anno_id)236 if(len(annotation) == 0):237 return {"success": False}238 self.execute_statement('DELETE FROM annotations WHERE id=?', anno_id)...
tag_dao.py
Source:tag_dao.py
1import contextlib2import sqlite33import pandas as pd4def execute_statement(loc, sql):5 with contextlib.closing(sqlite3.connect(f'{loc}/dmt_master.db')) as conn: # auto-closes6 with conn: # auto-commits7 with contextlib.closing(conn.cursor()) as cursor: # auto-closes8 cursor.execute(sql)9 return True10def get_list_of_dic(result):11 ls = []12 for item in result:13 ls.append({k: item[k] for k in item.keys()})14 return ls15def execute_statement_return(loc, sql):16 with contextlib.closing(sqlite3.connect(f'{loc}/dmt_master.db')) as conn: # auto-closes17 conn.row_factory = sqlite3.Row18 with conn: # auto-commits19 with contextlib.closing(conn.cursor()) as cursor: # auto-closes20 cursor.execute(sql)21 result = cursor.fetchall()22 return get_list_of_dic(result)23def execute_statement_return_lot(loc, sql):24 with contextlib.closing(sqlite3.connect(f'{loc}/dmt_master.db')) as conn: # auto-closes25 with conn: # auto-commits26 with contextlib.closing(conn.cursor()) as cursor: # auto-closes27 cursor.execute(sql)28 return cursor.fetchall()29def execute_statement_return_single(loc, sql):30 with contextlib.closing(sqlite3.connect(f'{loc}/dmt_master.db')) as conn: # auto-closes31 with conn: # auto-commits32 with contextlib.closing(conn.cursor()) as cursor: # auto-closes33 cursor.execute(sql)34 return cursor.fetchone()35def executemany_statement(loc, sql, ls):36 with contextlib.closing(sqlite3.connect(f'{loc}/dmt_master.db')) as conn: # auto-closes37 with conn: # auto-commits38 with contextlib.closing(conn.cursor()) as cursor: # auto-closes39 cursor.executemany(sql, ls)40 return True41def create_tb_tag_ct(loc, tb_name):42 sql = f"""CREATE TABLE IF NOT EXISTS {tb_name} (43 id text NOT NULL PRIMARY KEY,44 tag text,45 ct text46 )"""47 execute_statement(loc, sql)48def create_tb_tag_master(loc, tb_name):49 sql = f"""CREATE TABLE IF NOT EXISTS {tb_name} (50 tag text NOT NULL PRIMARY KEY,51 map_tag text,52 rendered text,53 file_name text,54 prod_name text,55 ct text,56 file_size real default 057 )"""58 execute_statement(loc, sql)59def create_tb_prod_proc(loc, tb_name):60 sql = f"""CREATE TABLE IF NOT EXISTS {tb_name} (61 prod_name text NOT NULL PRIMARY KEY,62 ct text,63 sqltime TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL64 )"""65 execute_statement(loc, sql)66def create_tb_dm_sheet(loc, tb_name):67 sql = f"""CREATE TABLE IF NOT EXISTS {tb_name} (68 pat text NOT NULL PRIMARY KEY,69 comp text,70 styling text,71 feat text,72 comm text,73 map_type text,74 ct text,75 is_proc int76 )"""77 execute_statement(loc, sql)78def create_tb_phase(loc, tb_name):79 sql = f"""CREATE TABLE IF NOT EXISTS {tb_name} (80 feat text NOT NULL PRIMARY KEY,81 alpha1 text,82 alpha2 text,83 filter_on_col text,84 ct text85 )"""86 execute_statement(loc, sql)87def insert(loc, tb_name, col_no, ls):88 val = ','.join(['?'] * col_no)89 sql = f'INSERT INTO {tb_name} VALUES({val})'90 executemany_statement(loc, sql, ls)91def update(loc, tb_name, ls):92 sql = f'UPDATE {tb_name} set is_proc=1 where pat=?'93 executemany_statement(loc, sql, ls)94def insert_ignore(loc, tb_name, col_no, ls):95 val = ','.join(['?'] * col_no)96 sql = f'INSERT OR IGNORE INTO {tb_name} VALUES({val})'97 executemany_statement(loc, sql, ls)98def insert_prod_proc(loc, ls):99 sql = f'INSERT INTO tb_prod_proc (prod_name,ct) VALUES(?,?)'100 executemany_statement(loc, sql, ls)101def check_if_record_exist(loc, prod_name):102 sql = f'''103 SELECT EXISTS(SELECT 1 FROM tb_prod_proc WHERE prod_name="{prod_name}");104 '''105 return execute_statement_return_single(loc, sql)106def merge(loc, t1, t2, col_to_comp):107 sql = f"""108 INSERT INTO {t1} 109 SELECT * FROM {t2} A 110 WHERE NOT EXISTS (SELECT 1 FROM {t1} X WHERE A.{col_to_comp} = X.{col_to_comp})111 """112 execute_statement(loc, sql)113def delete_prod(loc, ls_prod):114 sql = f"delete from tb_tag_master where prod_name=?"115 executemany_statement(loc, sql, ls_prod)116 sql = f"delete from tb_prod_proc where prod_name=?"117 return executemany_statement(loc, sql, ls_prod)118def delete_ct(loc, ls_ct):119 sql = f"delete from tb_tag_master where ct=?"120 executemany_statement(loc, sql, ls_ct)121 sql = f"delete from tb_prod_proc where ct=?"122 executemany_statement(loc, sql, ls_ct)123 sql = f"delete from tb_tag_ct where ct=?"124 return executemany_statement(loc, sql, ls_ct)125def drop_tb(loc, tb_name):126 sql = f"drop table {tb_name}"127 return execute_statement(loc, sql)128def remove_file(loc, tb_name, file_name):129 sql = f"delete from {tb_name} where file_name='{file_name}'"130 return execute_statement(loc, sql)131def get_selected_file_name(loc):132 sql = '''SELECT distinct file_name, prod_name from tb_tag_master'''133 return execute_statement_return(loc, sql)134def export_tb_tag_master(loc, ct, tb_name):135 sql = f'''SELECT * from {tb_name}'''136 result = execute_statement_return(loc, sql)137 df = pd.DataFrame(result)138 df.to_excel(f'{loc}/{ct}/excel/tag_master_{ct}.xlsx', index=False)139def get_tag_map(loc):140 sql = f'''SELECT tag,map_tag from tb_tag_master'''141 return execute_statement_return(loc, sql)142def get_csfc(loc):143 sql = f'''SELECT * from tb_dm_sheet where is_proc=0 order by map_type desc'''144 return execute_statement_return_lot(loc, sql)...
aurora_dal.py
Source:aurora_dal.py
...38 resourceArn=db_cluster_arn,39 transactionId=transactionId)40 return transaction_response["transactionStatus"]41@timeit42def execute_statement(sql, sql_parameters=[],transactionId=None):43 parameters = {44 'secretArn': db_credentials_secrets_store_arn,45 'database': database_name,46 'resourceArn': db_cluster_arn,47 'sql': sql,48 'parameters': sql_parameters49 }50 if transactionId is not None:51 parameters['transactionId'] = transactionId52 response = rds_client.execute_statement(**parameters)53 return response54# Formatting query returned Field55def formatField(field):56 fieldName = list(field.keys())[0]57 if fieldName == 'isNull':58 return None59 return list(field.values())[0]60# Formatting query returned Record61def formatRecord(record):62 return [formatField(field) for field in record]63# Formatting query returned Field64def formatRecords(records):65 return [formatRecord(record) for record in records]66@timeit67def execute_get_query(sql, sql_parameters):68 print('===== Example - Simple query =====')69 response = execute_statement(sql, sql_parameters)70 print(response)71 72 return formatRecords(response['records'])73def insert_bot_request(bot_request_id, txId):74 sql= 'insert into bot_request (bot_request_id,conv_start_time) values (:bot_request_id,now())'75 entry = [{'name':'bot_request_id','value':{'longValue':bot_request_id}}]76 execute_statement(sql, entry, txId)77def insert_bot_request_steps(bot_request_id, stepname, s3path, txId):78 sql= 'insert into bot_request_steps (bot_request_id,step_name,s3url, start_time) values (:bot_request_id,:step_name,:s3url,now())'79 entry = [{'name':'bot_request_id','value':{'longValue':bot_request_id}},80 {'name':'step_name','value':{'stringValue':f'{stepname}'}},81 {'name':'s3url','value':{'stringValue':f'{s3path}'}}]82 execute_statement(sql, entry, txId)83def update_bot_request_steps(bot_request_id, translate_text, step_name, txId):84 sql= 'update bot_request_steps set end_time=now(), transcript_text=:transcript_text where step_name=:step_name and bot_request_id=:bot_request_id'85 entry = [{'name':'bot_request_id','value':{'longValue':bot_request_id}},86 {'name':'transcript_text','value':{'stringValue':f'{translate_text}'}},87 {'name':'step_name','value':{'stringValue':f'{step_name}'}}]88 execute_statement(sql, entry, txId)89def update_bot_request_language(bot_request_id,language,txId):90 sql= 'update bot_request set language_code=:language where bot_request_id=:bot_request_id'91 entry = [{'name':'bot_request_id','value':{'longValue':bot_request_id}},92 {'name':'language','value':{'stringValue':f'{language}'}}] 93 execute_statement(sql, entry, txId)94def update_bot_request_city(bot_request_id,city,txId):95 sql= 'update bot_request set city_name=:city where bot_request_id=:bot_request_id'96 entry = [{'name':'bot_request_id','value':{'longValue':bot_request_id}},97 {'name':'city','value':{'stringValue':f'{city}'}}] 98 execute_statement(sql, entry, txId)99def update_bot_request_resort(bot_request_id,resortname,txId):100 sql= 'update bot_request set resort_name=:resort_name where bot_request_id=:bot_request_id'101 entry = [{'name':'bot_request_id','value':{'longValue':bot_request_id}},102 {'name':'resort_name','value':{'stringValue':f'{resortname}'}}] 103 execute_statement(sql, entry, txId)104def insert_bot_request_lang_detection(bot_request_id, language, txId):105 sql= 'insert into bot_request_lang_detection (bot_request_id,language,start_time) values (:bot_request_id,:language,now())'106 entry = [{'name':'bot_request_id','value':{'longValue':bot_request_id}},107 {'name':'language','value':{'stringValue':f'{language}'}}]108 execute_statement(sql, entry, txId)109def getBotLanguage(bot_request_id):110 print('getting bot')111 sql= 'select language_code from bot_request where bot_request_id=:bot_request_id'112 entry = [{'name':'bot_request_id','value':{'longValue':bot_request_id}}]113 response_records = execute_get_query(sql, entry)114 print(response_records)115 for record in response_records:116 return record[0]117 return None118def getPendingLanguageDetectionJobs(bot_request_id):119 sql= 'select count(lang_detection_id) from bot_request_lang_detection where end_time is null and bot_request_id=:bot_request_id'120 entry = [{'name':'bot_request_id','value':{'longValue':bot_request_id}}]121 response_records = execute_get_query(sql, entry)122 print(response_records)...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!