Best Python code snippet using pandera_python
pipeline_utils.py
Source:pipeline_utils.py
import os
from enum import Enum

# Pipeline stages in execution order; each stage builds on all earlier ones.
PipelineStep = Enum("PipelineStep", "DATA_VALIDATION PREPROCESSING MODELING DEPLOY")

# Skeleton of the generated Airflow DAG module. ``{project}`` and
# ``{components}`` are filled in by ``create_pipeline_file``.
_BASE_TEMPLATE = """
import os
import logging
import datetime
from tfx.orchestration.airflow.airflow_runner import AirflowDAGRunner
from tfx.orchestration.pipeline import PipelineDecorator
from tfx.utils.dsl_utils import csv_input
from tfx.proto import trainer_pb2, evaluator_pb2, pusher_pb2
from tfx.components.example_gen.csv_example_gen.component import CsvExampleGen
from tfx.components.statistics_gen.component import StatisticsGen
from tfx.components.schema_gen.component import SchemaGen
from tfx.components.example_validator.component import ExampleValidator
from tfx.components.transform.component import Transform
from tfx.components.trainer.component import Trainer
from tfx.components.evaluator.component import Evaluator
from tfx.components.model_validator.component import ModelValidator
from tfx.components.pusher.component import Pusher
data_dir = os.path.join(os.environ['DATA_DIR'], '{project}')
log_dir = os.path.join(os.environ['TFX_DIR'], 'logs')
serving_model_dir = os.path.join(os.environ['SERVING_DIR'], 'serving_model', '{project}')
project_preprocessing_file = os.path.join(os.environ['DAGS_DIR'], '{project}_preprocessing.py')
project_training_file = os.path.join(os.environ['DAGS_DIR'], '{project}_modeling.py')
logger_overrides = dict([
    ('log_root', log_dir),
    ('log_level', logging.INFO)
])
airflow_config = dict([
    ('schedule_interval', None),
    ('start_date', datetime.datetime(2019, 1, 1))
])
@PipelineDecorator(
    pipeline_name='{project}',
    enable_cache=True,
    metadata_db_root=os.environ['METADATA_DB_DIR'],
    additional_pipeline_args=dict([('logger_args', logger_overrides)]),
    pipeline_root=os.environ['PIPELINE_DIR']
)
def create_pipeline():

    pipeline = []

{components}
    return pipeline
pipeline = AirflowDAGRunner(airflow_config).run(create_pipeline())
"""

# Components emitted for DATA_VALIDATION (and every later step).
_DATA_VALIDATION_COMPONENTS = """\
    examples = csv_input(data_dir)
    # Brings data into the pipeline
    example_gen = CsvExampleGen(input_base=examples)
    pipeline.append(example_gen)

    # Computes statistics over data for visualization and example validation.
    statistics_gen = StatisticsGen(input_data=example_gen.outputs.examples)
    pipeline.append(statistics_gen)
    # Generates schema based on statistics files.
    infer_schema = SchemaGen(stats=statistics_gen.outputs.output)
    pipeline.append(infer_schema)
    # Performs anomaly detection based on statistics and data schema.
    validate_stats = ExampleValidator(
        stats=statistics_gen.outputs.output,
        schema=infer_schema.outputs.output
    )
    pipeline.append(validate_stats)
"""

# Components added from PREPROCESSING onwards.
_PREPROCESSING_COMPONENTS = """\
    # Performs transformations and feature engineering in training and serving.
    transform = Transform(
        input_data=example_gen.outputs.examples,
        schema=infer_schema.outputs.output,
        module_file=project_preprocessing_file
    )
    pipeline.append(transform)
"""

# Components added from MODELING onwards. Carries the ``{train_steps}``,
# ``{eval_steps}`` and ``{specs}`` placeholders.
_MODELING_COMPONENTS = """\
    # Uses user-provided Python function that implements a model.
    trainer = Trainer(
        module_file=project_training_file,
        transformed_examples=transform.outputs.transformed_examples,
        schema=infer_schema.outputs.output,
        transform_output=transform.outputs.transform_output,
        train_args=trainer_pb2.TrainArgs(num_steps={train_steps}),
        eval_args=trainer_pb2.EvalArgs(num_steps={eval_steps})
    )
    pipeline.append(trainer)
    # Uses TFMA to compute a evaluation statistics over features of a model.
    model_analyzer = Evaluator(
        examples=example_gen.outputs.examples,
        model_exports=trainer.outputs.output,
        feature_slicing_spec=evaluator_pb2.FeatureSlicingSpec(specs=[{specs}])
    )
    pipeline.append(model_analyzer)
"""

# Components added only for DEPLOY.
_DEPLOY_COMPONENTS = """\
    # Performs quality validation of a candidate model (compared to a baseline).
    model_validator = ModelValidator(
        examples=example_gen.outputs.examples, model=trainer.outputs.output
    )
    pipeline.append(model_validator)
    # Checks whether the model passed the validation steps and pushes the model to a file destination if check passed.
    pusher = Pusher(
        model_export=trainer.outputs.output,
        model_blessing=model_validator.outputs.blessing,
        push_destination=pusher_pb2.PushDestination(
            filesystem=pusher_pb2.PushDestination.Filesystem(base_directory=serving_model_dir)
        )
    )
    pipeline.append(pusher)
"""

# Ordered (step, snippet) pairs; a step's pipeline is every snippet up to and
# including its own entry.
_PIPELINE_STAGES = (
    (PipelineStep.DATA_VALIDATION, _DATA_VALIDATION_COMPONENTS),
    (PipelineStep.PREPROCESSING, _PREPROCESSING_COMPONENTS),
    (PipelineStep.MODELING, _MODELING_COMPONENTS),
    (PipelineStep.DEPLOY, _DEPLOY_COMPONENTS),
)


def _render_slicing_specs(columns_for_slicing):
    """Return the comma-joined ``SingleSlicingSpec`` source, one per column."""
    # ``None`` (the public default) is treated like an empty list so that the
    # default call no longer raises TypeError.
    # NOTE(review): an empty ``specs=[]`` is what an explicit empty list
    # already produced; confirm against the Evaluator docs of the TFX version
    # in use that this gives the intended (unsliced) evaluation.
    columns = columns_for_slicing or ()
    # repr() keeps the emitted literal valid Python even when a column name
    # contains a quote; for ordinary names it renders the same 'name' text.
    return ",".join(
        "evaluator_pb2.SingleSlicingSpec(column_for_slicing=[{!r}])".format(str(col))
        for col in columns
    )


def create_pipeline_file(step, project, train_steps=1000, eval_steps=500, columns_for_slicing=None):
    """Build the source text of a TFX/Airflow DAG module for ``project``.

    The generated pipeline contains every component up to and including
    ``step``: DATA_VALIDATION < PREPROCESSING < MODELING < DEPLOY.

    Args:
        step: A ``PipelineStep`` member selecting the last stage to include.
        project: Project name; substituted into the pipeline name and into
            the data, serving and module-file paths of the generated code.
        train_steps: Value written as ``TrainArgs(num_steps=...)``
            (used from MODELING onwards).
        eval_steps: Value written as ``EvalArgs(num_steps=...)``
            (used from MODELING onwards).
        columns_for_slicing: Iterable of column names, each emitted as one
            ``SingleSlicingSpec`` for the Evaluator. ``None`` or an empty
            iterable yields an empty ``specs`` list.

    Returns:
        The generated module source as a string, or ``None`` when ``step``
        is not one of the ``PipelineStep`` members.
    """
    selected = []
    for stage, snippet in _PIPELINE_STAGES:
        selected.append(snippet)
        if stage == step:
            break
    else:
        # Unknown step: keep the historical contract of returning None.
        return None

    # Snippets without placeholders pass through ``format`` unchanged.
    components = "".join(selected).format(
        train_steps=train_steps,
        eval_steps=eval_steps,
        specs=_render_slicing_specs(columns_for_slicing),
    )
    return _BASE_TEMPLATE.format(project=project, components=components)


def write_pipeline_to_dags(pipeline_file, name):
    """Write ``pipeline_file`` to ``$DAGS_DIR/<name>.py``.

    Args:
        pipeline_file: Module source text, e.g. from ``create_pipeline_file``.
        name: File name without the ``.py`` extension.

    Raises:
        KeyError: If the ``DAGS_DIR`` environment variable is not set.
    """
    target = os.path.join(os.environ['DAGS_DIR'], name + ".py")
    with open(target, "w") as f:
        f.write(pipeline_file)
test_infer_schema.py
Source:test_infer_schema.py
...6 col1 = pd.Series(["1", "4", "9", "20", "8"], dtype="string", name="col1")7 col2 = pd.Series(["1", "4", "9", "20", "8"], name="col2")8 col3 = pd.Series(["1", "4", "9", "20", "8"], dtype="category", name="col3")9 col4 = pd.Series([True, False, False, True], name="col4")10 assert "col1" in infer_schema(col1.to_frame()).categorical_columns11 assert "col2" in infer_schema(col2.to_frame()).categorical_columns12 assert "col3" in infer_schema(col3.to_frame()).categorical_columns13 assert "col4" in infer_schema(col4.to_frame()).categorical_columns14def test_valid_numerical_column():15 col1 = pd.Series([1, 2, 3, 4], name="col1")16 assert "col1" in infer_schema(col1.to_frame()).numerical_columns17def test_valid_text_column():18 col1 = pd.Series([f"{x}" for x in range(21)], dtype="string", name="col1")19 col2 = pd.Series([f"{x}" for x in range(21)], name="col2")20 assert "col1" in infer_schema(col1.to_frame()).text_columns21 assert "col2" in infer_schema(col2.to_frame()).text_columns22def test_valid_time_column():23 col1 = pd.to_datetime(24 pd.Series(["3/11/2000", "3/12/2000", "3/13/2000"], name="col1")25 )26 col2 = pd.Series(27 [datetime(2022, 9, 13, 9), pd.Timestamp(1513393355.5, unit="s")], name="col2"28 )29 assert "col1" in infer_schema(col1.to_frame()).time_columns30 assert "col2" in infer_schema(col2.to_frame()).time_columns31def test_non_supported_mixed_column():32 col1 = pd.Series([1, 2, 3, 4, "a"], name="col1")33 col2 = pd.Series([1, 2, 3, 4, datetime(2013, 1, 1)], name="col1")34 with pytest.raises(35 Exception, match=r"Mixed data type mixed-integer is not supported"36 ):37 infer_schema(col1.to_frame())38 infer_schema(col2.to_frame())39def test_non_supported_other_column():40 col1 = pd.Series(pd.Timedelta(timedelta(days=1, seconds=1)))41 col2 = pd.Series(pd.Interval(left=0, right=5))42 col3 = pd.Series(pd.Period("2017-01-01"))43 with pytest.raises(44 Exception, match=r"Data type timedelta64\[ns\] is not supported"45 ):46 infer_schema(col1.to_frame())47 with 
pytest.raises(Exception, match=r"Data type interval.* is not supported"):48 infer_schema(col2.to_frame())49 with pytest.raises(Exception, match=r"Data type period.* is not supported"):...
python-dict-list-to-dataframe.py
Source:python-dict-list-to-dataframe.py
...15 {"Category": 'Category C', 'ItemID': 3, 'Amount': 100.01},16 {"Category": 'Category A', 'ItemID': 4, 'Amount': 110.01},17 {"Category": 'Category B', 'ItemID': 5, 'Amount': 70.85}18 ]19def infer_schema():20 # Create data frame21 df = spark.createDataFrame(data)22 print(df.schema)23 df.show()24def explicit_schema():25 # Create a schema for the dataframe26 schema = StructType([27 StructField('Category', StringType(), False),28 StructField('ItemID', IntegerType(), False),29 StructField('Amount', FloatType(), True)30 ])31 # Create data frame32 df = spark.createDataFrame(data, schema)33 print(df.schema)34 df.show()35if __name__ == "__main__":36 infer_schema()...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. From setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!