How to use the safe_parse_project method in dbt-osmosis

Python code snippets from the dbt-osmosis project showing safe_parse_project in use.
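Before looking at the sources, here is a minimal usage sketch. It assumes a locally configured dbt project; the keyword argument names and the import path are illustrative placeholders inferred from the snippets below, not a definitive API reference.

```python
# Minimal sketch, assuming a locally configured dbt project.
# Constructor arguments mirror how DbtProjectContainer.add_project builds a
# DbtProject in osmosis.py below; the values and import path are placeholders.
from dbt_osmosis.core.osmosis import DbtProject  # import path may vary by version

project = DbtProject(
    target="dev",                        # assumed profile target
    profiles_dir="~/.dbt",               # assumed profiles directory
    project_dir="/path/to/dbt_project",  # assumed project root
)

# Re-seed the project after files or configuration change on disk.
# reinit=True clears the LRU caches before re-parsing; if parsing fails,
# the previous config is restored and the exception is re-raised.
project.safe_parse_project(reinit=True)
```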

osmosis.py

Source: osmosis.py (GitHub)


```python
...
    @property
    def manifest(self) -> ManifestProxy:
        """dbt manifest dict"""
        return ManifestProxy(self.dbt.flat_graph)

    def safe_parse_project(self, reinit: bool = False) -> None:
        """This is used to reseed the DbtProject safely post-init. This is
        intended for use by the osmosis server"""
        if reinit:
            self.clear_caches()
        _config_pointer = copy(self.config)
        try:
            self.parse_project(init=reinit)
        except Exception as parse_error:
            self.config = _config_pointer
            raise parse_error
        self.write_manifest_artifact()

    def write_manifest_artifact(self) -> None:
        """Write a manifest.json to disk"""
        artifact_path = os.path.join(
            self.config.project_root, self.config.target_path, MANIFEST_ARTIFACT
        )
        self.dbt.write(artifact_path)

    def clear_caches(self) -> None:
        """Clear least recently used caches and reinstantiable container objects"""
        self.get_ref_node.cache_clear()
        self.get_source_node.cache_clear()
        self.get_macro_function.cache_clear()
        self.get_columns.cache_clear()
        self.compile_sql.cache_clear()

    @lru_cache(maxsize=10)
    def get_ref_node(self, target_model_name: str) -> MaybeNonSource:
        """Get a `ManifestNode` from a dbt project model name"""
        return self.dbt.resolve_ref(
            target_model_name=target_model_name,
            target_model_package=None,
            current_project=self.config.project_name,
            node_package=self.config.project_name,
        )

    @lru_cache(maxsize=10)
    def get_source_node(self, target_source_name: str, target_table_name: str) -> MaybeParsedSource:
        """Get a `ManifestNode` from a dbt project source name and table name"""
        return self.dbt.resolve_source(
            target_source_name=target_source_name,
            target_table_name=target_table_name,
            current_project=self.config.project_name,
            node_package=self.config.project_name,
        )

    def get_server_node(self, sql: str, node_name="name"):
        """Get a node for SQL execution against adapter"""
        self._clear_node(node_name)
        sql_node = self.sql_parser.parse_remote(sql, node_name)
        process_node(self.config, self.dbt, sql_node)
        return sql_node

    @lru_cache(maxsize=10)
    def get_node_by_path(self, path: str):
        """Find an existing node given relative file path."""
        for node in self.dbt.nodes.values():
            if node.original_file_path == path:
                return node
        return None

    @lru_cache(maxsize=100)
    def get_macro_function(self, macro_name: str) -> Callable[[Dict[str, Any]], Any]:
        """Get macro as a function which takes a dict via argument named `kwargs`,
        ie: `kwargs={"relation": ...}`

            make_schema_fn = get_macro_function('make_schema')
            make_schema_fn({'name': '__test_schema_1'})
            make_schema_fn({'name': '__test_schema_2'})
        """
        return partial(self.adapter.execute_macro, macro_name=macro_name, manifest=self.dbt)

    def adapter_execute(
        self, sql: str, auto_begin: bool = False, fetch: bool = False
    ) -> Tuple[AdapterResponse, agate.Table]:
        """Wraps adapter.execute. Execute SQL against database"""
        return self.adapter.execute(sql, auto_begin, fetch)

    def execute_macro(
        self,
        macro: str,
        kwargs: Optional[Dict[str, Any]] = None,
    ) -> Any:
        """Wraps adapter execute_macro. Execute a macro like a function."""
        return self.get_macro_function(macro)(kwargs=kwargs)

    def execute_sql(self, raw_sql: str) -> DbtAdapterExecutionResult:
        """Execute dbt SQL statement against database"""
        # if no jinja chars then these are synonymous
        compiled_sql = raw_sql
        if has_jinja(raw_sql):
            # jinja found, compile it
            compiled_sql = self.compile_sql(raw_sql).compiled_sql
        return DbtAdapterExecutionResult(
            *self.adapter_execute(compiled_sql, fetch=True),
            raw_sql,
            compiled_sql,
        )

    def execute_node(self, node: ManifestNode) -> DbtAdapterExecutionResult:
        """Execute dbt SQL statement against database from a ManifestNode"""
        raw_sql: str = getattr(node, RAW_CODE)
        compiled_sql: Optional[str] = getattr(node, COMPILED_CODE, None)
        if compiled_sql:
            # node is compiled, execute the SQL
            return self.execute_sql(compiled_sql)
        # node not compiled
        if has_jinja(raw_sql):
            # node has jinja in its SQL, compile it
            compiled_sql = self.compile_node(node).compiled_sql
        # execute the SQL
        return self.execute_sql(compiled_sql or raw_sql)

    @lru_cache(maxsize=SQL_CACHE_SIZE)
    def compile_sql(self, raw_sql: str, retry: int = 3) -> DbtAdapterCompilationResult:
        """Creates a node with `get_server_node` method. Compile generated node.
        Has a retry built in because even uuidv4 cannot guarantee uniqueness at the speed
        in which we can call this function concurrently. A retry significantly increases the stability"""
        temp_node_id = str(uuid.uuid4())
        try:
            node = self.compile_node(self.get_server_node(raw_sql, temp_node_id))
        except Exception as exc:
            if retry > 0:
                return self.compile_sql(raw_sql, retry - 1)
            raise exc
        else:
            return node
        finally:
            self._clear_node(temp_node_id)

    def compile_node(self, node: ManifestNode) -> DbtAdapterCompilationResult:
        """Compiles existing node."""
        self.sql_compiler.node = node
        # this is essentially a convenient wrapper to adapter.get_compiler
        compiled_node = self.sql_compiler.compile(self.dbt)
        return DbtAdapterCompilationResult(
            getattr(compiled_node, RAW_CODE),
            getattr(compiled_node, COMPILED_CODE),
            compiled_node,
        )

    def _clear_node(self, name="name"):
        """Removes the statically named node created by `execute_sql` and `compile_sql` in `dbt.lib`"""
        self.dbt.nodes.pop(f"{NodeType.SqlOperation}.{self.project_name}.{name}", None)

    def get_relation(self, database: str, schema: str, name: str) -> Optional[BaseRelation]:
        """Wrapper for `adapter.get_relation`"""
        return self.adapter.get_relation(database, schema, name)

    def create_relation(self, database: str, schema: str, name: str) -> BaseRelation:
        """Wrapper for `adapter.Relation.create`"""
        return self.adapter.Relation.create(database, schema, name)

    def create_relation_from_node(self, node: ManifestNode) -> BaseRelation:
        """Wrapper for `adapter.Relation.create_from`"""
        return self.adapter.Relation.create_from(self.config, node)

    def get_columns_in_relation(self, node: ManifestNode) -> List[str]:
        """Wrapper for `adapter.get_columns_in_relation`"""
        return self.adapter.get_columns_in_relation(self.create_relation_from_node(node))

    @lru_cache(maxsize=5)
    def get_columns(self, node: ManifestNode) -> List[ColumnInfo]:
        """Get a list of columns from a compiled node"""
        columns = []
        try:
            columns.extend(
                [c.name for c in self.get_columns_in_relation(self.create_relation_from_node(node))]
            )
        except CompilationException:
            original_sql = str(getattr(node, RAW_CODE))
            # TODO: account for `TOP` syntax
            setattr(node, RAW_CODE, f"select * from ({original_sql}) limit 0")
            result = self.execute_node(node)
            setattr(node, RAW_CODE, original_sql)
            delattr(node, COMPILED_CODE)
            columns.extend(result.table.column_names)
        return columns

    def get_or_create_relation(
        self, database: str, schema: str, name: str
    ) -> Tuple[BaseRelation, bool]:
        """Get relation or create if not exists. Returns tuple of relation and
        boolean result of whether it existed ie: (relation, did_exist)"""
        ref = self.get_relation(database, schema, name)
        return (ref, True) if ref else (self.create_relation(database, schema, name), False)

    def create_schema(self, node: ManifestNode):
        """Create a schema in the database"""
        return self.execute_macro(
            "create_schema",
            kwargs={"relation": self.create_relation_from_node(node)},
        )

    def materialize(
        self, node: ManifestNode, temporary: bool = True
    ) -> Tuple[AdapterResponse, None]:
        """Materialize a table in the database"""
        return self.adapter_execute(
            # Returns CTAS string so send to adapter.execute
            self.execute_macro(
                "create_table_as",
                kwargs={
                    "sql": getattr(node, COMPILED_CODE),
                    "relation": self.create_relation_from_node(node),
                    "temporary": temporary,
                },
            ),
            auto_begin=True,
        )


class DbtProjectContainer:
    """This class manages multiple DbtProjects which each correspond
    to a single dbt project on disk. This is mostly for osmosis server use"""

    def __init__(self):
        self._projects: Dict[str, DbtProject] = OrderedDict()
        self._default_project: Optional[str] = None

    def get_project(self, project_name: str) -> Optional[DbtProject]:
        """Primary interface to get a project and execute code"""
        return self._projects.get(project_name)

    @lru_cache(maxsize=10)
    def get_project_by_root_dir(self, root_dir: str) -> Optional[DbtProject]:
        """Get a project by its root directory."""
        root_dir = os.path.abspath(os.path.normpath(root_dir))
        for project in self._projects.values():
            if os.path.abspath(project.project_root) == root_dir:
                return project
        return None

    def get_default_project(self) -> Optional[DbtProject]:
        """Gets the default project which at any given time is the
        earliest project inserted into the container"""
        return self._projects.get(self._default_project)

    def add_project(
        self,
        target: Optional[str] = None,
        profiles_dir: Optional[str] = None,
        project_dir: Optional[str] = None,
        threads: Optional[int] = 1,
        name_override: Optional[str] = "",
    ) -> DbtProject:
        """Add a DbtProject with arguments"""
        project = DbtProject(target, profiles_dir, project_dir, threads)
        project_name = name_override or project.config.project_name
        if self._default_project is None:
            self._default_project = project_name
        self._projects[project_name] = project
        return project

    def add_parsed_project(self, project: DbtProject) -> DbtProject:
        """Add an already instantiated DbtProject"""
        self._projects.setdefault(project.config.project_name, project)
        return project

    def add_project_from_args(self, args: ConfigInterface) -> DbtProject:
        """Add a DbtProject from a ConfigInterface"""
        project = DbtProject.from_args(args)
        self._projects.setdefault(project.config.project_name, project)
        return project

    def drop_project(self, project_name: str) -> None:
        """Drop a DbtProject"""
        project = self.get_project(project_name)
        if project is None:
            return
        project.clear_caches()
        project.adapter.connections.cleanup_all()
        self._projects.pop(project_name)
        if self._default_project == project_name:
            if len(self) > 0:
                self._default_project = next(iter(self._projects))
            else:
                self._default_project = None

    def drop_all_projects(self) -> None:
        """Drop all projects in the container"""
        self._default_project = None
        for project in list(self._projects):
            self.drop_project(project)

    def reparse_all_projects(self) -> None:
        """Reparse all projects"""
        for project in self:
            project.safe_parse_project()

    def registered_projects(self) -> List[str]:
        """Convenience to grab all registered project names"""
        return list(self._projects.keys())

    def __len__(self):
        """Allows len(DbtProjectContainer)"""
        return len(self._projects)

    def __getitem__(self, project: str):
        """Allows DbtProjectContainer['jaffle_shop']"""
        maybe_project = self.get_project(project)
        if maybe_project is None:
            raise KeyError(project)
        return maybe_project

    def __delitem__(self, project: str):
        """Allows del DbtProjectContainer['jaffle_shop']"""
...
```
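For the osmosis server use case, projects are usually managed through DbtProjectContainer. The following is a short sketch restricted to the methods shown above; the target, paths, and project name are placeholders.

```python
# Sketch of DbtProjectContainer usage, using only methods shown above.
container = DbtProjectContainer()

# Register a project; the first project added becomes the default.
project = container.add_project(
    target="dev",                        # placeholder target
    profiles_dir="~/.dbt",               # placeholder profiles directory
    project_dir="/path/to/dbt_project",  # placeholder project root
)

# Projects can be fetched by name (from dbt_project.yml) or by root directory.
same_project = container["my_project"]   # hypothetical project name; raises KeyError if unknown
by_root = container.get_project_by_root_dir("/path/to/dbt_project")

# Safely re-parse after files change on disk, then drop the project when finished.
project.safe_parse_project(reinit=True)
container.drop_project("my_project")
```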


server_v2.py

Source: server_v2.py (GitHub)


```python
...
    given project at any given time synchronously or asynchronously"""
    target_did_change = old_target != new_target
    try:
        runner.args.target = new_target
        runner.safe_parse_project(reinit=reset or target_did_change)
    except Exception as reparse_err:
        runner.args.target = old_target
        rv = OsmosisErrorContainer(
            error=OsmosisError(
                code=OsmosisErrorCode.ProjectParseFailure,
                message=str(reparse_err),
                data=reparse_err.__dict__,
            )
        )
    else:
        runner._version += 1
        rv = OsmosisResetResult(
            result=(
                f"Profile target changed from {old_target} to {new_target}!"
...
```
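The endpoint above swaps the profile target, re-parses with safe_parse_project, and restores the previous target if parsing fails. The same pattern can be sketched outside the server; this assumes, as the snippet does, that the runner is a DbtProject whose parsed arguments are exposed as runner.args.

```python
# Sketch of the retarget-and-reparse pattern used by server_v2.py.
# Assumes `runner` is a DbtProject whose parsed args are available as `runner.args`.
def switch_target(runner, new_target: str, reset: bool = False) -> str:
    old_target = runner.args.target
    target_did_change = old_target != new_target
    try:
        runner.args.target = new_target
        # reinit clears caches when a reset is forced or the target actually changed.
        runner.safe_parse_project(reinit=reset or target_did_change)
    except Exception:
        # Roll back to the previous target if re-parsing fails, then surface the error.
        runner.args.target = old_target
        raise
    return f"Profile target changed from {old_target} to {new_target}!"
```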


main.py

Source: main.py (GitHub)


```python
...
        dry_run=dry_run,
    )
    # Conform project structure & bootstrap undocumented models injecting columns
    if runner.commit_project_restructure_to_disk():
        runner.safe_parse_project()
    runner.propagate_documentation_downstream(force_inheritance=force_inheritance)


@yaml.command(context_settings=CONTEXT)
@shared_opts
@click.option(
    "-f",
    "--fqn",
    type=click.STRING,
    help="Specify models based on FQN. Use dots as separators. Looks like folder.folder.model or folder.folder.source.table. Use list command to see the scope of an FQN filter.",
)
@click.option(
    "-d",
    "--dry-run",
    is_flag=True,
    help="If specified, no changes are committed to disk.",
...
```


