Best Python code snippet using pandera_python
model.py
Source:model.py
# ... (the snippet starts here; the module's imports and constants are not shown)


def _is_field(name: str) -> bool:
    """Ignore private and reserved keywords.

    :param name: attribute name to test.
    :returns: ``True`` unless ``name`` starts with an underscore or equals
        ``_CONFIG_KEY``.
    """
    return not name.startswith("_") and name != _CONFIG_KEY


# Public option names declared on BaseConfig; anything else found on a
# user Config class is treated as an "extra" (see below).
_config_options = [attr for attr in vars(BaseConfig) if _is_field(attr)]


def _extract_config_options_and_extras(
    config: Any,
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Split the attributes of ``config`` into known options and extras.

    :param config: object whose ``vars()`` are inspected (a Config class).
    :returns: ``(config_options, extras)`` where ``config_options`` holds the
        attributes whose names appear in ``_config_options`` and ``extras``
        holds every other attribute accepted by :func:`_is_field`.
    """
    config_options, extras = {}, {}
    for name, value in vars(config).items():
        if name in _config_options:
            config_options[name] = value
        elif _is_field(name):
            extras[name] = value
        # drop private/reserved keywords
    return config_options, extras


def _convert_extras_to_checks(extras: Dict[str, Any]) -> List[Check]:
    """
    New in GH#383.
    Any key not in BaseConfig keys is interpreted as defining a dataframe check. This function
    defines this conversion as follows:
        - Look up the key name in Check
        - If value is
            - tuple: interpret as args
            - dict: interpret as kwargs
            - anything else: interpret as the only argument to pass to Check
    """
    checks = []
    for name, value in extras.items():
        if isinstance(value, tuple):
            args, kwargs = value, {}
        elif isinstance(value, dict):
            args, kwargs = (), value
        else:
            args, kwargs = (value,), {}
        # dispatch directly to getattr to raise the correct exception
        checks.append(Check.__getattr__(name)(*args, **kwargs))
    return checks


class _MetaSchema(type):
    """Add string representations, mainly for pydantic."""

    def __repr__(cls):
        return str(cls)

    def __str__(cls):
        return cls.__name__


class SchemaModel(metaclass=_MetaSchema):
    """Definition of a :class:`~pandera.DataFrameSchema`.

    *new in 0.5.0*

    See the :ref:`User Guide <schema_models>` for more.
    """

    Config: Type[BaseConfig] = BaseConfig
    # Populated by __init_subclass__ (__config__, __extras__) and by
    # to_schema (__schema__, __fields__, __checks__, __dataframe_checks__).
    __extras__: Optional[Dict[str, Any]] = None
    __schema__: Optional[DataFrameSchema] = None
    __config__: Optional[Type[BaseConfig]] = None

    #: Key according to `FieldInfo.name`
    __fields__: Dict[str, Tuple[AnnotationInfo, FieldInfo]] = {}
    __checks__: Dict[str, List[Check]] = {}
    __dataframe_checks__: List[Check] = []

    # This is syntactic sugar that delegates to the validate method
    @docstring_substitution(validate_doc=DataFrameSchema.validate.__doc__)
    def __new__(cls, *args, **kwargs) -> DataFrameBase[TSchemaModel]:  # type: ignore [misc]
        """%(validate_doc)s"""
        return cast(DataFrameBase[TSchemaModel], cls.validate(*args, **kwargs))

    def __init_subclass__(cls, **kwargs):
        """Ensure :class:`~pandera.model_components.FieldInfo` instances."""
        super().__init_subclass__(**kwargs)
        # pylint:disable=no-member
        subclass_annotations = cls.__dict__.get("__annotations__", {})
        for field_name in subclass_annotations:
            if _is_field(field_name) and field_name not in cls.__dict__:
                # Field omitted
                field = Field()
                field.__set_name__(cls, field_name)
                setattr(cls, field_name, field)
        cls.__config__, cls.__extras__ = cls._collect_config_and_extras()

    @classmethod
    def to_schema(cls) -> DataFrameSchema:
        """Create :class:`~pandera.DataFrameSchema` from the :class:`.SchemaModel`."""
        if cls in MODEL_CACHE:
            return MODEL_CACHE[cls]

        # NOTE(review): vars() runs before the ``is not None`` guard further
        # down, so a None ``__config__`` raises TypeError here - confirm
        # whether that guard is still meant to be reachable.
        mi_kwargs = {
            name[len("multiindex_") :]: value
            for name, value in vars(cls.__config__).items()
            if name.startswith("multiindex_")
        }

        cls.__fields__ = cls._collect_fields()
        check_infos = typing.cast(
            List[FieldCheckInfo], cls._collect_check_infos(CHECK_KEY)
        )
        cls.__checks__ = cls._extract_checks(
            check_infos, field_names=list(cls.__fields__)
        )

        # Dataframe-level checks: decorated methods plus Config extras.
        df_check_infos = cls._collect_check_infos(DATAFRAME_CHECK_KEY)
        df_custom_checks = cls._extract_df_checks(df_check_infos)
        df_registered_checks = _convert_extras_to_checks(
            {} if cls.__extras__ is None else cls.__extras__
        )
        cls.__dataframe_checks__ = df_custom_checks + df_registered_checks

        columns, index = cls._build_columns_index(
            cls.__fields__, cls.__checks__, **mi_kwargs
        )

        kwargs = {}
        if cls.__config__ is not None:
            kwargs = {
                "coerce": cls.__config__.coerce,
                "strict": cls.__config__.strict,
                "name": cls.__config__.name,
                "ordered": cls.__config__.ordered,
                "unique": cls.__config__.unique,
                "title": cls.__config__.title,
                "description": cls.__config__.description or cls.__doc__,
            }
        cls.__schema__ = DataFrameSchema(
            columns,
            index=index,
            checks=cls.__dataframe_checks__,  # type: ignore
            **kwargs,
        )
        if cls not in MODEL_CACHE:
            MODEL_CACHE[cls] = cls.__schema__  # type: ignore
        return cls.__schema__  # type: ignore

    @classmethod
    def to_yaml(cls, stream: Optional[os.PathLike] = None):
        """
        Convert `Schema` to yaml using `io.to_yaml`.
        """
        return cls.to_schema().to_yaml(stream)

    @classmethod
    @docstring_substitution(validate_doc=DataFrameSchema.validate.__doc__)
    def validate(
        cls: Type[TSchemaModel],
        check_obj: pd.DataFrame,
        head: Optional[int] = None,
        tail: Optional[int] = None,
        sample: Optional[int] = None,
        random_state: Optional[int] = None,
        lazy: bool = False,
        inplace: bool = False,
    ) -> DataFrameBase[TSchemaModel]:
        """%(validate_doc)s"""
        return cast(
            DataFrameBase[TSchemaModel],
            cls.to_schema().validate(
                check_obj, head, tail, sample, random_state, lazy, inplace
            ),
        )

    @classmethod
    @docstring_substitution(strategy_doc=DataFrameSchema.strategy.__doc__)
    @st.strategy_import_error
    def strategy(cls: Type[TSchemaModel], *, size: Optional[int] = None):
        """%(strategy_doc)s"""
        return cls.to_schema().strategy(size=size)

    @classmethod
    # Substitute the docstring of ``DataFrameSchema.example`` (the method this
    # delegates to), not the one of ``DataFrameSchema.strategy``.
    @docstring_substitution(example_doc=DataFrameSchema.example.__doc__)
    @st.strategy_import_error
    def example(
        cls: Type[TSchemaModel], *, size: Optional[int] = None
    ) -> DataFrameBase[TSchemaModel]:
        """%(example_doc)s"""
        return cast(
            DataFrameBase[TSchemaModel], cls.to_schema().example(size=size)
        )

    @classmethod
    def _build_columns_index(  # pylint:disable=too-many-locals
        cls,
        fields: Dict[str, Tuple[AnnotationInfo, FieldInfo]],
        checks: Dict[str, List[Check]],
        **multiindex_kwargs: Any,
    ) -> Tuple[
        Dict[str, schema_components.Column],
        Optional[Union[schema_components.Index, schema_components.MultiIndex]],
    ]:
        """Build the column and index components from the collected fields.

        :param fields: mapping of field key to ``(annotation, field)`` pairs.
        :param checks: mapping of field key to the checks attached to it.
        :param multiindex_kwargs: forwarded to ``_build_schema_index``.
        :returns: ``(columns, index)`` where ``columns`` is keyed by
            ``field.name`` and ``index`` is whatever ``_build_schema_index``
            returns for the collected indices.
        :raises TypeError: if an annotation carries metadata and the field
            also sets ``dtype_kwargs``.
        :raises SchemaInitError: if ``check_name`` is ``False`` on a column,
            an index is Optional, or the annotation is neither a series nor
            an index type.
        """
        # NOTE(review): only ``annotation.origin`` is counted here, while the
        # index branch below also accepts ``raw_annotation in INDEX_TYPES`` -
        # confirm this asymmetry is intended.
        index_count = sum(
            annotation.origin in INDEX_TYPES
            for annotation, _ in fields.values()
        )

        columns: Dict[str, schema_components.Column] = {}
        indices: List[schema_components.Index] = []
        for field_name, (annotation, field) in fields.items():
            # Checks are looked up by the mapping key before the name is
            # replaced by ``field.name``.
            field_checks = checks.get(field_name, [])
            field_name = field.name
            check_name = getattr(field, "check_name", None)

            if annotation.metadata:
                if field.dtype_kwargs:
                    raise TypeError(
                        "Cannot specify redundant 'dtype_kwargs' "
                        + f"for {annotation.raw_annotation}."
                        + "\n Usage Tip: Drop 'typing.Annotated'."
                    )
                dtype_kwargs = _get_dtype_kwargs(annotation)
                dtype = annotation.arg(**dtype_kwargs)  # type: ignore
            elif annotation.default_dtype:
                dtype = annotation.default_dtype
            else:
                dtype = annotation.arg

            dtype = None if dtype is Any else dtype

            if (
                annotation.origin in SERIES_TYPES
                or annotation.raw_annotation in SERIES_TYPES
            ):
                col_constructor = (
                    field.to_column if field else schema_components.Column
                )

                if check_name is False:
                    raise SchemaInitError(
                        f"'check_name' is not supported for {field_name}."
                    )

                columns[field_name] = col_constructor(  # type: ignore
                    dtype,
                    required=not annotation.optional,
                    checks=field_checks,
                    name=field_name,
                )
            elif (
                annotation.origin in INDEX_TYPES
                or annotation.raw_annotation in INDEX_TYPES
            ):
                if annotation.optional:
                    raise SchemaInitError(
                        f"Index '{field_name}' cannot be Optional."
                    )

                if check_name is False or (
                    # default single index
                    check_name is None
                    and index_count == 1
                ):
                    field_name = None  # type:ignore

                index_constructor = (
                    field.to_index if field else schema_components.Index
                )
                index = index_constructor(  # type: ignore
                    dtype, checks=field_checks, name=field_name
                )
                indices.append(index)
            else:
                raise SchemaInitError(
                    f"Invalid annotation '{field_name}: "
                    f"{annotation.raw_annotation}'"
                )

        return columns, _build_schema_index(indices, **multiindex_kwargs)

    @classmethod
    def _get_model_attrs(cls) -> Dict[str, Any]:
        """Return all attributes.

        Similar to inspect.get_members but bypass descriptors __get__.
        """
        bases = inspect.getmro(cls)[:-1]  # bases -> SchemaModel -> object
        attrs = {}
        # Walk from the most basic class so subclasses override their bases.
        for base in reversed(bases):
            attrs.update(base.__dict__)
        return attrs

    @classmethod
    def _collect_fields(cls) -> Dict[str, Tuple[AnnotationInfo, FieldInfo]]:
        """Centralize publicly named fields and their corresponding annotations.

        :returns: mapping of ``field.name`` to ``(AnnotationInfo, field)``.
        :raises SchemaInitError: if a public non-routine attribute has no
            annotation, or an annotated attribute is not a ``FieldInfo``.
        """
        annotations = get_type_hints(  # pylint:disable=unexpected-keyword-arg
            cls, include_extras=True
        )
        attrs = cls._get_model_attrs()

        missing = []
        for name, attr in attrs.items():
            if inspect.isroutine(attr):
                continue
            if not _is_field(name):
                annotations.pop(name, None)
            elif name not in annotations:
                missing.append(name)

        if missing:
            raise SchemaInitError(f"Found missing annotations: {missing}")

        fields = {}
        for field_name, annotation in annotations.items():
            field = attrs[field_name]  # __init_subclass__ guarantees existence
            if not isinstance(field, FieldInfo):
                raise SchemaInitError(
                    f"'{field_name}' can only be assigned a 'Field', "
                    + f"not a '{type(field)}.'"
                )
            fields[field.name] = (AnnotationInfo(annotation), field)
        return fields

    @classmethod
    def _collect_config_and_extras(
        cls,
    ) -> Tuple[Type[BaseConfig], Dict[str, Any]]:
        """Collect config options from bases, splitting off unknown options.

        :returns: a new ``Config`` class deriving from ``BaseConfig`` that
            carries the merged options, and the merged extras dict. Classes
            later in the reversed MRO override earlier ones.
        """
        bases = inspect.getmro(cls)[:-1]
        bases = typing.cast(Tuple[Type[SchemaModel]], bases)
        root_model, *models = reversed(bases)

        options, extras = _extract_config_options_and_extras(root_model.Config)

        for model in models:
            # A base that exposes no Config contributes nothing. The former
            # ``{}`` fallback could not be passed to vars() (a dict instance
            # has no __dict__) and raised TypeError instead.
            config = getattr(model, _CONFIG_KEY, None)
            if config is None:
                continue
            base_options, base_extras = _extract_config_options_and_extras(
                config
            )
            options.update(base_options)
            extras.update(base_extras)

        return type("Config", (BaseConfig,), options), extras

    @classmethod
    def _collect_check_infos(cls, key: str) -> List[CheckInfo]:
        """Collect inherited check metadata from bases.

        Inherited classmethods are not in cls.__dict__, that's why we need to
        walk the inheritance tree.
        """
        bases = inspect.getmro(cls)[:-2]  # bases -> SchemaModel -> object
        bases = typing.cast(Tuple[Type[SchemaModel]], bases)

        method_names = set()
        # ... (the snippet is cut off here; the rest of this method is not shown)
Learn to execute automation testing from scratch with LambdaTest Learning Hub. From setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios, the LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!