Best Python code snippet using dbt-osmosis_python
parse_sql_tests.py
Source:parse_sql_tests.py
"""Unit tests for ``WithStatementParser`` from the ``parse_sql`` module.

NOTE(review): this file was recovered from a listing that collapsed
whitespace.  String literals below are reproduced exactly as they appear in
that listing; runs of spaces inside them may originally have been longer --
confirm against the real fixtures before relying on them.
"""
import sys
from os import path

# Make the sibling ``code`` directory importable so ``parse_sql`` resolves.
sys.path.append(path.join(path.dirname(path.dirname(path.abspath(__file__))), 'code'))

from unittest import TestCase, main
from parse_sql import WithStatementParser


class ParseSqlTests(TestCase):
    """Exercises the cleaning, extraction, dependency and alias helpers."""

    def test_helper_cleaning_methods(self):
        """Whitespace/comment stripping and comma-join rewriting."""
        sql_parser = WithStatementParser('./test_data/example_1.sql')

        input_text = ' extra\n space everywhere '
        output_text = 'extra space everywhere'
        self.assertEqual(sql_parser.remove_excess_whitespace(input_text), output_text)

        comment_cases = [
            ('-- comment\nselect * from table;',
             '\nselect * from table;'),
            ('select * -- inline comment\nfrom table;',
             'select * \nfrom table;'),
        ]
        for input_text, output_text in comment_cases:
            with self.subTest(input_text=input_text):
                self.assertEqual(sql_parser.remove_comments(input_text), output_text)

        cross_join_cases = [
            ('select * from table1, table2;',
             'select * from table1 cross join table2;'),
            ('select * from table1 t1, table2 t2;',
             'select * from table1 t1 cross join table2 t2;'),
            ('select * from table1 as t1, table2 as t2;',
             'select * from table1 as t1 cross join table2 as t2;'),
            ('select * from table1, table2, table3;',
             'select * from table1 cross join table2 cross join table3;'),
        ]
        for input_text, output_text in cross_join_cases:
            with self.subTest(input_text=input_text):
                self.assertEqual(sql_parser.cross_joins(input_text), output_text)

        output_text = 'my_table_1 as (\nselect *\nfrom table_1\n)\nselect *\nfrom my_table_1\n;'
        self.assertEqual(sql_parser.clean_original_query(), output_text)

    def test_parenthesis_tracking(self):
        """A balanced open/close pair flips ``end_with_statement`` to True."""
        sql_parser = WithStatementParser('./test_data/example_1.sql')
        self.assertFalse(sql_parser.end_with_statement)
        sql_parser.parenthesis_tracking('(')
        self.assertEqual(sql_parser.num_open_parens, 1)
        self.assertFalse(sql_parser.end_with_statement)
        sql_parser.parenthesis_tracking(')')
        self.assertEqual(sql_parser.num_close_parens, 1)
        self.assertTrue(sql_parser.end_with_statement)

    def test_extract_with_statements(self):
        """Each WITH block and the trailing query are extracted by name."""
        sql_parser = WithStatementParser('./test_data/example_2.sql')
        sql_parser.extract_with_statements()
        self.assertEqual(sql_parser.overall_name, 'my_table_1_my_table_2')
        self.assertEqual(sql_parser.build_order,
                         ['my_table_1', 'my_table_2', 'my_table_1_my_table_2'])
        self.assertEqual(sorted(sql_parser.with_statements.keys()),
                         ['my_table_1', 'my_table_1_my_table_2', 'my_table_2'])
        self.assertEqual(sql_parser.with_statements['my_table_1'], 'select *\nfrom table_1')
        self.assertEqual(sql_parser.with_statements['my_table_2'], 'select *\nfrom table_2')
        self.assertEqual(sql_parser.with_statements['my_table_1_my_table_2'],
                         '\nselect *\nfrom my_table_1 t1\njoin my_table_2 t2\non t1.id = t2.id\n;')

    def test_dependency(self):
        """Dependency discovery and cycle detection on hand-built graphs."""
        sql_parser = WithStatementParser('./test_data/example_3.sql')
        sql_parser.with_statements = {
            'my_table_1': 'select *\nfrom table_1',
            'my_table_2': 'select *\nfrom my_table_1 as m',
            'my_table_1_my_table_2': '\nselect *\nfrom my_table_1 t1\njoin my_table_2 t2\non t1.id = t2.id\n;'
        }
        sql_parser.dependency_alias()
        self.assertEqual(sql_parser.dependencies['my_table_1'], [])
        self.assertEqual(sql_parser.dependencies['my_table_2'], ['my_table_1'])
        self.assertEqual(sql_parser.dependencies['my_table_1_my_table_2'],
                         ['my_table_1', 'my_table_2'])

        # no cycle in dependence
        sql_parser.dependencies = {
            'my_table_1': [],
            'my_table_2': ['my_table_1']
        }
        self.assertFalse(sql_parser.dependency_graph_contains_cycles())

        # 2 node cycle in dependence
        sql_parser.dependencies = {
            'my_table_1': ['my_table_2'],
            'my_table_2': ['my_table_1']
        }
        self.assertTrue(sql_parser.dependency_graph_contains_cycles())

        # 3 node cycle in dependence
        sql_parser.dependencies = {
            'my_table_1': ['my_table_2'],
            'my_table_2': ['my_table_3'],
            'my_table_3': ['my_table_1']
        }
        self.assertTrue(sql_parser.dependency_graph_contains_cycles())

    def test_alias(self):
        """Alias collection plus ``get_alias`` / ``next_alias`` behaviour."""
        sql_parser = WithStatementParser('./test_data/example_3.sql')
        sql_parser.with_statements = {
            'my_table_1': 'select *\nfrom table_1',
            'my_table_2': 'select *\nfrom my_table_1 as m',
            'my_table_1_my_table_2': '\nselect *\nfrom my_table_1 t1\njoin my_table_2 t2\non t1.id = t2.id\n;'
        }
        sql_parser.dependency_alias()
        self.assertEqual(sorted(sql_parser.all_aliases), ['m', 't1', 't2'])
        self.assertEqual(sql_parser.aliases['my_table_1'], {})
        self.assertEqual(sql_parser.aliases['my_table_2'], {'my_table_1': 'm'})
        self.assertEqual(sql_parser.aliases['my_table_1_my_table_2']['my_table_1'], 't1')
        self.assertEqual(sql_parser.aliases['my_table_1_my_table_2']['my_table_2'], 't2')

        # return empty string when alias exists
        # The inner value is a table -> alias dict, matching the shape asserted
        # above (it was previously written as a set: {'my_table_1', 't1'}).
        sql_parser.aliases = {'my_table_2': {'my_table_1': 't1'}}
        self.assertEqual(sql_parser.get_alias('my_table_2', 'my_table_1'), '')

        # return next available alias when alias doesn't exist
        sql_parser.alias_index = 1
        sql_parser.all_aliases = ['t1']
        self.assertEqual(sql_parser.get_alias('my_table_2', 'my_table_3'), ' t2')

        # return next available alias
        sql_parser.alias_index = 1
        sql_parser.all_aliases = ['t1', 't2']
        self.assertEqual(sql_parser.next_alias(), ' t3')

    def test_create_nested_with_statements(self):
        """WITH blocks are inlined as nested sub-selects."""
        sql_parser = WithStatementParser('./test_data/example_3.sql')
        sql_parser.extract_with_statements()
        sql_parser.create_nested_with_statements()
        # NOTE(review): leading indentation inside this literal is shown as a
        # single space in the listing; the real expected text may use more.
        nested_query = '''
select *
from (
 select *
 from table_1
 ) t1
join (
 select *
 from table_2 t2
 join (
 select *
 from table_1
 ) t1
 on t1.id = t2.id
 ) t2
on t1.id = t2.id
;
'''.strip()
        self.assertEqual(sql_parser.get_nested_query(), nested_query)


if __name__ == '__main__':
    ...  # the listing is truncated here; the original body is not visible
app.py
Source:app.py
"""Flask application exposing SQL-file backed data, chart and custom-SQL endpoints."""
import os
import json

import flask
from flask_restful import reqparse
from flask import Flask, request, render_template
from flask_restful import Resource, Api
import pandas as pd

from fetch_data import fetch_data, fetch_data_custom
from dataviz.data_viz import generate_charts
from config import config

app = Flask(__name__)
api = Api(app)

sql_parser = reqparse.RequestParser()
sql_parser.add_argument('limit', type=int, help="Limit the returned rows of SQL Query")
sql_parser.add_argument('filter', type=str, help="filter the returned rows of SQL Query")
sql_parser.add_argument('per_page', type=int, help="Per Page limit")
# custom_sql_args = reqparse.RequestParser()
# custom_sql_args.add_argument("sql", type=str, help="Send SQL Query", required=True)


def _project_sql_entries():
    """Return the names of the non-directory entries of ``config.project_sql``.

    ``os.listdir`` yields bare names, so each one is joined back onto the
    directory before the ``isdir`` test; testing the bare name would check it
    against the current working directory instead.
    """
    root = config.project_sql
    return [fname for fname in os.listdir(root)
            if not os.path.isdir(os.path.join(root, fname))]


def get_paginated_list(tablename, sql_filter, sql_limit, url, start, per_page=10):
    """Fetch rows for ``tablename`` and return one page of them as a dict.

    Args:
        tablename: passed through to ``fetch_data``.
        sql_filter: SQL text appended to the query (a ``where`` clause).
        sql_limit: SQL text appended after ``sql_filter`` (a ``limit`` clause
            or an empty string).
        url: base URL used to build the ``previous`` / ``next`` links.
        start: 1-based index of the first row of the page; converted with
            ``int``.
        per_page: number of rows per page; converted with ``int``.

    Returns:
        A dict with keys ``start``, ``per_page``, ``count``, ``previous``,
        ``next`` and ``results`` (a list of row records).

    Aborts the request with HTTP 404 when ``start`` is beyond the row count
    or ``per_page`` is negative.
    """
    start = int(start)
    per_page = int(per_page)
    # presumably a pandas DataFrame (it is sliced and ``to_json`` is called
    # on it) -- confirm against fetch_data
    results = fetch_data(tablename, sql_filter + sql_limit)
    count = len(results)
    if count < start or per_page < 0:
        flask.abort(404)

    obj = {}
    obj['start'] = start
    obj['per_page'] = per_page
    obj['count'] = count

    if start == 1:
        obj['previous'] = ''
    else:
        start_copy = max(1, start - per_page)
        per_page_copy = start - 1
        obj['previous'] = url + '?start=%d&per_page=%d' % (start_copy, per_page_copy)

    if start + per_page > count:
        obj['next'] = ''
    else:
        start_copy = start + per_page
        obj['next'] = url + '?start=%d&per_page=%d' % (start_copy, per_page)

    page = results[(start - 1):(start - 1 + per_page)]
    obj['results'] = json.loads(page.to_json(orient="records"))
    return obj


@app.route('/')
@app.route("/home")
def home_page():
    """Render the landing page."""
    return render_template('home.html')


@app.route("/list_api")
def list_api_page():
    """Render the list of available APIs (one per file in ``config.project_sql``)."""
    return render_template(
        'list_api.html',
        api_list_items=[fname.split('.')[0] for fname in _project_sql_entries()])


@app.route("/fetch_apps/<string:tablename>", methods=['GET'])
def fetchdata_apps(tablename):
    """Return a paginated JSON view of the rows fetched for ``tablename``.

    Query parameters: ``filter``, ``limit``, ``start`` and ``per_page``.
    """
    sql_arg = sql_parser.parse_args()

    # SECURITY: ``filter`` comes straight from the request and is concatenated
    # into the SQL text unescaped.  This is an injection risk; it should be
    # replaced by bound parameters or a whitelist once fetch_data supports it.
    if sql_arg['filter'] is not None:
        sql_filter = ' where ' + sql_arg['filter'] + ' '
    else:
        sql_filter = ' where 1 = 1'

    if sql_arg['limit'] and sql_arg['limit'] > 0:
        sql_limit = ' limit ' + str(sql_arg['limit'])
    else:
        sql_limit = ''

    # Fall back to 10 rows per page only when the caller supplied nothing
    # (the parsed value used to be overwritten unconditionally).
    default_per_page = sql_arg['per_page'] if sql_arg['per_page'] is not None else 10

    return flask.jsonify(get_paginated_list(
        tablename, sql_filter, sql_limit,
        request.url_root + 'fetch_apps/' + tablename,
        start=request.args.get('start', 1),
        per_page=request.args.get('per_page', default_per_page)))


class FetchApp(Resource):
    """REST resource listing the available API names."""

    def __init__(self):
        pass

    def get(self):
        """Return the base names of the files in ``config.project_sql`` as JSON."""
        api_list = [fname.split('.')[0] for fname in _project_sql_entries()]
        return flask.jsonify(api_list)


class GetViz(Resource):
    """REST resource that generates charts from the project's ``.sql`` files."""

    def __init__(self):
        pass

    def get(self):
        """Generate charts; send the result as a file when it is a pdf or zip.

        Any other value returned by ``generate_charts`` is returned as-is.
        """
        sql_file_list = [fname for fname in _project_sql_entries()
                         if fname.endswith('.sql')]
        viz_file = generate_charts(sql_file_list)
        # splitext looks at the final extension, so names or paths containing
        # additional dots are still recognised.
        if os.path.splitext(viz_file)[1] in ('.pdf', '.zip'):
            return flask.send_file(viz_file)
        return viz_file


class CustomSql(Resource):
    """REST resource that runs a caller-supplied query payload."""

    def __init__(self):
        pass

    # def get(self, custsql):
    #     return custsql

    def post(self):
        """Run the posted JSON payload through ``fetch_data_custom``.

        Returns a JSON object whose ``results`` key holds the row records.
        """
        obj = {}
        # SECURITY: the request body is handed to fetch_data_custom unchanged;
        # NOTE(review): confirm that helper validates or restricts the SQL.
        data = request.get_json()
        sql_arg = sql_parser.parse_args()
        sql_filter = ' where 1 = 1'
        if sql_arg['limit'] and sql_arg['limit'] > 0:
            sql_limit = ' limit ' + str(sql_arg['limit'])
        else:
            sql_limit = ''
        df = fetch_data_custom(data, sql_filter + sql_limit)
        obj['results'] = json.loads(df.to_json(orient='records'))
        return flask.jsonify(obj)


api.add_resource(FetchApp, '/fetch_apps')
api.add_resource(CustomSql, '/customsql')
api.add_resource(GetViz, '/getviz')

if __name__ == '__main__':
    port = 5000
    # NOTE: the listing is truncated here; the rest of this block is not visible.
check_sql.py
Source:check_sql.py
# RE library to match correct syntactic structures of SQL statements
import re


class SQL_Parser:
    """Regex-based syntactic checker for a small subset of SQL statements.

    Supported statement kinds are select, create, drop, insert, update and
    delete.  The patterns are written for the normalised form produced by
    ``clean`` (lower case, single spaces).

    NOTE: every ``parse_*`` method uses ``re.match``, which anchors only at
    the start of the string, so text following a valid prefix is not
    rejected.
    """

    _KEYWORDS = ("select", "create", "drop", "insert", "update", "delete")

    def __init__(self, s):
        """Store the raw SQL statement ``s`` for later checking."""
        self.stmt = s

    def parse_select(self, s):
        """Return True when ``s`` starts with a recognised SELECT statement."""
        regex = r'select\s+(distinct|all)?\s*(\*|((\w+)\s*,\s*)*\s*(\w+))\s+from\s+(\w+)\s*(where\s+(((\w+)\s+between\s+(\w+)\s+and\s+(\w+))|((\w+)\s*(<|>|<=|>=|=|<>)\s*(\w+)(\s*(and|or)\s+(\w+)\s*(<|>|<=|>=|=|<>)\s*(\w+))*)))?\s*(having\s+([a-zA-Z_][a-zA-Z0-9_]*)\s*(<|>|<=|>=|=|<>)\s*([a-zA-Z0-9][a-zA-Z0-9]*) )?\s*(order\s+by\s+((\w+)\s*(asc|desc)?\s*,\s*)*\s*(\w+)\s+(asc|desc)?)?\s*(limit\s+(\d+))?\s*;?'
        return bool(re.match(regex, s))

    def parse_create(self, s):
        """Return True when ``s`` starts with a recognised CREATE TABLE statement."""
        regex = r"create table ([a-zA-Z_][a-zA-Z0-9_]*)\s*\(\s*(([a-zA-Z_][a-zA-Z0-9_]*)\s+(int|varchar|text)\s*(,)\s*)*(([a-zA-Z_][a-zA-Z0-9_]*)\s+(int|varchar|text))\)\s*;"
        return bool(re.match(regex, s))

    def parse_drop(self, s):
        """Return True when ``s`` starts with a recognised DROP TABLE statement."""
        regex = r"drop table (([a-zA-Z_][a-zA-Z0-9_]*),)*([a-zA-Z_][a-zA-Z0-9_]*)(;)?"
        return bool(re.match(regex, s))

    def parse_insert(self, s):
        """Return True when ``s`` starts with a recognised INSERT statement."""
        regex = r'insert\s+into\s+(\w+)\s*(\(\s*((([a-zA-Z0-9][a-zA-Z0-9_]*)\s*,\s*)*\s*([a-zA-Z0-9][a-zA-Z0-9_]*))\s*\))?\s+values\s*(\(\s*((((".*")|([a-zA-Z0-9][a-zA-Z0-9]*))\s*,\s*)*\s*([a-zA-Z0-9][a-zA-Z0-9]*))\s*\));?'
        return bool(re.match(regex, s))

    def parse_update(self, s):
        """Return True when ``s`` starts with a recognised UPDATE statement.

        A WHERE clause is required by the pattern.
        """
        # ``is [not] null`` is written as ``is\s+(not\s+)?null`` so that it
        # matches single-spaced input; the earlier form demanded at least two
        # whitespace characters before ``null`` and so never matched the
        # normalised statement.
        regex = r'update\s+([a-zA-Z_][a-zA-Z0-9_]*)\s+set\s+(([a-zA-Z_][a-zA-Z0-9_]*)\s*=\s*(".*"|[0-9][0-9]*)\s*,\s*)*\s*(([a-zA-Z_][a-zA-Z0-9_]*)\s*=\s*(".*"|[0-9][0-9]*))\s*(where\s+(([a-zA-Z_][a-zA-Z0-9_]*)\s*((between\s+([0-9][0-9]*)\s+and\s+([0-9][0-9]*))|(is\s+(not\s+)?null)|(<|>|<=|>=|=|<>)\s*(".*"|[0-9][0-9]*))))\s*;?'
        return bool(re.match(regex, s))

    def parse_delete(self, s):
        """Return True when ``s`` starts with a recognised DELETE statement."""
        # The LIMIT clause is optional, mirroring parse_select; it used to be
        # mandatory, which rejected ordinary ``delete ... where ...;``.
        regex = r'delete\s+from\s+(\w+)\s*(where\s+(((\w+)\s+between\s+(\w+)\s+and\s+(\w+))|((\w+)\s*(<|>|<=|>=|=|<>)\s*(\w+)(\s*(and|or)\s+(\w+)\s*(<|>|<=|>=|=|<>)\s*(\w+))*)))?\s*(order\s+by\s+((\w+)\s*(asc|desc)?\s*,\s*)*\s*((\w+)\s*(asc|desc))?)?\s*(limit\s+(\d+))?\s*;?'
        return bool(re.match(regex, s))

    def check_semicolon(self, s):
        """Return True when ``s`` ends with ``;`` (False for an empty string)."""
        return s.endswith(';')

    def tokenize(self, s):
        """Split ``s`` on single spaces."""
        return s.split(" ")

    def check_keywords(self, tokens):
        """Return True when the first token is a supported statement keyword."""
        return bool(tokens) and tokens[0] in self._KEYWORDS

    def clean(self, s):
        """Normalise ``s``: strip newlines at the ends, lower-case, collapse whitespace."""
        stmt = s.strip("\n")
        # converting the sql statement to lower case for ease of checking
        stmt = stmt.lower()
        # converting all multiple white spaces into single space in the statement
        return ' '.join(stmt.split())

    def parse_query(self):
        """Check the stored statement and return True/False.

        The statement must end with a semicolon and begin with a supported
        keyword; it is then checked by the matching ``parse_*`` method.
        """
        s = self.clean(self.stmt)
        # tokenizing the sql query
        tokens = self.tokenize(s)
        if not self.check_semicolon(s) or not self.check_keywords(tokens):
            return False
        handlers = {
            "select": self.parse_select,
            "update": self.parse_update,
            "insert": self.parse_insert,
            "delete": self.parse_delete,
            "create": self.parse_create,
            "drop": self.parse_drop,
        }
        return handlers[tokens[0]](s)


def main():
    print("\n************* Welcome to SQL Synctactic Parser made by Yukti Khurana ****************\n\n")
    # reading the sql statement
    stmt = "SELECT colname FROM table1, table2 WHERE condition_list ORDER BY colname ;"
    sql_parser = SQL_Parser(stmt)
    # NOTE: the listing is truncated here; the rest of main() is not visible.
Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!