Best Python code snippet using pandera_python
schemas.py
Source: schemas.py
...
                )
            except errors.SchemaError as err:
                error_handler.collect_error("dataframe_check", err)

        if self.unique:
            keep_setting = convert_uniquesettings(self._report_duplicates)
            # NOTE: fix this pylint error
            # pylint: disable=not-an-iterable
            temp_unique: List[List] = (
                [self.unique]
                if all(isinstance(x, str) for x in self.unique)
                else self.unique
            )
            for lst in temp_unique:
                duplicates = df_to_validate.duplicated(
                    subset=lst, keep=keep_setting
                )
                if duplicates.any():
                    # NOTE: this is a hack to support pyspark.pandas, need to
                    # figure out a workaround to error: "Cannot combine the
                    # series or dataframe because it comes from a different
                    # dataframe."
                    if type(duplicates).__module__.startswith(
                        "pyspark.pandas"
                    ):
                        # pylint: disable=import-outside-toplevel
                        import pyspark.pandas as ps

                        with ps.option_context(
                            "compute.ops_on_diff_frames", True
                        ):
                            failure_cases = df_to_validate.loc[duplicates, lst]
                    else:
                        failure_cases = df_to_validate.loc[duplicates, lst]
                    failure_cases = reshape_failure_cases(failure_cases)
                    error_handler.collect_error(
                        "duplicates",
                        errors.SchemaError(
                            self,
                            check_obj,
                            f"columns '{*lst,}' not unique:\n{failure_cases}",
                            failure_cases=failure_cases,
                            check="multiple_fields_uniqueness",
                        ),
                    )

        if lazy and error_handler.collected_errors:
            raise errors.SchemaErrors(
                self, error_handler.collected_errors, check_obj
            )

        assert all(check_results), "all check results must be True."
        return check_obj

    def __call__(
        self,
        dataframe: pd.DataFrame,
        head: Optional[int] = None,
        tail: Optional[int] = None,
        sample: Optional[int] = None,
        random_state: Optional[int] = None,
        lazy: bool = False,
        inplace: bool = False,
    ):
        """Alias for :func:`DataFrameSchema.validate` method.

        :param pd.DataFrame dataframe: the dataframe to be validated.
        :param head: validate the first n rows. Rows overlapping with `tail` or
            `sample` are de-duplicated.
        :type head: int
        :param tail: validate the last n rows. Rows overlapping with `head` or
            `sample` are de-duplicated.
        :type tail: int
        :param sample: validate a random sample of n rows. Rows overlapping
            with `head` or `tail` are de-duplicated.
        :param random_state: random seed for the ``sample`` argument.
        :param lazy: if True, lazily evaluates dataframe against all validation
            checks and raises a ``SchemaErrors``. Otherwise, raise
            ``SchemaError`` as soon as one occurs.
        :param inplace: if True, applies coercion to the object of validation,
            otherwise creates a copy of the data.
        """
        return self.validate(
            dataframe, head, tail, sample, random_state, lazy, inplace
        )

    def __repr__(self) -> str:
        """Represent string for logging."""
        return (
            f"<Schema {self.__class__.__name__}("
            f"columns={self.columns}, "
            f"checks={self.checks}, "
            f"index={self.index.__repr__()}, "
            f"coerce={self.coerce}, "
            f"dtype={self._dtype}, "
            f"strict={self.strict}, "
            f"name={self.name}, "
            f"ordered={self.ordered}, "
            f"unique_column_names={self.unique_column_names}"
            ")>"
        )

    def __str__(self) -> str:
        """Represent string for user inspection."""

        def _format_multiline(json_str, arg):
            return "\n".join(
                f"{indent}{line}" if i != 0 else f"{indent}{arg}={line}"
                for i, line in enumerate(json_str.split("\n"))
            )

        indent = " " * N_INDENT_SPACES
        if self.columns:
            columns_str = f"{indent}columns={{\n"
            for colname, col in self.columns.items():
                columns_str += f"{indent * 2}'{colname}': {col}\n"
            columns_str += f"{indent}}}"
        else:
            columns_str = f"{indent}columns={{}}"

        if self.checks:
            checks_str = f"{indent}checks=[\n"
            for check in self.checks:
                checks_str += f"{indent * 2}{check}\n"
            checks_str += f"{indent}]"
        else:
            checks_str = f"{indent}checks=[]"

        # add additional indents
        index_ = str(self.index).split("\n")
        if len(index_) == 1:
            index = str(self.index)
        else:
            index = "\n".join(
                x if i == 0 else f"{indent}{x}" for i, x in enumerate(index_)
            )

        return (
            f"<Schema {self.__class__.__name__}(\n"
            f"{columns_str},\n"
            f"{checks_str},\n"
            f"{indent}coerce={self.coerce},\n"
            f"{indent}dtype={self._dtype},\n"
            f"{indent}index={index},\n"
            f"{indent}strict={self.strict}\n"
            f"{indent}name={self.name},\n"
            f"{indent}ordered={self.ordered},\n"
            f"{indent}unique_column_names={self.unique_column_names}\n"
            ")>"
        )

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, type(self)):
            return NotImplemented

        def _compare_dict(obj):
            return {
                k: v for k, v in obj.__dict__.items() if k != "_IS_INFERRED"
            }

        return _compare_dict(self) == _compare_dict(other)

    @st.strategy_import_error
    def strategy(
        self, *, size: Optional[int] = None, n_regex_columns: int = 1
    ):
        """Create a ``hypothesis`` strategy for generating a DataFrame.

        :param size: number of elements to generate
        :param n_regex_columns: number of regex columns to generate.
        :returns: a strategy that generates pandas DataFrame objects.
        """
        return st.dataframe_strategy(
            self.dtype,
            columns=self.columns,
            checks=self.checks,
            unique=self.unique,
            index=self.index,
            size=size,
            n_regex_columns=n_regex_columns,
        )

    def example(
        self, size: Optional[int] = None, n_regex_columns: int = 1
    ) -> pd.DataFrame:
        """Generate an example of a particular size.

        :param size: number of elements in the generated DataFrame.
        :returns: pandas DataFrame object.
        """
        # pylint: disable=import-outside-toplevel,cyclic-import,import-error
        import hypothesis

        with warnings.catch_warnings():
            warnings.simplefilter(
                "ignore",
                category=hypothesis.errors.NonInteractiveExampleWarning,
            )
            return self.strategy(
                size=size, n_regex_columns=n_regex_columns
            ).example()

    @_inferred_schema_guard
    def add_columns(self, extra_schema_cols: Dict[str, Any]) -> Self:
        """Create a copy of the :class:`DataFrameSchema` with extra columns.

        :param extra_schema_cols: Additional columns of the format
            ``{column_name: Column}``.
        :type extra_schema_cols: Dict[str, Any]
        :returns: a new :class:`DataFrameSchema` with the extra_schema_cols
            added.

        :example:

        To add columns to the schema, pass a dictionary with column name and
        ``Column`` instance key-value pairs.

        >>> import pandera as pa
        >>>
        >>> example_schema = pa.DataFrameSchema(
        ...     {
        ...         "category": pa.Column(str),
        ...         "probability": pa.Column(float),
        ...     }
        ... )
        >>> print(
        ...     example_schema.add_columns({"even_number": pa.Column(pa.Bool)})
        ... )
        <Schema DataFrameSchema(
            columns={
                'category': <Schema Column(name=category, type=DataType(str))>
                'probability': <Schema Column(name=probability, type=DataType(float64))>
                'even_number': <Schema Column(name=even_number, type=DataType(bool))>
            },
            checks=[],
            coerce=False,
            dtype=None,
            index=None,
            strict=False
            name=None,
            ordered=False,
            unique_column_names=False
        )>

        .. seealso:: :func:`remove_columns`

        """
        schema_copy = copy.deepcopy(self)
        schema_copy.columns = {
            **schema_copy.columns,
            **self.__class__(extra_schema_cols).columns,
        }
        return schema_copy

    @_inferred_schema_guard
    def remove_columns(self, cols_to_remove: List[str]) -> Self:
        """Removes columns from a :class:`DataFrameSchema` and returns a new
        copy.

        :param cols_to_remove: Columns to be removed from the
            ``DataFrameSchema``
        :type cols_to_remove: List
        :returns: a new :class:`DataFrameSchema` without the cols_to_remove
        :raises: :class:`~pandera.errors.SchemaInitError`: if column not in
            schema.

        :example:

        To remove a column or set of columns from a schema, pass a list of
        columns to be removed:

        >>> import pandera as pa
        >>>
        >>> example_schema = pa.DataFrameSchema(
        ...     {
        ...         "category" : pa.Column(str),
        ...         "probability": pa.Column(float)
        ...     }
        ... )
        >>>
        >>> print(example_schema.remove_columns(["category"]))
        <Schema DataFrameSchema(
            columns={
                'probability': <Schema Column(name=probability, type=DataType(float64))>
            },
            checks=[],
            coerce=False,
            dtype=None,
            index=None,
            strict=False
            name=None,
            ordered=False,
            unique_column_names=False
        )>

        .. seealso:: :func:`add_columns`

        """
        schema_copy = copy.deepcopy(self)

        # ensure all specified keys are present in the columns
        not_in_cols: List[str] = [
            x for x in cols_to_remove if x not in schema_copy.columns.keys()
        ]
        if not_in_cols:
            raise errors.SchemaInitError(
                f"Keys {not_in_cols} not found in schema columns!"
            )

        for col in cols_to_remove:
            schema_copy.columns.pop(col)

        return schema_copy

    @_inferred_schema_guard
    def update_column(self, column_name: str, **kwargs) -> Self:
        """Create copy of a :class:`DataFrameSchema` with updated column
        properties.

        :param column_name: name of the column to update.
        :param kwargs: key-word arguments supplied to
            :class:`~pandera.schema_components.Column`
        :returns: a new :class:`DataFrameSchema` with updated column
        :raises: :class:`~pandera.errors.SchemaInitError`: if column not in
            schema or you try to change the name.

        :example:

        Calling ``schema.update_column`` returns the :class:`DataFrameSchema`
        with the updated column.

        >>> import pandera as pa
        >>>
        >>> example_schema = pa.DataFrameSchema({
        ...     "category" : pa.Column(str),
        ...     "probability": pa.Column(float)
        ... })
        >>> print(
        ...     example_schema.update_column(
        ...         'category', dtype=pa.Category
        ...     )
        ... )
        <Schema DataFrameSchema(
            columns={
                'category': <Schema Column(name=category, type=DataType(category))>
                'probability': <Schema Column(name=probability, type=DataType(float64))>
            },
            checks=[],
            coerce=False,
            dtype=None,
            index=None,
            strict=False
            name=None,
            ordered=False,
            unique_column_names=False
        )>

        .. seealso:: :func:`rename_columns`

        """
        # check that columns exist in schema
        if "name" in kwargs:
            raise ValueError("cannot update 'name' of the column.")
        if column_name not in self.columns:
            raise ValueError(f"column '{column_name}' not in {self}")
        schema_copy = copy.deepcopy(self)
        column_copy = copy.deepcopy(self.columns[column_name])
        new_column = column_copy.__class__(
            **{**column_copy.properties, **kwargs}
        )
        schema_copy.columns.update({column_name: new_column})
        return schema_copy

    def update_columns(self, update_dict: Dict[str, Dict[str, Any]]) -> Self:
        """
        Create copy of a :class:`DataFrameSchema` with updated column
        properties.

        :param update_dict: dictionary of column names mapped to dictionaries
            of the properties to update.
        :return: a new :class:`DataFrameSchema` with updated columns
        :raises: :class:`~pandera.errors.SchemaInitError`: if column not in
            schema or you try to change the name.

        :example:

        Calling ``schema.update_columns`` returns the :class:`DataFrameSchema`
        with the updated columns.

        >>> import pandera as pa
        >>>
        >>> example_schema = pa.DataFrameSchema({
        ...     "category" : pa.Column(str),
        ...     "probability": pa.Column(float)
        ... })
        >>>
        >>> print(
        ...     example_schema.update_columns(
        ...         {"category": {"dtype":pa.Category}}
        ...     )
        ... )
        <Schema DataFrameSchema(
            columns={
                'category': <Schema Column(name=category, type=DataType(category))>
                'probability': <Schema Column(name=probability, type=DataType(float64))>
            },
            checks=[],
            coerce=False,
            dtype=None,
            index=None,
            strict=False
            name=None,
            ordered=False,
            unique_column_names=False
        )>

        """
        new_schema = copy.deepcopy(self)

        # ensure all specified keys are present in the columns
        not_in_cols: List[str] = [
            x for x in update_dict.keys() if x not in new_schema.columns.keys()
        ]
        if not_in_cols:
            raise errors.SchemaInitError(
                f"Keys {not_in_cols} not found in schema columns!"
            )

        new_columns: Dict[str, Column] = {}
        for col in new_schema.columns:
            # check that the 'name' property is not being updated
            if update_dict.get(col):
                if update_dict[col].get("name"):
                    raise errors.SchemaInitError(
                        "cannot update 'name' property of the column."
                    )
            original_properties = new_schema.columns[col].properties
            if update_dict.get(col):
                new_properties = copy.deepcopy(original_properties)
                new_properties.update(update_dict[col])
                new_columns[col] = new_schema.columns[col].__class__(
                    **new_properties
                )
            else:
                new_columns[col] = new_schema.columns[col].__class__(
                    **original_properties
                )
        new_schema.columns = new_columns
        return new_schema

    def rename_columns(self, rename_dict: Dict[str, str]) -> Self:
        """Rename columns using a dictionary of key-value pairs.

        :param rename_dict: dictionary of 'old_name': 'new_name' key-value
            pairs.
        :returns: :class:`DataFrameSchema` (copy of original)
        :raises: :class:`~pandera.errors.SchemaInitError` if column not in the
            schema.

        :example:

        To rename a column or set of columns, pass a dictionary of old column
        names and new column names, similar to the pandas DataFrame method.

        >>> import pandera as pa
        >>>
        >>> example_schema = pa.DataFrameSchema({
        ...     "category" : pa.Column(str),
        ...     "probability": pa.Column(float)
        ... })
        >>>
        >>> print(
        ...     example_schema.rename_columns({
        ...         "category": "categories",
        ...         "probability": "probabilities"
        ...     })
        ... )
        <Schema DataFrameSchema(
            columns={
                'categories': <Schema Column(name=categories, type=DataType(str))>
                'probabilities': <Schema Column(name=probabilities, type=DataType(float64))>
            },
            checks=[],
            coerce=False,
            dtype=None,
            index=None,
            strict=False
            name=None,
            ordered=False,
            unique_column_names=False
        )>

        .. seealso:: :func:`update_column`

        """
        new_schema = copy.deepcopy(self)

        # ensure all specified keys are present in the columns
        not_in_cols: List[str] = [
            x for x in rename_dict.keys() if x not in new_schema.columns.keys()
        ]
        if not_in_cols:
            raise errors.SchemaInitError(
                f"Keys {not_in_cols} not found in schema columns!"
            )

        # remove any mapping to itself as this is a no-op
        rename_dict = {k: v for k, v in rename_dict.items() if k != v}

        # ensure all new keys are not present in the current column names
        already_in_columns: List[str] = [
            x for x in rename_dict.values() if x in new_schema.columns.keys()
        ]
        if already_in_columns:
            raise errors.SchemaInitError(
                f"Keys {already_in_columns} already found in schema columns!"
            )

        # We iterate over the existing columns dict and replace those keys
        # that exist in the rename_dict
        new_columns = {
            (rename_dict[col_name] if col_name in rename_dict else col_name): (
                col_attrs.set_name(rename_dict[col_name])
                if col_name in rename_dict
                else col_attrs
            )
            for col_name, col_attrs in new_schema.columns.items()
        }

        new_schema.columns = new_columns
        return new_schema

    def select_columns(self, columns: List[Any]) -> Self:
        """Select subset of columns in the schema.

        *New in version 0.4.5*

        :param columns: list of column names to select.
        :returns: :class:`DataFrameSchema` (copy of original) with only
            the selected columns.
        :raises: :class:`~pandera.errors.SchemaInitError` if column not in the
            schema.

        :example:

        To subset a schema by column, and return a new schema:

        >>> import pandera as pa
        >>>
        >>> example_schema = pa.DataFrameSchema({
        ...     "category" : pa.Column(str),
        ...     "probability": pa.Column(float)
        ... })
        >>>
        >>> print(example_schema.select_columns(['category']))
        <Schema DataFrameSchema(
            columns={
                'category': <Schema Column(name=category, type=DataType(str))>
            },
            checks=[],
            coerce=False,
            dtype=None,
            index=None,
            strict=False
            name=None,
            ordered=False,
            unique_column_names=False
        )>

        .. note:: If an index is present in the schema, it will also be
            included in the new schema.

        """
        new_schema = copy.deepcopy(self)

        # ensure all specified keys are present in the columns
        not_in_cols: List[str] = [
            x for x in columns if x not in new_schema.columns.keys()
        ]
        if not_in_cols:
            raise errors.SchemaInitError(
                f"Keys {not_in_cols} not found in schema columns!"
            )

        new_columns = {
            col_name: column
            for col_name, column in self.columns.items()
            if col_name in columns
        }
        new_schema.columns = new_columns
        return new_schema

    def to_script(self, fp: Union[str, Path] = None) -> "DataFrameSchema":
        """Write DataFrameSchema to a python script.

        :param fp: str or Path to write the script to.
        :returns: dataframe schema script.
        """
        # pylint: disable=import-outside-toplevel,cyclic-import
        import pandera.io

        return pandera.io.to_script(self, fp)

    @classmethod
    def from_yaml(cls, yaml_schema) -> "DataFrameSchema":
        """Create DataFrameSchema from yaml file.

        :param yaml_schema: str, Path to yaml schema, or serialized yaml
            string.
        :returns: dataframe schema.
        """
        # pylint: disable=import-outside-toplevel,cyclic-import
        import pandera.io

        return pandera.io.from_yaml(yaml_schema)

    @overload
    def to_yaml(self, stream: None = None) -> str:  # pragma: no cover
        ...

    @overload
    def to_yaml(self, stream: os.PathLike) -> None:  # pragma: no cover
        ...

    def to_yaml(self, stream: Optional[os.PathLike] = None) -> Optional[str]:
        """Write DataFrameSchema to yaml file.

        :param stream: file path or stream to write to. If None, dumps
            to string.
        :returns: yaml string if stream is None, otherwise returns None.
        """
        # pylint: disable=import-outside-toplevel,cyclic-import
        import pandera.io

        return pandera.io.to_yaml(self, stream)

    @classmethod
    def from_json(cls, source) -> "DataFrameSchema":
        """Create DataFrameSchema from json file.

        :param source: str, Path to json schema, or serialized json
            string.
        :returns: dataframe schema.
        """
        # pylint: disable=import-outside-toplevel,cyclic-import
        import pandera.io

        return pandera.io.from_json(source)

    @overload
    def to_json(
        self, target: None = None, **kwargs
    ) -> str:  # pragma: no cover
        ...

    @overload
    def to_json(
        self, target: os.PathLike, **kwargs
    ) -> None:  # pragma: no cover
        ...

    def to_json(
        self, target: Optional[os.PathLike] = None, **kwargs
    ) -> Optional[str]:
        """Write DataFrameSchema to json file.

        :param target: file target to write to. If None, dumps to string.
        :returns: json string if target is None, otherwise returns None.
        """
        # pylint: disable=import-outside-toplevel,cyclic-import
        import pandera.io

        return pandera.io.to_json(self, target, **kwargs)

    def set_index(
        self, keys: List[str], drop: bool = True, append: bool = False
    ) -> Self:
        """
        A method for setting the :class:`Index` of a :class:`DataFrameSchema`,
        via an existing :class:`Column` or list of columns.

        :param keys: list of labels
        :param drop: bool, default True
        :param append: bool, default False
        :return: a new :class:`DataFrameSchema` with specified column(s) in the
            index.
        :raises: :class:`~pandera.errors.SchemaInitError` if column not in the
            schema.

        :examples:

        Just as you would set the index in a ``pandas`` DataFrame from an
        existing column, you can set an index within the schema from an
        existing column in the schema.

        >>> import pandera as pa
        >>>
        >>> example_schema = pa.DataFrameSchema({
        ...     "category" : pa.Column(str),
        ...     "probability": pa.Column(float)})
        >>>
        >>> print(example_schema.set_index(['category']))
        <Schema DataFrameSchema(
            columns={
                'probability': <Schema Column(name=probability, type=DataType(float64))>
            },
            checks=[],
            coerce=False,
            dtype=None,
            index=<Schema Index(name=category, type=DataType(str))>,
            strict=False
            name=None,
            ordered=False,
            unique_column_names=False
        )>

        If you have an existing index in your schema, and you would like to
        append a new column as an index to it (yielding a :class:`MultiIndex`),
        just use set_index as you would in pandas.

        >>> example_schema = pa.DataFrameSchema(
        ...     {
        ...         "column1": pa.Column(str),
        ...         "column2": pa.Column(int)
        ...     },
        ...     index=pa.Index(name = "column3", dtype = int)
        ... )
        >>>
        >>> print(example_schema.set_index(["column2"], append = True))
        <Schema DataFrameSchema(
            columns={
                'column1': <Schema Column(name=column1, type=DataType(str))>
            },
            checks=[],
            coerce=False,
            dtype=None,
            index=<Schema MultiIndex(
                indexes=[
                    <Schema Index(name=column3, type=DataType(int64))>
                    <Schema Index(name=column2, type=DataType(int64))>
                ]
                coerce=False,
                strict=False,
                name=None,
                ordered=True
            )>,
            strict=False
            name=None,
            ordered=False,
            unique_column_names=False
        )>

        .. seealso:: :func:`reset_index`

        """
        # pylint: disable=import-outside-toplevel,cyclic-import
        from pandera.schema_components import Index, MultiIndex

        new_schema = copy.deepcopy(self)

        keys_temp: List = (
            list(set(keys)) if not isinstance(keys, list) else keys
        )

        # ensure all specified keys are present in the columns
        not_in_cols: List[str] = [
            x for x in keys_temp if x not in new_schema.columns.keys()
        ]
        if not_in_cols:
            raise errors.SchemaInitError(
                f"Keys {not_in_cols} not found in schema columns!"
            )

        # if there is already an index, append or replace according to
        # parameters
        ind_list: List = (
            []
            if new_schema.index is None or not append
            else list(new_schema.index.indexes)
            if isinstance(new_schema.index, MultiIndex) and append
            else [new_schema.index]
        )

        for col in keys_temp:
            ind_list.append(
                Index(
                    dtype=new_schema.columns[col].dtype,
                    name=col,
                    checks=new_schema.columns[col].checks,
                    nullable=new_schema.columns[col].nullable,
                    unique=new_schema.columns[col].unique,
                    coerce=new_schema.columns[col].coerce,
                )
            )

        new_schema.index = (
            ind_list[0] if len(ind_list) == 1 else MultiIndex(ind_list)
        )

        # if drop is True as defaulted, drop the columns moved into the index
        if drop:
            new_schema = new_schema.remove_columns(keys_temp)

        return new_schema

    def reset_index(self, level: List[str] = None, drop: bool = False) -> Self:
        """
        A method for resetting the :class:`Index` of a :class:`DataFrameSchema`

        :param level: list of labels
        :param drop: bool, default False
        :return: a new :class:`DataFrameSchema` with specified column(s) in the
            index.
        :raises: :class:`~pandera.errors.SchemaInitError` if no index set in
            schema.

        :examples:

        Similar to the ``pandas`` reset_index method on a pandas DataFrame,
        this method can be used to fully or partially reset indices of a
        schema.

        To remove the entire index from the schema, just call the reset_index
        method with default parameters.

        >>> import pandera as pa
        >>>
        >>> example_schema = pa.DataFrameSchema(
        ...     {"probability" : pa.Column(float)},
        ...     index = pa.Index(name="unique_id", dtype=int)
        ... )
        >>>
        >>> print(example_schema.reset_index())
        <Schema DataFrameSchema(
            columns={
                'probability': <Schema Column(name=probability, type=DataType(float64))>
                'unique_id': <Schema Column(name=unique_id, type=DataType(int64))>
            },
            checks=[],
            coerce=False,
            dtype=None,
            index=None,
            strict=False
            name=None,
            ordered=False,
            unique_column_names=False
        )>

        This reclassifies an index (or indices) as a column (or columns).

        Similarly, to partially alter the index, pass the name of the column
        you would like to be removed to the ``level`` parameter, and you may
        also decide whether to drop the levels with the ``drop`` parameter.

        >>> example_schema = pa.DataFrameSchema({
        ...     "category" : pa.Column(str)},
        ...     index = pa.MultiIndex([
        ...         pa.Index(name="unique_id1", dtype=int),
        ...         pa.Index(name="unique_id2", dtype=str)
        ...     ]
        ...     )
        ... )
        >>> print(example_schema.reset_index(level = ["unique_id1"]))
        <Schema DataFrameSchema(
            columns={
                'category': <Schema Column(name=category, type=DataType(str))>
                'unique_id1': <Schema Column(name=unique_id1, type=DataType(int64))>
            },
            checks=[],
            coerce=False,
            dtype=None,
            index=<Schema Index(name=unique_id2, type=DataType(str))>,
            strict=False
            name=None,
            ordered=False,
            unique_column_names=False
        )>

        .. seealso:: :func:`set_index`

        """
        # pylint: disable=import-outside-toplevel,cyclic-import
        from pandera.schema_components import Column, Index, MultiIndex

        # explicit check for an empty list
        if level == []:
            return self

        new_schema = copy.deepcopy(self)
        if new_schema.index is None:
            raise errors.SchemaInitError(
                "There is currently no index set for this schema."
            )

        # ensure no duplicates
        level_temp: Union[List[Any], List[str]] = (
            new_schema.index.names if level is None else list(set(level))
        )

        # ensure all specified keys are present in the index
        level_not_in_index: Union[List[Any], List[str], None] = (
            [x for x in level_temp if x not in new_schema.index.names]
            if isinstance(new_schema.index, MultiIndex) and level_temp
            else []
            if isinstance(new_schema.index, Index)
            and (level_temp == [new_schema.index.name])
            else level_temp
        )
        if level_not_in_index:
            raise errors.SchemaInitError(
                f"Keys {level_not_in_index} not found in schema columns!"
            )

        new_index = (
            None
            if not level_temp or isinstance(new_schema.index, Index)
            else new_schema.index.remove_columns(level_temp)
        )
        new_index = (
            new_index
            if new_index is None
            else Index(
                dtype=new_index.columns[list(new_index.columns)[0]].dtype,
                checks=new_index.columns[list(new_index.columns)[0]].checks,
                nullable=new_index.columns[
                    list(new_index.columns)[0]
                ].nullable,
                unique=new_index.columns[list(new_index.columns)[0]].unique,
                coerce=new_index.columns[list(new_index.columns)[0]].coerce,
                name=new_index.columns[list(new_index.columns)[0]].name,
            )
            if (len(list(new_index.columns)) == 1) and (new_index is not None)
            else None
            if (len(list(new_index.columns)) == 0) and (new_index is not None)
            else new_index
        )

        if not drop:
            additional_columns: Dict[str, Any] = (
                {col: new_schema.index.columns.get(col) for col in level_temp}
                if isinstance(new_schema.index, MultiIndex)
                else {new_schema.index.name: new_schema.index}
            )
            new_schema = new_schema.add_columns(
                {
                    k: Column(
                        dtype=v.dtype,
                        checks=v.checks,
                        nullable=v.nullable,
                        unique=v.unique,
                        coerce=v.coerce,
                        name=v.name,
                    )
                    for (k, v) in additional_columns.items()
                }
            )

        new_schema.index = new_index

        return new_schema

    @classmethod
    def __get_validators__(cls):
        yield cls._pydantic_validate

    @classmethod
    def _pydantic_validate(cls, schema: Any) -> "DataFrameSchema":
        """Verify that the input is a compatible DataFrameSchema."""
        if not isinstance(schema, cls):  # type: ignore
            raise TypeError(f"{schema} is not a {cls}.")
        return cast("DataFrameSchema", schema)


class SeriesSchemaBase:
    """Base series validator object."""

    def __init__(
        self,
        dtype: PandasDtypeInputTypes = None,
        checks: CheckList = None,
        nullable: bool = False,
        unique: bool = False,
        report_duplicates: UniqueSettings = "all",
        coerce: bool = False,
        name: Any = None,
        title: Optional[str] = None,
        description: Optional[str] = None,
    ) -> None:
        """Initialize series schema base object.

        :param dtype: datatype of the column. If a string is specified,
            then assumes one of the valid pandas string values:
            http://pandas.pydata.org/pandas-docs/stable/basics.html#dtypes
        :param checks: If element_wise is True, then callable signature should
            be: ``Callable[Any, bool]`` where the ``Any`` input is a scalar
            element in the column. Otherwise, the input is assumed to be a
            pandas.Series object.
        :param nullable: Whether or not column can contain null values.
        :param unique: whether column values should be unique.
        :param report_duplicates: how to report unique errors
            - `exclude_first`: report all duplicates except first occurrence
            - `exclude_last`: report all duplicates except last occurrence
            - `all`: (default) report all duplicates
        :param coerce: If True, when schema.validate is called the column will
            be coerced into the specified dtype. This has no effect on columns
            where ``dtype=None``.
        :param name: column name in dataframe to validate.
        :param title: A human-readable label for the series.
        :param description: An arbitrary textual description of the series.
        :type nullable: bool
        """
        if checks is None:
            checks = []
        if isinstance(checks, (Check, Hypothesis)):
            checks = [checks]

        self.dtype = dtype  # type: ignore
        self._nullable = nullable
        self._coerce = coerce
        self._checks = checks
        self._name = name
        self._unique = unique
        self._report_duplicates = report_duplicates
        self._title = title
        self._description = description

        for check in self.checks:
            if check.groupby is not None and not self._allow_groupby:
                raise errors.SchemaInitError(
                    f"Cannot use groupby checks with type {type(self)}"
                )

        # make sure pandas dtype is valid
        self.dtype  # pylint: disable=pointless-statement

        # this attribute is not meant to be accessed by users and is explicitly
        # set to True in the case that a schema is created by infer_schema.
        self._IS_INFERRED = False

        if isinstance(self.dtype, pandas_engine.PydanticModel):
            raise errors.SchemaInitError(
                "PydanticModel dtype can only be specified as a "
                "DataFrameSchema dtype."
            )

    # the _is_inferred getter and setter methods are not public
    @property
    def _is_inferred(self):
        return self._IS_INFERRED

    @_is_inferred.setter
    def _is_inferred(self, value: bool):
        self._IS_INFERRED = value

    @property
    def checks(self):
        """Return list of checks or hypotheses."""
        return self._checks

    @checks.setter
    def checks(self, checks):
        self._checks = checks

    @_inferred_schema_guard
    def set_checks(self, checks: CheckList):
        """Create a new SeriesSchema with a new set of Checks

        :param checks: checks to set on the new schema
        :returns: a new SeriesSchema with a new set of checks
        """
        schema_copy = copy.deepcopy(self)
        schema_copy.checks = checks
        return schema_copy

    @property
    def nullable(self) -> bool:
        """Whether the series is nullable."""
        return self._nullable

    @property
    def unique(self) -> bool:
        """Whether to check for duplicates in check object"""
        return self._unique

    @unique.setter
    def unique(self, value: bool) -> None:
        """Set unique attribute"""
        self._unique = value

    @property
    def coerce(self) -> bool:
        """Whether to coerce series to specified type."""
        return self._coerce

    @coerce.setter
    def coerce(self, value: bool) -> None:
        """Set coerce attribute."""
        self._coerce = value

    @property
    def name(self) -> Union[str, None]:
        """Get SeriesSchema name."""
        return self._name

    @property
    def title(self):
        """A human-readable label for the series."""
        return self._title

    @property
    def description(self):
        """An arbitrary textual description of the series."""
        return self._description

    @property
    def dtype(
        self,
    ) -> DataType:
        """Get the pandas dtype"""
        return self._dtype  # type: ignore

    @dtype.setter
    def dtype(self, value: PandasDtypeInputTypes) -> None:
        """Set the pandas dtype"""
        self._dtype = pandas_engine.Engine.dtype(value) if value else None

    def coerce_dtype(self, obj: Union[pd.Series, pd.Index]) -> pd.Series:
        """Coerce type of a pd.Series by type specified in dtype.

        :param obj: One-dimensional ndarray with axis labels
            (including time series).
        :returns: ``Series`` with coerced data type
        """
        if self.dtype is None:
            return obj

        try:
            return self.dtype.try_coerce(obj)
        except errors.ParserError as exc:
            msg = (
                f"Error while coercing '{self.name}' to type "
                f"{self.dtype}: {exc}:\n{exc.failure_cases}"
            )
            raise errors.SchemaError(
                self,
                obj,
                msg,
                failure_cases=exc.failure_cases,
                check=f"coerce_dtype('{self.dtype}')",
            ) from exc

    @property
    def _allow_groupby(self):
        """Whether the schema or schema component allows groupby operations."""
        raise NotImplementedError(  # pragma: no cover
            "The _allow_groupby property must be implemented by subclasses "
            "of SeriesSchemaBase"
        )

    def validate(
        self,
        check_obj: Union[pd.DataFrame, pd.Series],
        head: Optional[int] = None,
        tail: Optional[int] = None,
        sample: Optional[int] = None,
        random_state: Optional[int] = None,
        lazy: bool = False,
        inplace: bool = False,
    ) -> Union[pd.DataFrame, pd.Series]:
        # pylint: disable=too-many-locals,too-many-branches,too-many-statements
        """Validate a series or specific column in dataframe.

        :param check_obj: pandas DataFrame or Series to validate.
        :param head: validate the first n rows. Rows overlapping with `tail` or
            `sample` are de-duplicated.
        :param tail: validate the last n rows. Rows overlapping with `head` or
            `sample` are de-duplicated.
        :param sample: validate a random sample of n rows. Rows overlapping
            with `head` or `tail` are de-duplicated.
        :param random_state: random seed for the ``sample`` argument.
        :param lazy: if True, lazily evaluates dataframe against all validation
            checks and raises a ``SchemaErrors``. Otherwise, raise
            ``SchemaError`` as soon as one occurs.
        :param inplace: if True, applies coercion to the object of validation,
            otherwise creates a copy of the data.
        :returns: validated DataFrame or Series.
        """
        if self._is_inferred:
            warnings.warn(
                f"This {type(self)} is an inferred schema that hasn't been "
                "modified. It's recommended that you refine the schema "
                "by calling `set_checks` before using it to validate data.",
                UserWarning,
            )

        error_handler = SchemaErrorHandler(lazy)

        if not inplace:
            check_obj = check_obj.copy()

        series = (
            check_obj
            if check_utils.is_field(check_obj)
            else check_obj[self.name]
        )

        series = _pandas_obj_to_validate(
            series, head, tail, sample, random_state
        )

        check_obj = _pandas_obj_to_validate(
            check_obj, head, tail, sample, random_state
        )

        if self.name is not None and series.name != self._name:
            msg = (
                f"Expected {type(self)} to have name '{self._name}', found "
                f"'{series.name}'"
            )
            error_handler.collect_error(
                "wrong_field_name",
                errors.SchemaError(
                    self,
                    check_obj,
                    msg,
                    failure_cases=scalar_failure_case(series.name),
                    check=f"field_name('{self._name}')",
                ),
            )

        if not self._nullable:
            nulls = series.isna()
            if nulls.sum() > 0:
                failed = series[nulls]
                msg = (
                    f"non-nullable series '{series.name}' contains null "
                    f"values:\n{failed}"
                )
                error_handler.collect_error(
                    "series_contains_nulls",
                    errors.SchemaError(
                        self,
                        check_obj,
                        msg,
                        failure_cases=reshape_failure_cases(
                            series[nulls], ignore_na=False
                        ),
                        check="not_nullable",
                    ),
                )

        # Check if the series contains duplicate values
        if self._unique:
            keep_argument = convert_uniquesettings(self._report_duplicates)

            if type(series).__module__.startswith("pyspark.pandas"):
                duplicates = (
                    series.to_frame()
                    .duplicated(keep=keep_argument)
                    .reindex(series.index)
                )
                # pylint: disable=import-outside-toplevel
                import pyspark.pandas as ps

                with ps.option_context("compute.ops_on_diff_frames", True):
                    failed = series[duplicates]
            else:
                duplicates = series.duplicated(keep=keep_argument)
                failed = series[duplicates]

            if duplicates.any():
                msg = (
                    f"series '{series.name}' contains duplicate values:\n"
                    f"{failed}"
                )
                error_handler.collect_error(
                    "series_contains_duplicates",
                    errors.SchemaError(
                        self,
                        check_obj,
                        msg,
                        failure_cases=reshape_failure_cases(failed),
                        check="field_uniqueness",
                    ),
                )

        if self._dtype is not None:
            failure_cases = None
            check_output = self._dtype.check(
                pandas_engine.Engine.dtype(series.dtype), series
            )
            if check_output is False:
                failure_cases = scalar_failure_case(str(series.dtype))
                msg = (
                    f"expected series '{series.name}' to have type {self._dtype}, "
                    + f"got {series.dtype}"
                )
            elif not isinstance(check_output, bool):
                _, failure_cases = check_utils.prepare_series_check_output(
                    series,
                    pd.Series(list(check_output))
                    if not isinstance(check_output, pd.Series)
                    else check_output,
                )
                failure_cases = reshape_failure_cases(failure_cases)
                msg = (
                    f"expected series '{series.name}' to have type {self._dtype}:\n"
                    f"failure cases:\n{failure_cases}"
                )
            if failure_cases is not None and not failure_cases.empty:
                error_handler.collect_error(
                    "wrong_dtype",
                    errors.SchemaError(
                        self,
                        check_obj,
                        msg,
                        failure_cases=failure_cases,
                        check=f"dtype('{self.dtype}')",
                    ),
                )

        check_results = []
        if check_utils.is_field(check_obj):
            check_obj, check_args = series, [None]
        else:
            check_args = [self.name]  # type: ignore

        for check_index, check in enumerate(self.checks):
            try:
                check_results.append(
                    _handle_check_results(
                        self, check_index, check, check_obj, *check_args
                    )
                )
            except errors.SchemaError as err:
                error_handler.collect_error("dataframe_check", err)
            except Exception as err:  # pylint: disable=broad-except
                # catch other exceptions that may occur when executing the
                # Check
                err_msg = f'"{err.args[0]}"' if len(err.args) > 0 else ""
                err_str = f"{err.__class__.__name__}({err_msg})"
                msg = (
                    f"Error while executing check function: {err_str}\n"
                    + traceback.format_exc()
                )
                error_handler.collect_error(
                    "check_error",
                    errors.SchemaError(
                        self,
                        check_obj,
                        msg,
                        failure_cases=scalar_failure_case(err_str),
                        check=check,
                        check_index=check_index,
                    ),
                    original_exc=err,
                )

        if lazy and error_handler.collected_errors:
            raise errors.SchemaErrors(
                self, error_handler.collected_errors, check_obj
            )

        assert all(check_results)
        return check_obj

    def __call__(
        self,
        check_obj: Union[pd.DataFrame, pd.Series],
        head: Optional[int] = None,
        tail: Optional[int] = None,
        sample: Optional[int] = None,
        random_state: Optional[int] = None,
        lazy: bool = False,
        inplace: bool = False,
    ) -> Union[pd.DataFrame, pd.Series]:
        """Alias for ``validate`` method."""
        return self.validate(
            check_obj, head, tail, sample, random_state, lazy, inplace
        )

    def __eq__(self, other):
        return self.__dict__ == other.__dict__

    @st.strategy_import_error
    def strategy(self, *, size=None):
        """Create a ``hypothesis`` strategy for generating a Series.

        :param size: number of elements to generate
        :returns: a strategy that generates pandas Series objects.
        """
        return st.series_strategy(
            self.dtype,
            checks=self.checks,
            nullable=self.nullable,
            unique=self.unique,
            name=self.name,
            size=size,
        )

    def example(self, size=None) -> pd.Series:
        """Generate an example of a particular size.

        :param size: number of elements in the generated Series.
        :returns: pandas Series object.
        """
        # pylint: disable=import-outside-toplevel,cyclic-import,import-error
        import hypothesis

        with warnings.catch_warnings():
            warnings.simplefilter(
                "ignore",
                category=hypothesis.errors.NonInteractiveExampleWarning,
            )
            return self.strategy(size=size).example()

    def __repr__(self):
        return (
            f"<Schema {self.__class__.__name__}"
            f"(name={self._name}, type={self.dtype!r})>"
        )

    @classmethod
    def __get_validators__(cls):
        yield cls._pydantic_validate

    @classmethod
    def _pydantic_validate(  # type: ignore
        cls: TSeriesSchemaBase, schema: Any
    ) -> TSeriesSchemaBase:
        """Verify that the input is a compatible DataFrameSchema."""
        if not isinstance(schema, cls):  # type: ignore
            raise TypeError(f"{schema} is not a {cls}.")
        return cast(TSeriesSchemaBase, schema)


class SeriesSchema(SeriesSchemaBase):
    """Series validator."""

    def __init__(
        self,
        dtype: PandasDtypeInputTypes = None,
        checks: CheckList = None,
        index=None,
        nullable: bool = False,
        unique: bool = False,
        report_duplicates: UniqueSettings = "all",
        coerce: bool = False,
        name: str = None,
        title: Optional[str] = None,
        description: Optional[str] = None,
    ) -> None:
        """Initialize series schema base object.

        :param dtype: datatype of the column. If a string is specified,
            then assumes one of the valid pandas string values:
            http://pandas.pydata.org/pandas-docs/stable/basics.html#dtypes
        :param checks: If element_wise is True, then callable signature should
            be: ``Callable[Any, bool]`` where the ``Any`` input is a scalar
            element in the column. Otherwise, the input is assumed to be a
            pandas.Series object.
        :param index: specify the datatypes and properties of the index.
        :param nullable: Whether or not column can contain null values.
        :param unique: whether column values should be unique.
        :param report_duplicates: how to report unique errors
            - `exclude_first`: report all duplicates except first occurrence
            - `exclude_last`: report all duplicates except last occurrence
            - `all`: (default) report all duplicates
        :param coerce: If True, when schema.validate is called the column will
            be coerced into the specified dtype. This has no effect on columns
            where ``dtype=None``.
        :param name: series name.
        :param title: A human-readable label for the series.
        :param description: An arbitrary textual description of the series.
        """
        super().__init__(
            dtype,
            checks,
            nullable,
            unique,
            report_duplicates,
            coerce,
            name,
            title,
            description,
        )
        self.index = index

    @property
    def _allow_groupby(self) -> bool:
        """Whether the schema or schema component allows groupby operations."""
        return False

    def validate(
        self,
        check_obj: pd.Series,
        head: Optional[int] = None,
        tail: Optional[int] = None,
        sample: Optional[int] = None,
        random_state: Optional[int] = None,
        lazy: bool = False,
        inplace: bool = False,
    ) -> pd.Series:
        """Validate a Series object.

        :param check_obj: One-dimensional ndarray with axis labels
            (including time series).
        :param head: validate the first n rows. Rows overlapping with `tail` or
            `sample` are de-duplicated.
        :param tail: validate the last n rows. Rows overlapping with `head` or
            `sample` are de-duplicated.
        :param sample: validate a random sample of n rows. Rows overlapping
            with `head` or `tail` are de-duplicated.
        :param random_state: random seed for the ``sample`` argument.
        :param lazy: if True, lazily evaluates dataframe against all validation
            checks and raises a ``SchemaErrors``. Otherwise, raise
            ``SchemaError`` as soon as one occurs.
        :param inplace: if True, applies coercion to the object of validation,
            otherwise creates a copy of the data.
        :returns: validated Series.
        :raises SchemaError: when ``DataFrame`` violates built-in or custom
            checks.

        :example:

        >>> import pandas as pd
        >>> import pandera as pa
        >>>
        >>> series_schema = pa.SeriesSchema(
        ...     float, [
        ...         pa.Check(lambda s: s > 0),
        ...         pa.Check(lambda s: s < 1000),
        ...         pa.Check(lambda s: s.mean() > 300),
        ...     ])
        >>> series = pd.Series([1, 100, 800, 900, 999], dtype=float)
        >>> print(series_schema.validate(series))
        0      1.0
        1    100.0
        2    800.0
        3    900.0
        4    999.0
        dtype: float64
        """
        if not check_utils.is_field(check_obj):
            raise TypeError(f"expected pd.Series, got {type(check_obj)}")

        if hasattr(check_obj, "dask"):
            # special case for dask series
            if inplace:
                check_obj = check_obj.pandera.add_schema(self)
            else:
                check_obj = check_obj.copy()

            check_obj = check_obj.map_partitions(
                self._validate,
                head=head,
                tail=tail,
                sample=sample,
                random_state=random_state,
                lazy=lazy,
                inplace=inplace,
                meta=check_obj,
            )
            return check_obj.pandera.add_schema(self)

        return self._validate(
            check_obj=check_obj,
            head=head,
            tail=tail,
            sample=sample,
            random_state=random_state,
            lazy=lazy,
            inplace=inplace,
        )

    def _validate(
        self,
        check_obj: pd.Series,
        head: Optional[int] = None,
        tail: Optional[int] = None,
        sample: Optional[int] = None,
        random_state: Optional[int] = None,
        lazy: bool = False,
        inplace: bool = False,
    ) -> pd.Series:
        if not inplace:
            check_obj = check_obj.copy()

        if hasattr(check_obj, "pandera"):
            check_obj = check_obj.pandera.add_schema(self)

        error_handler = SchemaErrorHandler(lazy=lazy)

        if self.coerce:
            try:
                check_obj = self.coerce_dtype(check_obj)
                if hasattr(check_obj, "pandera"):
                    check_obj = check_obj.pandera.add_schema(self)
            except errors.SchemaError as exc:
                error_handler.collect_error("dtype_coercion_error", exc)

        # validate index
        if self.index:
            # coerce data type using index schema copy to prevent mutation
            # of original index schema attribute.
            _index = copy.deepcopy(self.index)
            _index.coerce = _index.coerce or self.coerce
            try:
                check_obj = _index(
                    check_obj, head, tail, sample, random_state, lazy, inplace
                )
            except errors.SchemaError as exc:
                error_handler.collect_error("dtype_coercion_error", exc)
            except errors.SchemaErrors as err:
                for schema_error_dict in err.schema_errors:
                    error_handler.collect_error(
                        "index_check", schema_error_dict["error"]
                    )

        # validate series
        try:
            super().validate(
                check_obj, head, tail, sample, random_state, lazy, inplace
            )
        except errors.SchemaErrors as err:
            for schema_error_dict in err.schema_errors:
                error_handler.collect_error(
                    "series_check", schema_error_dict["error"]
                )

        if error_handler.collected_errors:
            raise errors.SchemaErrors(
                self, error_handler.collected_errors, check_obj
            )

        return check_obj

    def __call__(
        self,
        check_obj: pd.Series,
        head: Optional[int] = None,
        tail: Optional[int] = None,
        sample: Optional[int] = None,
        random_state: Optional[int] = None,
        lazy: bool = False,
        inplace: bool = False,
    ) -> pd.Series:
        """Alias for :func:`SeriesSchema.validate` method."""
        return self.validate(
            check_obj, head, tail, sample, random_state, lazy, inplace
        )

    def __eq__(self, other):
        return self.__dict__ == other.__dict__


def _pandas_obj_to_validate(
    dataframe_or_series: Union[pd.DataFrame, pd.Series],
    head: Optional[int],
    tail: Optional[int],
    sample: Optional[int],
    random_state: Optional[int],
) -> Union[pd.DataFrame, pd.Series]:
    pandas_obj_subsample = []
    if head is not None:
        pandas_obj_subsample.append(dataframe_or_series.head(head))
    if tail is not None:
        pandas_obj_subsample.append(dataframe_or_series.tail(tail))
    if sample is not None:
        pandas_obj_subsample.append(
            dataframe_or_series.sample(sample, random_state=random_state)
        )
    return (
        dataframe_or_series
        if not pandas_obj_subsample
        else pd.concat(pandas_obj_subsample).pipe(
            lambda x: x[~x.index.duplicated()]
        )
    )


def _handle_check_results(
    schema: Union[DataFrameSchema, SeriesSchemaBase],
    check_index: int,
    check: Union[Check, Hypothesis],
    check_obj: Union[pd.DataFrame, pd.Series],
    *check_args,
) -> bool:
    """Handle check results, raising SchemaError on check failure.

    :param check_index: index of check in the schema component check list.
    :param check: Check object used to validate pandas object.
    :param check_args: arguments to pass into check object.
    :returns: True if check results pass or check.raise_warning=True, otherwise
        False.
    """
    check_result = check(check_obj, *check_args)
    if not check_result.check_passed:
        if check_result.failure_cases is None:
            # encode scalar False values explicitly
            failure_cases = scalar_failure_case(check_result.check_passed)
            error_msg = format_generic_error_message(
                schema, check, check_index
            )
        else:
            failure_cases = reshape_failure_cases(
                check_result.failure_cases, check.ignore_na
            )
            error_msg = format_vectorized_error_message(
                schema, check, check_index, failure_cases
            )

        # raise a warning without exiting if the check is specified to do so
        if check.raise_warning:
            warnings.warn(error_msg, UserWarning)
            return True

        raise errors.SchemaError(
            schema,
            check_obj,
            error_msg,
            failure_cases=failure_cases,
            check=check,
            check_index=check_index,
            check_output=check_result.check_output,
        )
    return check_result.check_passed


def convert_uniquesettings(unique: UniqueSettings) -> Union[bool, str]:
    """
    Converts UniqueSettings object to string that can be passed onto pandas
    .duplicated() call
    """
    # Default `keep` argument for pandas .duplicated() function
    keep_argument: Union[bool, str]
    if unique == "exclude_first":
        keep_argument = "first"
    elif unique == "exclude_last":
        keep_argument = "last"
    elif unique == "all":
        keep_argument = False
    else:
        raise ValueError(
            str(unique) + " is not a recognized report_duplicates value"
...