Best Python code snippet using dbt-osmosis_python
ddl.py
Source:ddl.py
1#!/usr/bin/env python2# -*- coding: utf-8; tab-width: 4; indent-tabs-mode: t -*-3#4# NetProfile: Custom DDL constructs for SQLAlchemy5# © Copyright 2013-2014 Alex 'Unik' Unigovsky6#7# This file is part of NetProfile.8# NetProfile is free software: you can redistribute it and/or9# modify it under the terms of the GNU Affero General Public10# License as published by the Free Software Foundation, either11# version 3 of the License, or (at your option) any later12# version.13#14# NetProfile is distributed in the hope that it will be useful,15# but WITHOUT ANY WARRANTY; without even the implied warranty of16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the17# GNU Affero General Public License for more details.18#19# You should have received a copy of the GNU Affero General20# Public License along with NetProfile. If not, see21# <http://www.gnu.org/licenses/>.22from __future__ import (23 unicode_literals,24 print_function,25 absolute_import,26 division27)28import sys29import datetime as dt30from sqlalchemy.schema import (31 Column,32 DefaultClause,33 DDLElement,34 SchemaItem,35 Table36)37from sqlalchemy import (38 event,39 text40)41from sqlalchemy.sql import sqltypes42from sqlalchemy.sql.expression import ClauseElement43from sqlalchemy.sql.functions import FunctionElement44from sqlalchemy.ext.compiler import compiles45from sqlalchemy.ext.declarative import DeclarativeMeta46from sqlalchemy.orm import Query47from pyramid.renderers import render48from netprofile.ext.data import _table_to_class49from .connection import Base50class CurrentTimestampDefaultItem(ClauseElement):51 def __init__(self, on_update=False):52 self.on_update = on_update53@compiles(CurrentTimestampDefaultItem, 'mysql')54def visit_timestamp_default_mysql(element, compiler, **kw):55 ddl = 'CURRENT_TIMESTAMP'56 if element.on_update:57 ddl += ' ON UPDATE CURRENT_TIMESTAMP'58 return ddl59@compiles(CurrentTimestampDefaultItem)60def visit_timestamp_default(element, compiler, **kw):61 return 
'CURRENT_TIMESTAMP'62class CurrentTimestampDefault(DefaultClause):63 def __init__(self, on_update=False):64 self.on_update = on_update65 super(CurrentTimestampDefault, self).__init__(66 CurrentTimestampDefaultItem(on_update),67 for_update=on_update68 )69 def _set_parent(self, column):70 self.column = column71 self.column.server_default = self72 if self.on_update:73 self.column.server_onupdate = self74class SQLFunctionArgument(DDLElement):75 def __init__(self, name, arg_type, arg_dir=None):76 self.name = name77 self.type = arg_type78 self.dir = arg_dir79class InArgument(SQLFunctionArgument):80 def __init__(self, name, arg_type):81 super(InArgument, self).__init__(name, arg_type, 'IN')82class OutArgument(SQLFunctionArgument):83 def __init__(self, name, arg_type):84 super(OutArgument, self).__init__(name, arg_type, 'OUT')85class InOutArgument(SQLFunctionArgument):86 def __init__(self, name, arg_type):87 super(InOutArgument, self).__init__(name, arg_type, 'INOUT')88@compiles(SQLFunctionArgument, 'mysql')89def visit_sql_function_arg(element, compiler, **kw):90 return '%s %s %s' % (91 element.dir if element.dir else '',92 compiler.sql_compiler.preparer.quote(element.name),93 compiler.dialect.type_compiler.process(element.type)94 )95class AlterTableAlterColumn(DDLElement):96 def __init__(self, table, column):97 self.table = table98 self.column = column99@compiles(AlterTableAlterColumn, 'mysql')100def visit_alter_table_alter_column_mysql(element, compiler, **kw):101 table = element.table102 col = element.column103 spec = compiler.get_column_specification(col, first_pk=col.primary_key)104 const = " ".join(compiler.process(constraint) \105 for constraint in col.constraints)106 if const:107 spec += " " + const108 if col.comment:109 spec += " COMMENT " + compiler.sql_compiler.render_literal_value(col.comment, sqltypes.STRINGTYPE)110 return 'ALTER TABLE %s CHANGE COLUMN %s %s' % (111 compiler.sql_compiler.preparer.format_table(table),112 
compiler.sql_compiler.preparer.format_column(col),113 spec114 )115class SetTableComment(DDLElement):116 def __init__(self, table, comment):117 self.table = table118 self.text = comment119class SetColumnComment(DDLElement):120 def __init__(self, column, comment):121 self.column = column122 self.text = comment123class Comment(SchemaItem):124 """125 Represents a table comment DDL.126 """127 def __init__(self, ctext):128 self.text = ctext129 def _set_parent(self, parent):130 self.parent = parent131 parent.comment = self.text132 if isinstance(parent, Table):133 SetTableComment(parent, self.text).execute_at('after_create', parent)134 elif isinstance(parent, Column):135 text = self.text136 if not parent.doc:137 parent.doc = text138 def _set_col_comment(column, meta):139 SetColumnComment(column, text).execute_at('after_create', column.table)140 event.listen(141 parent,142 'after_parent_attach',143 _set_col_comment144 )145@compiles(SetColumnComment, 'mysql')146def visit_set_column_comment_mysql(element, compiler, **kw):147 spec = compiler.get_column_specification(element.column, first_pk=element.column.primary_key)148 const = " ".join(compiler.process(constraint) \149 for constraint in element.column.constraints)150 if const:151 spec += " " + const152 return 'ALTER TABLE %s MODIFY COLUMN %s COMMENT %s' % (153 compiler.sql_compiler.preparer.format_table(element.column.table),154 spec,155 compiler.sql_compiler.render_literal_value(element.text, sqltypes.STRINGTYPE)156 )157@compiles(SetColumnComment, 'postgresql')158@compiles(SetColumnComment, 'oracle')159def visit_set_column_comment_pgsql(element, compiler, **kw):160 return 'COMMENT ON COLUMN %s IS %s' % (161 compiler.sql_compiler.process(element.column),162 compiler.sql_compiler.render_literal_value(element.text, sqltypes.STRINGTYPE)163 )164@compiles(SetColumnComment)165def visit_set_column_comment(element, compiler, **kw):166 pass167@compiles(SetTableComment, 'mysql')168def visit_set_table_comment_mysql(element, compiler, 
**kw):169 return 'ALTER TABLE %s COMMENT=%s' % (170 compiler.sql_compiler.preparer.format_table(element.table),171 compiler.sql_compiler.render_literal_value(element.text, sqltypes.STRINGTYPE)172 )173@compiles(SetTableComment, 'postgresql')174@compiles(SetTableComment, 'oracle')175def visit_set_table_comment_pgsql(element, compiler, **kw):176 return 'COMMENT ON TABLE %s IS %s' % (177 compiler.sql_compiler.preparer.format_table(element.table),178 compiler.sql_compiler.render_literal_value(element.text, sqltypes.STRINGTYPE)179 )180@compiles(SetTableComment)181def visit_set_table_comment(element, compiler, **kw):182 pass183def ddl_fmt(ctx, obj):184 compiler = ctx['compiler']185 if isinstance(obj, (Trigger, SQLFunction, SQLEvent)):186 return compiler.sql_compiler.preparer.quote(obj.name)187 if isinstance(obj, Table):188 return compiler.sql_compiler.preparer.format_table(obj)189 if isinstance(obj, DeclarativeMeta):190 return compiler.sql_compiler.preparer.format_table(obj.__table__)191 if isinstance(obj, DDLElement):192 return compiler.process(obj)193 if isinstance(obj, sqltypes.TypeEngine):194 return compiler.dialect.type_compiler.process(obj)195 if isinstance(obj, (FunctionElement, ClauseElement)):196 return compiler.sql_compiler.process(obj)197 if isinstance(obj, dt.datetime):198 dname = compiler.dialect.name199 date = obj200 if dname in ('mysql', 'postgresql', 'sqlite'):201 date = date.strftime('%Y-%m-%d %H:%M:%S')202 return compiler.sql_compiler.render_literal_value(date, sqltypes.STRINGTYPE)203 if isinstance(obj, str):204 return compiler.sql_compiler.render_literal_value(obj, sqltypes.STRINGTYPE)205 if isinstance(obj, int):206 return compiler.sql_compiler.render_literal_value(obj, sqltypes.INTEGERTYPE)207 raise ValueError('Unable to format value for DDL')208class CreateTrigger(DDLElement):209 def __init__(self, table, trigger):210 self.table = table211 self.trigger = trigger212class DropTrigger(DDLElement):213 def __init__(self, table, trigger):214 self.table = 
table215 self.trigger = trigger216@compiles(CreateTrigger)217def visit_create_trigger(element, compiler, **kw):218 table = element.table219 cls = _table_to_class(table.name)220 module = cls.__module__.split('.')[0]221 trigger = element.trigger222 tpldef = {223 'table' : table,224 'class' : cls,225 'module' : module,226 'compiler' : compiler,227 'dialect' : compiler.dialect,228 'trigger' : trigger,229 'raw' : text230 }231 tpldef.update(Base._decl_class_registry.items())232 tplname = '%s:templates/sql/%s/triggers/%s.mak' % (233 module,234 compiler.dialect.name,235 trigger.name236 )237 return render(tplname, tpldef, package=sys.modules[module])238@compiles(DropTrigger, 'postgresql')239def visit_drop_trigger_mysql(element, compiler, **kw):240 return 'DROP TRIGGER %s ON %s' % (241 compiler.sql_compiler.preparer.quote(element.trigger.name),242 compiler.sql_compiler.preparer.format_table(element.parent)243 )244@compiles(DropTrigger)245def visit_drop_trigger(element, compiler, **kw):246 return 'DROP TRIGGER %s' % (247 compiler.sql_compiler.preparer.quote(element.trigger.name),248 )249class Trigger(SchemaItem):250 """251 Schema element that attaches a trigger template to an object.252 """253 def __init__(self, when='before', event='insert', name=None):254 self.when = when255 self.event = event256 self.name = name257 def _set_parent(self, parent):258 if isinstance(parent, Table):259 self.parent = parent260 CreateTrigger(parent, self).execute_at('after_create', parent)261 DropTrigger(parent, self).execute_at('before_drop', parent)262class CreateFunction(DDLElement):263 """264 SQL function template DDL object.265 """266 def __init__(self, func, module):267 self.func = func268 self.module = module269@compiles(CreateFunction)270def visit_create_function(element, compiler, **kw):271 func = element.func272 name = func.name273 module = 'netprofile_' + element.module274 tpldef = {275 'function' : func,276 'name' : name,277 'module' : module,278 'compiler' : compiler,279 'dialect' : 
compiler.dialect,280 'raw' : text281 }282 tpldef.update(Base._decl_class_registry.items())283 tplname = '%s:templates/sql/%s/functions/%s.mak' % (284 module,285 compiler.dialect.name,286 name287 )288 return render(tplname, tpldef, package=sys.modules[module])289class DropFunction(DDLElement):290 """291 SQL DROP FUNCTION DDL object.292 """293 def __init__(self, func):294 self.func = func295@compiles(DropFunction, 'postgresql')296def visit_drop_function_pgsql(element, compiler, **kw):297 func = element.func298 name = func.name299 return 'DROP FUNCTION %s' % (300 compiler.sql_compiler.preparer.quote(name),301 )302@compiles(DropFunction)303def visit_drop_function(element, compiler, **kw):304 func = element.func305 name = func.name306 is_proc = func.is_procedure307 return 'DROP %s %s' % (308 'PROCEDURE' if is_proc else 'FUNCTION',309 compiler.sql_compiler.preparer.quote(name)310 )311class SQLFunction(object):312 """313 Schema element that defines an SQL function or procedure.314 """315 def __init__(self, name, args=(), returns=None, comment=None, reads_sql=True, writes_sql=True, is_procedure=False, label=None):316 self.name = name317 self.args = args318 self.returns = returns319 self.comment = comment320 self.reads_sql = reads_sql321 self.writes_sql = writes_sql322 self.is_procedure = is_procedure323 self.label = label324 def create(self, modname):325 return CreateFunction(self, modname)326 def drop(self):327 return DropFunction(self)328class CreateEvent(DDLElement):329 """330 SQL event template DDL object.331 """332 def __init__(self, evt, module):333 self.event = evt334 self.module = module335@compiles(CreateEvent, 'mysql')336def visit_create_event_mysql(element, compiler, **kw):337 evt = element.event338 name = evt.name339 module = 'netprofile_' + element.module340 tpldef = {341 'event' : evt,342 'name' : name,343 'module' : module,344 'compiler' : compiler,345 'dialect' : compiler.dialect,346 'raw' : text347 }348 tpldef.update(Base._decl_class_registry.items())349 
tplname = '%s:templates/sql/%s/events/%s.mak' % (350 module,351 compiler.dialect.name,352 name353 )354 return render(tplname, tpldef, package=sys.modules[module])355class DropEvent(DDLElement):356 """357 SQL DROP EVENT DDL object.358 """359 def __init__(self, evt):360 self.event = evt361@compiles(DropEvent, 'mysql')362def visit_drop_event_mysql(element, compiler, **kw):363 evt = element.event364 return 'DROP EVENT %s' % (365 compiler.sql_compiler.preparer.quote(evt.name),366 )367class SQLEvent(object):368 """369 Schema element that defines some periodically executed SQL code.370 """371 def __init__(self, name, sched_unit='month', sched_interval=1, starts=None, preserve=True, enabled=True, comment=None):372 self.name = name373 self.preserve = preserve374 self.enabled = enabled375 self.comment = comment376 self.sched_unit = sched_unit377 self.sched_interval = sched_interval378 self.starts = starts379 def create(self, modname):380 return CreateEvent(self, modname)381 def drop(self):382 return DropEvent(self)383class CreateView(DDLElement):384 """385 SQL create view DDL object.386 """387 def __init__(self, name, select, check_option=None):388 self.name = name389 self.select = select390 self.check = check_option391class DropView(DDLElement):392 """393 SQL drop view DDL object.394 """395 def __init__(self, name):396 self.name = name397@compiles(CreateView)398def visit_create_view(element, compiler, **kw):399 sel = element.select400 co = ''401 if callable(sel):402 sel = sel()403 if isinstance(sel, Query):404 ctx = sel._compile_context()405 ctx.statement.use_labels = True406 conn = sel.session.connection(mapper=sel._mapper_zero_or_none(), clause=ctx.statement)407 sel = ctx.statement.compile(conn, compile_kwargs={ 'literal_binds': True })408 else:409 sel = compiler.sql_compiler.process(sel, literal_binds=True)410 if element.check:411 if isinstance(element.check, str):412 co = ' WITH %s CHECK OPTION' % (element.check.upper(),)413 else:414 co = ' WITH CHECK OPTION'415 return 
'CREATE VIEW %s AS %s%s' % (416 compiler.sql_compiler.preparer.quote(element.name),417 sel, co418 )419@compiles(DropView)420def visit_drop_view(element, compiler, **kw):421 return 'DROP VIEW %s' % (422 compiler.sql_compiler.preparer.quote(element.name),423 )424class View(SchemaItem):425 """426 Schema element that attaches a view with a predefined query to an object.427 """428 def __init__(self, name, select, check_option=None):429 self.name = name430 self.select = select431 self.check = check_option432 def _set_parent(self, parent):433 if isinstance(parent, Table):434 self.parent = parent435 CreateView(self.name, self.select, check_option=self.check).execute_at('after_create', parent)436 DropView(self.name).execute_at('before_drop', parent)437 def create(self):438 return CreateView(self.name, self.select, check_option=self.check)439 def drop(self):...
Learn to execute automation testing from scratch with LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!