Best Python code snippet using dbt-osmosis_python
osmosis.py
Source: osmosis.py
...
        return {
            "name": node.alias or node.name,
            "columns": [{"name": column_name} for column_name in columns],
        }

    def bootstrap_existing_model(
        self, model_documentation: Dict[str, Any], node: ManifestNode
    ) -> Dict[str, Any]:
        """Injects columns from database into existing model if not found"""
        model_columns: List[str] = [
            c["name"].lower() for c in model_documentation.get("columns", [])
        ]
        database_columns = self.get_columns(node)
        for column in database_columns:
            if column.lower() not in model_columns:
                logger().info(":syringe: Injecting column %s into dbt schema", column)
                model_documentation.setdefault("columns", []).append({"name": column})
        return model_documentation

    def get_columns(self, node: ManifestNode) -> List[str]:
        """Get all columns in a list for a model"""
        parts = self.get_database_parts(node)
        table = self.adapter.get_relation(*parts)
        columns = []
        if not table:
            logger().info(
                ":cross_mark: Relation %s.%s.%s does not exist in target database, cannot resolve columns",
                *parts,
            )
            return columns
        try:
            columns = [c.name for c in self.adapter.get_columns_in_relation(table)]
        except CompilationException as error:
            logger().info(
                ":cross_mark: Could not resolve relation %s.%s.%s against database active tables during introspective query: %s",
                *parts,
                str(error),
            )
        return columns

    @staticmethod
    def assert_schema_has_no_sources(schema: Mapping) -> Mapping:
        """Inline assertion ensuring that a schema does not have a source key"""
        if schema.get("sources"):
            raise SanitizationRequired(
                "Found `sources:` block in a models schema file. We require you separate sources in order to organize your project."
            )
        return schema

    def build_schema_folder_mapping(
        self,
        target_node_type: Optional[Union[NodeType.Model, NodeType.Source]] = None,
    ) -> Dict[str, SchemaFileLocation]:
        """Builds a mapping of models or sources to their existing and target schema file paths"""
        if target_node_type == NodeType.Source:
            # Source folder mapping is reserved for source importing
            target_nodes = self.dbt.sources
        elif target_node_type == NodeType.Model:
            target_nodes = self.dbt.nodes
        else:
            target_nodes = {**self.dbt.nodes, **self.dbt.sources}
        # Container for output
        schema_map = {}
        logger().info("...building project structure mapping in memory")
        # Iterate over models and resolve current path vs declarative target path
        for unique_id, dbt_node in self.filtered_models(target_nodes):
            schema_path = self.get_schema_path(dbt_node)
            osmosis_schema_path = self.get_target_schema_path(dbt_node)
            schema_map[unique_id] = SchemaFileLocation(
                target=osmosis_schema_path, current=schema_path
            )
        return schema_map

    def draft_project_structure_update_plan(self) -> Dict[Path, SchemaFileMigration]:
        """Build project structure update plan based on `dbt-osmosis:` configs set across dbt_project.yml and model files.

        The update plan includes injection of undocumented models. Unless this plan is constructed and executed by the
        `commit_project_restructure` function, dbt-osmosis will only operate on models it is aware of through the
        existing documentation.

        Returns:
            MutableMapping: Update plan where dict keys consist of targets and contents consist of outputs which match
            the contents of the `models` to be output in the target file and supersede lists of what files are
            superseded by a migration
        """
        # Container for output
        blueprint: Dict[Path, SchemaFileMigration] = {}
        logger().info(
            ":chart_increasing: Searching project structure for required updates and building action plan"
        )
        with self.adapter.connection_named("dbt-osmosis"):
            for unique_id, schema_file in self.build_schema_folder_mapping(
                target_node_type=NodeType.Model
            ).items():
                if not schema_file.is_valid:
                    blueprint.setdefault(
                        schema_file.target,
                        SchemaFileMigration(output={"version": 2, "models": []}, supersede={}),
                    )
                    node = self.dbt.nodes[unique_id]
                    if schema_file.current is None:
                        # Bootstrapping Undocumented Model
                        blueprint[schema_file.target].output["models"].append(
                            self.get_base_model(node)
                        )
                    else:
                        # Model Is Documented but Must be Migrated
                        if not schema_file.current.exists():
                            continue
                        # TODO: We avoid sources for complexity reasons but if we are opinionated, we don't have to
                        schema = self.assert_schema_has_no_sources(
                            self.yaml_handler.load(schema_file.current)
                        )
                        models_in_file: Iterable[Dict[str, Any]] = schema.get("models", [])
                        for documented_model in models_in_file:
                            if documented_model["name"] == node.name:
                                # Bootstrapping Documented Model
                                blueprint[schema_file.target].output["models"].append(
                                    self.bootstrap_existing_model(documented_model, node)
                                )
                                # Target to supersede current
                                blueprint[schema_file.target].supersede.setdefault(
                                    schema_file.current, []
                                ).append(documented_model["name"])
                                break
                        else:
                            ...  # Model not found at patch path -- We should pass on this for now
                else:
                    ...  # Valid schema file found for model -- We will update the columns in the `Document` task
        return blueprint

    def commit_project_restructure_to_disk(
        self, blueprint: Optional[Dict[Path, SchemaFileMigration]] = None
    ) -> bool:
        ...
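As a point of reference, here is a minimal usage sketch of the draft-then-commit flow implied by the excerpt. The `runner` object is a hypothetical, already-constructed instance of the class these methods belong to (the class name and constructor are not shown above), and the `output` and `supersede` shapes follow the `SchemaFileMigration` usage visible in `draft_project_structure_update_plan`.

# Hypothetical usage sketch -- `runner` stands in for an instance of the class
# whose methods are excerpted above; its construction is not shown there.
def preview_and_apply_plan(runner, apply: bool = False) -> None:
    # Draft the migration blueprint: {target schema file path -> SchemaFileMigration}
    blueprint = runner.draft_project_structure_update_plan()
    for target, migration in blueprint.items():
        # `output` is the YAML dict destined for the target file ({"version": 2, "models": [...]})
        model_names = [m["name"] for m in migration.output.get("models", [])]
        print(f"{target}: would document {model_names}")
        # `supersede` maps existing schema files to the model entries they hand over to the target
        for superseded_file, models in migration.supersede.items():
            print(f"  supersedes {superseded_file} for {models}")
    if apply:
        # Per the excerpt, committing the blueprint to disk returns a bool
        runner.commit_project_restructure_to_disk(blueprint)

Note that `draft_project_structure_update_plan` opens its own adapter connection (`connection_named("dbt-osmosis")`), so the sketch only assumes a fully initialized runner, not an open connection.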