Best Python code snippet using dbt-osmosis_python
diff.py
Source:diff.py
...5from dbt.adapters.base.relation import BaseRelation6from git import Repo7from dbt_osmosis.core.log_controller import logger8from dbt_osmosis.core.osmosis import DbtProject9def build_diff_queries(model: str, runner: DbtProject) -> Tuple[str, str]:10 """Leverage git to build two temporary tables for diffing the results of a query throughout a change"""11 # Resolve git node12 node = runner.get_ref_node(model)13 dbt_path = Path(node.root_path)14 repo = Repo(dbt_path, search_parent_directories=True)15 t = next(Path(repo.working_dir).rglob(node.original_file_path)).relative_to(repo.working_dir)16 sha = repo.head.object.hexsha17 target = repo.head.object.tree[str(t)]18 # Create original node19 git_node_name = "z_" + sha[-7:]20 original_node = runner.get_server_node(target.data_stream.read().decode("utf-8"), git_node_name)21 # Alias changed node22 changed_node = node23 # Compile models24 original_node = runner.compile_node(original_node)25 changed_node = runner.compile_node(changed_node)26 return original_node.compiled_sql, changed_node.compiled_sql27def build_diff_tables(model: str, runner: DbtProject) -> Tuple[BaseRelation, BaseRelation]:28 """Leverage git to build two temporary tables for diffing the results of a query throughout a change"""29 # Resolve git node30 node = runner.get_ref_node(model)31 dbt_path = Path(node.root_path)32 repo = Repo(dbt_path, search_parent_directories=True)33 t = next(Path(repo.working_dir).rglob(node.original_file_path)).relative_to(repo.working_dir)34 sha = repo.head.object.hexsha35 target = repo.head.object.tree[str(t)]36 # Create original node37 git_node_name = "z_" + sha[-7:]38 original_node = runner.get_server_node(target.data_stream.read().decode("utf-8"), git_node_name)39 # Alias changed node40 changed_node = node41 # Compile models42 original_node = runner.compile_node(original_node).node43 changed_node = runner.compile_node(changed_node).node44 # Lookup and resolve original ref based on git sha45 git_node_parts = original_node.database, "dbt_diff", git_node_name46 ref_A, did_exist = runner.get_or_create_relation(*git_node_parts)47 if not did_exist:48 logger().info("Creating new relation for %s", ref_A)49 with runner.adapter.connection_named("dbt-osmosis"):50 runner.execute_macro(51 "create_schema",52 kwargs={"relation": ref_A},53 )54 runner.execute_macro(55 "create_table_as",56 kwargs={57 "sql": original_node.compiled_sql,58 "relation": ref_A,59 "temporary": True,60 },61 run_compiled_sql=True,62 )63 # Resolve modified fake ref based on hash of it compiled SQL64 temp_node_name = "z_" + hashlib.md5(changed_node.compiled_sql.encode("utf-8")).hexdigest()[-7:]65 git_node_parts = original_node.database, "dbt_diff", temp_node_name66 ref_B, did_exist = runner.get_or_create_relation(*git_node_parts)67 if not did_exist:68 ref_B = runner.adapter.Relation.create(*git_node_parts)69 logger().info("Creating new relation for %s", ref_B)70 with runner.adapter.connection_named("dbt-osmosis"):71 runner.execute_macro(72 "create_schema",73 kwargs={"relation": ref_B},74 )75 runner.execute_macro(76 "create_table_as",77 kwargs={78 "sql": original_node.compiled_sql,79 "relation": ref_B,80 "temporary": True,81 },82 run_compiled_sql=True,83 )84 return ref_A, ref_B85def diff_tables(86 ref_A: BaseRelation,87 ref_B: BaseRelation,88 pk: str,89 runner: DbtProject,90 aggregate: bool = True,91) -> agate.Table:92 logger().info("Running diff")93 _, table = runner.adapter_execute(94 runner.execute_macro(95 "_dbt_osmosis_compare_relations_agg" if aggregate else "_dbt_osmosis_compare_relations",96 kwargs={97 "a_relation": ref_A,98 "b_relation": ref_B,99 "primary_key": pk,100 },101 ),102 auto_begin=True,103 fetch=True,104 )105 return table106def diff_queries(107 sql_A: str, sql_B: str, pk: str, runner: DbtProject, aggregate: bool = True108) -> agate.Table:109 logger().info("Running diff")110 _, table = runner.adapter_execute(111 runner.execute_macro(112 "_dbt_osmosis_compare_queries_agg" if aggregate else "_dbt_osmosis_compare_queries",113 kwargs={114 "a_query": sql_A,115 "b_query": sql_B,116 "primary_key": pk,117 },118 ),119 auto_begin=True,120 fetch=True,121 )122 return table123def diff_and_print_to_console(124 model: str,125 pk: str,126 runner: DbtProject,127 make_temp_tables: bool = False,128 agg: bool = True,129 output: str = "table",130) -> None:131 """132 Compare two tables and print the results to the console133 """134 if make_temp_tables:135 table = diff_tables(*build_diff_tables(model, runner), pk, runner, agg)136 else:137 table = diff_queries(*build_diff_queries(model, runner), pk, runner, agg)138 print("")139 output = output.lower()140 if output == "table":141 table.print_table()142 elif output in ("chart", "bar"):143 if not agg:144 logger().warn(145 "Cannot render output format chart with --no-agg option, defaulting to table"146 )147 table.print_table()148 else:149 _table = table.compute(150 [151 (...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!