Best Python code snippet using dbt-osmosis_python
osmosis.py
Source:osmosis.py
...795 raise SanitizationRequired(796 "Found `sources:` block in a models schema file. We require you separate sources in order to organize your project."797 )798 return schema799 def build_schema_folder_mapping(800 self,801 target_node_type: Optional[Union[NodeType.Model, NodeType.Source]] = None,802 ) -> Dict[str, SchemaFileLocation]:803 """Builds a mapping of models or sources to their existing and target schema file paths"""804 if target_node_type == NodeType.Source:805 # Source folder mapping is reserved for source importing806 target_nodes = self.dbt.sources807 elif target_node_type == NodeType.Model:808 target_nodes = self.dbt.nodes809 else:810 target_nodes = {**self.dbt.nodes, **self.dbt.sources}811 # Container for output812 schema_map = {}813 logger().info("...building project structure mapping in memory")814 # Iterate over models and resolve current path vs declarative target path815 for unique_id, dbt_node in self.filtered_models(target_nodes):816 schema_path = self.get_schema_path(dbt_node)817 osmosis_schema_path = self.get_target_schema_path(dbt_node)818 schema_map[unique_id] = SchemaFileLocation(819 target=osmosis_schema_path, current=schema_path820 )821 return schema_map822 def draft_project_structure_update_plan(self) -> Dict[Path, SchemaFileMigration]:823 """Build project structure update plan based on `dbt-osmosis:` configs set across dbt_project.yml and model files.824 The update plan includes injection of undocumented models. 
Unless this plan is constructed and executed by the `commit_project_restructure` function,825 dbt-osmosis will only operate on models it is aware of through the existing documentation.826 Returns:827 MutableMapping: Update plan where dict keys consist of targets and contents consist of outputs which match the contents of the `models` to be output in the828 target file and supersede lists of what files are superseded by a migration829 """830 # Container for output831 blueprint: Dict[Path, SchemaFileMigration] = {}832 logger().info(833 ":chart_increasing: Searching project stucture for required updates and building action plan"834 )835 with self.adapter.connection_named("dbt-osmosis"):836 for unique_id, schema_file in self.build_schema_folder_mapping(837 target_node_type=NodeType.Model838 ).items():839 if not schema_file.is_valid:840 blueprint.setdefault(841 schema_file.target,842 SchemaFileMigration(output={"version": 2, "models": []}, supersede={}),843 )844 node = self.dbt.nodes[unique_id]845 if schema_file.current is None:846 # Bootstrapping Undocumented Model847 blueprint[schema_file.target].output["models"].append(848 self.get_base_model(node)849 )850 else:851 # Model Is Documented but Must be Migrated852 if not schema_file.current.exists():853 continue854 # TODO: We avoid sources for complexity reasons but if we are opinionated, we don't have to855 schema = self.assert_schema_has_no_sources(856 self.yaml_handler.load(schema_file.current)857 )858 models_in_file: Iterable[Dict[str, Any]] = schema.get("models", [])859 for documented_model in models_in_file:860 if documented_model["name"] == node.name:861 # Bootstrapping Documented Model862 blueprint[schema_file.target].output["models"].append(863 self.bootstrap_existing_model(documented_model, node)864 )865 # Target to supersede current866 blueprint[schema_file.target].supersede.setdefault(867 schema_file.current, []868 ).append(documented_model["name"])869 break870 else:871 ... 
# Model not found at patch path -- We should pass on this for now872 else:873 ... # Valid schema file found for model -- We will update the columns in the `Document` task874 return blueprint875 def commit_project_restructure_to_disk(876 self, blueprint: Optional[Dict[Path, SchemaFileMigration]] = None877 ) -> bool:878 """Given a project restrucure plan of pathlib Paths to a mapping of output and supersedes which is in itself a mapping of Paths to model names,879 commit changes to filesystem to conform project to defined structure as code fully or partially superseding existing models as needed.880 Args:881 blueprint (Dict[Path, SchemaFileMigration]): Project restructure plan as typically created by `build_project_structure_update_plan`882 Returns:883 bool: True if the project was restructured, False if no action was required884 """885 # Build blueprint if not user supplied886 if not blueprint:887 blueprint = self.draft_project_structure_update_plan()888 # Verify we have actions in the plan889 if not blueprint:890 logger().info(":1st_place_medal: Project structure approved")891 return False892 # Print plan for user auditability893 self.pretty_print_restructure_plan(blueprint)894 logger().info(895 ":construction_worker: Executing action plan and conforming projecting schemas to defined structure"896 )897 for target, structure in blueprint.items():898 if not target.exists():899 # Build File900 logger().info(":construction: Building schema file %s", target.name)901 if not self.dry_run:902 target.parent.mkdir(exist_ok=True, parents=True)903 target.touch()904 self.yaml_handler.dump(structure.output, target)905 else:906 # Update File907 logger().info(":toolbox: Updating schema file %s", target.name)908 target_schema: Optional[Dict[str, Any]] = self.yaml_handler.load(target)909 if not target_schema:910 target_schema = {"version": 2}911 elif "version" not in target_schema:912 target_schema["version"] = 2913 target_schema.setdefault("models", 
[]).extend(structure.output["models"])914 if not self.dry_run:915 self.yaml_handler.dump(target_schema, target)916 # Clean superseded schema files917 for dir, models in structure.supersede.items():918 preserved_models = []919 raw_schema: Dict[str, Any] = self.yaml_handler.load(dir)920 models_marked_for_superseding = set(models)921 models_in_schema = set(map(lambda mdl: mdl["name"], raw_schema.get("models", [])))922 non_superseded_models = models_in_schema - models_marked_for_superseding923 if len(non_superseded_models) == 0:924 logger().info(":rocket: Superseded schema file %s", dir.name)925 if not self.dry_run:926 dir.unlink(missing_ok=True)927 else:928 for model in raw_schema["models"]:929 if model["name"] in non_superseded_models:930 preserved_models.append(model)931 raw_schema["models"] = preserved_models932 if not self.dry_run:933 self.yaml_handler.dump(raw_schema, dir)934 logger().info(935 ":satellite: Model documentation migrated from %s to %s",936 dir.name,937 target.name,938 )939 return True940 @staticmethod941 def pretty_print_restructure_plan(blueprint: Dict[Path, SchemaFileMigration]) -> None:942 logger().info(943 list(944 map(945 lambda plan: (blueprint[plan].supersede or "CREATE", "->", plan),946 blueprint.keys(),947 )948 )949 )950 def build_node_ancestor_tree(951 self,952 node: ManifestNode,953 family_tree: Optional[Dict[str, List[str]]] = None,954 members_found: Optional[List[str]] = None,955 depth: int = 0,956 ) -> Dict[str, List[str]]:957 """Recursively build dictionary of parents in generational order"""958 if family_tree is None:959 family_tree = {}960 if members_found is None:961 members_found = []962 for parent in node.depends_on.nodes:963 member = self.dbt.nodes.get(parent, self.dbt.sources.get(parent))964 if member and parent not in members_found:965 family_tree.setdefault(f"generation_{depth}", []).append(parent)966 members_found.append(parent)967 # Recursion968 family_tree = self.build_node_ancestor_tree(969 member, family_tree, 
members_found, depth + 1970 )971 return family_tree972 def inherit_column_level_knowledge(973 self,974 family_tree: Dict[str, Any],975 ) -> Dict[str, Dict[str, Any]]:976 """Inherit knowledge from ancestors in reverse insertion order to ensure that the most recent ancestor is always the one to inherit from"""977 knowledge: Dict[str, Dict[str, Any]] = {}978 for generation in reversed(family_tree):979 for ancestor in family_tree[generation]:980 member: ManifestNode = self.dbt.nodes.get(ancestor, self.dbt.sources.get(ancestor))981 if not member:982 continue983 for name, info in member.columns.items():984 knowledge.setdefault(name, {"progenitor": ancestor})985 deserialized_info = info.to_dict()986 # Handle Info:987 # 1. tags are additive988 # 2. descriptions are overriden989 # 3. meta is merged990 # 4. tests are ignored until I am convinced those shouldn't be hand curated with love991 if deserialized_info["description"] in self.placeholders:992 deserialized_info.pop("description", None)993 deserialized_info["tags"] = list(994 set(deserialized_info.pop("tags", []) + knowledge[name].get("tags", []))995 )996 if not deserialized_info["tags"]:997 deserialized_info.pop("tags") # poppin' tags like Macklemore998 deserialized_info["meta"] = {999 **knowledge[name].get("meta", {}),1000 **deserialized_info["meta"],1001 }1002 if not deserialized_info["meta"]:1003 deserialized_info.pop("meta")1004 knowledge[name].update(deserialized_info)1005 return knowledge1006 def get_node_columns_with_inherited_knowledge(1007 self,1008 node: ManifestNode,1009 ) -> Dict[str, Dict[str, Any]]:1010 """Build a knowledgebase for the model based on iterating through ancestors"""1011 family_tree = self.build_node_ancestor_tree(node)1012 knowledge = self.inherit_column_level_knowledge(family_tree)1013 return knowledge1014 @staticmethod1015 def get_column_sets(1016 database_columns: Iterable[str],1017 yaml_columns: Iterable[str],1018 documented_columns: Iterable[str],1019 ) -> Tuple[List[str], List[str], 
List[str]]:1020 """Returns:1021 missing_columns: Columns in database not in dbt -- will be injected into schema file1022 undocumented_columns: Columns missing documentation -- descriptions will be inherited and injected into schema file where prior knowledge exists1023 extra_columns: Columns in schema file not in database -- will be removed from schema file1024 """1025 missing_columns = [1026 x for x in database_columns if x.lower() not in (y.lower() for y in yaml_columns)1027 ]1028 undocumented_columns = [1029 x for x in database_columns if x.lower() not in (y.lower() for y in documented_columns)1030 ]1031 extra_columns = [1032 x for x in yaml_columns if x.lower() not in (y.lower() for y in database_columns)1033 ]1034 return missing_columns, undocumented_columns, extra_columns1035 def propagate_documentation_downstream(self, force_inheritance: bool = False) -> None:1036 schema_map = self.build_schema_folder_mapping()1037 with self.adapter.connection_named("dbt-osmosis"):1038 for unique_id, node in track(list(self.filtered_models())):1039 logger().info("\n:point_right: Processing model: [bold]%s[/bold] \n", unique_id)1040 # Get schema file path, must exist to propagate documentation1041 schema_path: Optional[SchemaFileLocation] = schema_map.get(unique_id)1042 if schema_path is None or schema_path.current is None:1043 logger().info(1044 ":bow: No valid schema file found for model %s", unique_id1045 ) # We can't take action1046 continue1047 # Build Sets1048 database_columns: Set[str] = set(self.get_columns(node))1049 yaml_columns: Set[str] = set(column for column in node.columns)1050 if not database_columns:...
Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks such as Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!