Best Python code snippet using hypothesis
test_arrow.py
Source: test_arrow.py
# Copyright 2017 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
# ==============================================================================
"""Tests for ArrowDataset."""
from collections import namedtuple
import io
import os
import socket
import tempfile
import threading
import pytest
import pyarrow as pa
import numpy.testing as npt
import tensorflow as tf
import tensorflow_io as tfio

TruthData = namedtuple("TruthData", ["data", "output_types", "output_shapes"])

class ArrowTestBase(tf.test.TestCase):
    """ArrowTestBase"""

    @classmethod
    def setUpClass(cls):  # pylint: disable=invalid-name
        """setUpClass"""
        cls.scalar_data = [
            [True, False, True, True],
            [1, 2, -3, 4],
            [1, 2, -3, 4],
            [1, 2, -3, 4],
            [1, 2, -3, 4],
            [1, 2, 3, 4],
            [1, 2, 3, 4],
            [1, 2, 3, 4],
            [1, 2, 3, 4],
            [1.1, 2.2, 3.3, 4.4],
            [1.1, 2.2, 3.3, 4.4],
        ]
        cls.scalar_dtypes = (
            tf.dtypes.bool,
            tf.dtypes.int8,
            tf.dtypes.int16,
            tf.dtypes.int32,
            tf.dtypes.int64,
            tf.dtypes.uint8,
            tf.dtypes.uint16,
            tf.dtypes.uint32,
            tf.dtypes.uint64,
            tf.dtypes.float32,
            tf.dtypes.float64,
        )
        cls.scalar_shapes = tuple(tf.TensorShape([]) for _ in cls.scalar_dtypes)
        cls.list_fixed_data = [
            [[1, 1], [2, 2], [3, 3], [4, 4]],
            [[1, 1], [2, 2], [3, 3], [4, 4]],
            [[1.1, 1.1], [2.2, 2.2], [3.3, 3.3], [4.4, 4.4]],
            [[1.1, 1.1], [2.2, 2.2], [3.3, 3.3], [4.4, 4.4]],
        ]
        cls.list_fixed_dtypes = (
            tf.dtypes.int32,
            tf.dtypes.int64,
            tf.dtypes.float32,
            tf.dtypes.float64,
        )
        cls.list_fixed_shapes = tuple(
            tf.TensorShape([None]) for _ in cls.list_fixed_dtypes
        )
        cls.list_var_data = [
            [[1], [2, 2], [3, 3, 3], [4, 4, 4]],
            [[1.1], [2.2, 2.2], [3.3, 3.3, 3.3], [4.4, 4.4, 4.4]],
        ]
        cls.list_var_dtypes = (tf.dtypes.int32, tf.dtypes.float32)
        cls.list_var_shapes = (tf.TensorShape([None]), tf.TensorShape([None]))
        cls.list_data = cls.list_fixed_data + cls.list_var_data
        cls.list_dtypes = cls.list_fixed_dtypes + cls.list_var_dtypes
        cls.list_shapes = cls.list_fixed_shapes + cls.list_var_shapes

    def get_arrow_type(self, dt, is_list):
        """get_arrow_type"""
        if dt == tf.dtypes.bool:
            arrow_type = pa.bool_()
        elif dt == tf.dtypes.int8:
            arrow_type = pa.int8()
        elif dt == tf.dtypes.int16:
            arrow_type = pa.int16()
        elif dt == tf.dtypes.int32:
            arrow_type = pa.int32()
        elif dt == tf.dtypes.int64:
            arrow_type = pa.int64()
        elif dt == tf.dtypes.uint8:
            arrow_type = pa.uint8()
        elif dt == tf.dtypes.uint16:
            arrow_type = pa.uint16()
        elif dt == tf.dtypes.uint32:
            arrow_type = pa.uint32()
        elif dt == tf.dtypes.uint64:
            arrow_type = pa.uint64()
        elif dt == tf.dtypes.float16:
            arrow_type = pa.float16()
        elif dt == tf.dtypes.float32:
            arrow_type = pa.float32()
        elif dt == tf.dtypes.float64:
            arrow_type = pa.float64()
        elif dt == tf.dtypes.string:
            arrow_type = pa.string()
        else:
            raise TypeError("Unsupported dtype for Arrow" + str(dt))
        if is_list:
            arrow_type = pa.list_(arrow_type)
        return arrow_type

    def make_record_batch(self, truth_data):
        """Make an Arrow RecordBatch for given test data"""
        arrays = [
            pa.array(
                truth_data.data[col],
                type=self.get_arrow_type(
                    truth_data.output_types[col],
                    isinstance(truth_data.data[col][0], list),
                ),
            )
            for col in range(len(truth_data.output_types))
        ]
        names = [f"{i}_[{a.type}]" for i, a in enumerate(arrays)]
        return pa.RecordBatch.from_arrays(arrays, names)

class ArrowIOTensorTest(ArrowTestBase):
    """ArrowIOTensorTest"""

    @classmethod
    def setUpClass(cls):  # pylint: disable=invalid-name
        """setUpClass"""
        super().setUpClass()
        cls.scalar_shapes = tuple(tf.TensorShape([len(c)]) for c in cls.scalar_data)
        cls.list_fixed_shapes = tuple(
            tf.TensorShape([len(c), len(c[0])]) for c in cls.list_fixed_data
        )

    def make_table(self, truth_data):
        """make_table"""
        batch = self.make_record_batch(truth_data)
        return pa.Table.from_batches([batch])

    def run_test_case(self, iot, truth_data, columns):
        """run_test_case"""
        self.assertEqual(iot.columns, columns)
        for i, column in enumerate(columns):
            iot_col = iot(column)
            self.assertEqual(iot_col.dtype, truth_data.output_types[i])
            self.assertEqual(iot_col.shape, truth_data.output_shapes[i])
            npt.assert_almost_equal(iot_col.to_tensor().numpy(), truth_data.data[i])

    def test_arrow_io_tensor_scalar(self):
        """test_arrow_io_tensor_scalar"""
        truth_data = TruthData(self.scalar_data, self.scalar_dtypes, self.scalar_shapes)
        table = self.make_table(truth_data)
        iot = tfio.IOTensor.from_arrow(table)
        self.run_test_case(iot, truth_data, table.column_names)

    def test_arrow_io_tensor_lists(self):
        """test_arrow_io_tensor_lists"""
        truth_data = TruthData(
            self.list_fixed_data, self.list_fixed_dtypes, self.list_fixed_shapes
        )
        table = self.make_table(truth_data)
        iot = tfio.IOTensor.from_arrow(table)
        self.run_test_case(iot, truth_data, table.column_names)

    def test_arrow_io_tensor_mixed(self):
        """test_arrow_io_tensor_mixed"""
        truth_data = TruthData(
            self.scalar_data + self.list_fixed_data,
            self.scalar_dtypes + self.list_fixed_dtypes,
            self.scalar_shapes + self.list_fixed_shapes,
        )
        table = self.make_table(truth_data)
        iot = tfio.IOTensor.from_arrow(table)
        self.run_test_case(iot, truth_data, table.column_names)

    def test_arrow_io_tensor_chunked(self):
        """test_arrow_io_tensor_chunked"""
        num_chunks = 2
        chunk_data = TruthData(
            self.scalar_data + self.list_fixed_data,
            self.scalar_dtypes + self.list_fixed_dtypes,
            self.scalar_shapes + self.list_fixed_shapes,
        )
        # Make a table with double the data for 2 chunks
        table = self.make_table(chunk_data)
        table = pa.concat_tables([table] * num_chunks)
        # Double the batch size of the truth data
        output_shapes = self.scalar_shapes + self.list_fixed_shapes
        output_shapes = [
            tf.TensorShape([d + d if i == 0 else d for i, d in enumerate(shape)])
            for shape in output_shapes
        ]
        truth_data = TruthData(
            [d * num_chunks for d in chunk_data.data],
            self.scalar_dtypes + self.list_fixed_dtypes,
            output_shapes,
        )
        self.assertGreater(table[0].num_chunks, 1)
        iot = tfio.IOTensor.from_arrow(table)
        self.run_test_case(iot, truth_data, table.column_names)

    def test_arrow_io_dataset_map_from_file(self):
        """test_arrow_io_dataset_map_from_file"""
        column = "a"
        dtype = tf.dtypes.int64
        column_dtype = self.get_arrow_type(dtype, False)
        arr = pa.array(list(range(100)), column_dtype)
        table = pa.Table.from_arrays([arr], [column])
        spec = {column: dtype}
        with tempfile.NamedTemporaryFile(delete=False) as f:
            with pa.RecordBatchFileWriter(f.name, table.schema) as writer:
                for batch in table.to_batches():
                    writer.write_batch(batch)

        def from_file(_):
            reader = pa.RecordBatchFileReader(f.name)
            t = reader.read_all()
            tio = tfio.IOTensor.from_arrow(t, spec=spec)
            return tio(column).to_tensor()

        num_iters = 2
        ds = tf.data.Dataset.range(num_iters).map(from_file)
        expected = table[column].to_pylist()
        iter_count = 0
        for result in ds:
            npt.assert_array_equal(result, expected)
            iter_count += 1
        self.assertEqual(iter_count, num_iters)
        os.unlink(f.name)

    def test_arrow_io_dataset_map_py_func(self):
        """test_arrow_io_dataset_map_from_py_func"""
        column = "a"
        dtype = tf.dtypes.int64
        column_dtype = self.get_arrow_type(dtype, False)
        arr = pa.array(list(range(100)), column_dtype)
        table = pa.Table.from_arrays([arr], [column])
        spec = {column: dtype}
        with tempfile.NamedTemporaryFile(delete=False) as f:
            with pa.RecordBatchFileWriter(f.name, table.schema) as writer:
                for batch in table.to_batches():
                    writer.write_batch(batch)

        def read_table(filename):
            filename = filename.numpy().decode("utf-8")
            reader = pa.RecordBatchFileReader(filename)
            return reader.read_all()

        def from_py_func(filename):
            from tensorflow_io.python.ops.arrow_io_tensor_ops import ArrowIOResource

            table_res = ArrowIOResource.from_py_function(read_table, [filename])
            tio = tfio.IOTensor.from_arrow(table_res, spec=spec)
            return tio(column).to_tensor()

        num_iters = 2
        ds = tf.data.Dataset.from_tensor_slices([f.name, f.name]).map(from_py_func)
        expected = table[column].to_pylist()
        iter_count = 0
        for result in ds:
            npt.assert_array_equal(result, expected)
            iter_count += 1
        self.assertEqual(iter_count, num_iters)
        os.unlink(f.name)

    def test_spec_selection_by_column_name(self):
        """test_spec_selection_by_column_name"""

        def from_func(_):
            a = pa.array([1, 2, 3], type=pa.int32())
            b = pa.array([4, 5, 6], type=pa.int64())
            c = pa.array([7, 8, 9], type=pa.float32())
            t = pa.Table.from_arrays([a, b, c], ["a", "b", "c"])
            foo = tfio.IOTensor.from_arrow(t, spec={"b": tf.int64})
            return foo("b").to_tensor()

        ds = tf.data.Dataset.range(1).map(from_func)
        results = list(ds.as_numpy_iterator())
        self.assertEqual(len(results), 1)
        result = results[0]
        b = pa.array([4, 5, 6], type=pa.int64())
        expected = b.to_numpy()
        npt.assert_array_equal(result, expected)

    def test_spec_selection_by_column_index(self):
        """test_spec_selection_by_column_index"""

        def from_func(_):
            a = pa.array([1, 2, 3], type=pa.int32())
            b = pa.array([4, 5, 6], type=pa.int64())
            c = pa.array([7, 8, 9], type=pa.float32())
            t = pa.Table.from_arrays([a, b, c], ["a", "b", "c"])
            foo = tfio.IOTensor.from_arrow(t, spec={1: tf.int64})
            return foo(1).to_tensor()

        ds = tf.data.Dataset.range(1).map(from_func)
        results = list(ds.as_numpy_iterator())
        self.assertEqual(len(results), 1)
        result = results[0]
        b = pa.array([4, 5, 6], type=pa.int64())
        expected = b.to_numpy()
        npt.assert_array_equal(result, expected)

class ArrowDatasetTest(ArrowTestBase):
    """ArrowDatasetTest"""

    def run_test_case(self, dataset, truth_data, batch_size=None):
        """run_test_case"""

        def is_float(dtype):
            """Check if dtype is a floating-point"""
            return dtype in [tf.dtypes.float16, tf.dtypes.float32, tf.dtypes.float64]

        def evaluate_result(value):
            """Check the results match truth data"""
            for i, col in enumerate(dataset.columns):
                if truth_data.output_shapes[col].ndims == 0:
                    if is_float(truth_data.output_types[col]):
                        self.assertAlmostEqual(value[i], truth_data.data[col][row], 4)
                    else:
                        self.assertEqual(value[i], truth_data.data[col][row])
                elif truth_data.output_shapes[col].ndims == 1:
                    if is_float(truth_data.output_types[col]):
                        for j, v in enumerate(value[i]):
                            self.assertAlmostEqual(v, truth_data.data[col][row][j], 4)
                    else:
                        self.assertListEqual(
                            value[i].tolist(), truth_data.data[col][row]
                        )

        # Row counter for each single result or batch of multiple rows
        row = 0
        # Iterate over the dataset
        for results in dataset:
            # For batches, iterate over each row in batch or remainder at end
            for result_idx in range(batch_size or 1):
                # Get a single row value
                if batch_size is None:
                    value = [r.numpy() for r in results]
                # Get a batch of values and check 1 row at a time
                else:
                    if result_idx == 0:
                        value_batch = [r.numpy() for r in results]
                    # Check for a partial result
                    if result_idx == value_batch[0].shape[0]:
                        break
                    # Get a single row out of the batch
                    value = [v[result_idx] for v in value_batch]
                # Check the result then increment the row counter
                evaluate_result(value)
                row += 1
        # Check that all data was returned by Dataset
        self.assertEqual(row, len(truth_data.data[0]))

    def test_arrow_dataset(self):
        """test_arrow_dataset"""
        import tensorflow_io.arrow as arrow_io

        truth_data = TruthData(
            self.scalar_data + self.list_data,
            self.scalar_dtypes + self.list_dtypes,
            self.scalar_shapes + self.list_shapes,
        )
        batch = self.make_record_batch(truth_data)
        # test all columns selected
        dataset = arrow_io.ArrowDataset.from_record_batches(
            batch, truth_data.output_types, truth_data.output_shapes
        )
        self.run_test_case(dataset, truth_data)
        # test column selection
        columns = (1, 3, len(truth_data.output_types) - 1)
        dataset = arrow_io.ArrowDataset.from_record_batches(
            batch,
            tuple(truth_data.output_types[c] for c in columns),
            tuple(truth_data.output_shapes[c] for c in columns),
            columns=columns,
        )
        self.run_test_case(dataset, truth_data)
        # test construction from pd.DataFrame
        df = batch.to_pandas()
        dataset = arrow_io.ArrowDataset.from_pandas(df, preserve_index=False)
        self.run_test_case(dataset, truth_data)

    def test_batched_arrow_dataset_with_strings(self):
        import tensorflow_io.arrow as arrow_io

        scalar_data = [
            [b"1.1", b"2.2", b"3.3", b"4.4"],
        ]
        scalar_dtypes = (tf.string,)
        scalar_shapes = tuple(tf.TensorShape([]) for _ in scalar_dtypes)
        truth_data = TruthData(scalar_data, scalar_dtypes, scalar_shapes)
        array = pa.array(
            scalar_data[0], type=self.get_arrow_type(scalar_dtypes[0], False)
        )
        batch = pa.Table.from_pydict({"array": array}).to_batches(2)
        dataset = arrow_io.ArrowDataset.from_record_batches(
            batch, batch_size=1, output_types=scalar_dtypes, batch_mode="keep_remainder"
        )
        self.run_test_case(dataset, truth_data)

    def test_arrow_dataset_with_strings(self):
        """test_arrow_dataset"""
        import tensorflow_io.arrow as arrow_io

        scalar_data = [
            [b"1.1", b"2.2", b"3.3", b"4.4"],
        ]
        scalar_dtypes = (tf.dtypes.string,)
        scalar_shapes = tuple(tf.TensorShape([]) for _ in scalar_dtypes)
        truth_data = TruthData(scalar_data, scalar_dtypes, scalar_shapes)
        batch = self.make_record_batch(truth_data)
        # test all columns selected
        dataset = arrow_io.ArrowDataset.from_record_batches(
            batch, truth_data.output_types, truth_data.output_shapes
        )
        self.run_test_case(dataset, truth_data)

    def test_from_pandas_preserve_index(self):
        """test_from_pandas_preserve_index"""
        import tensorflow_io.arrow as arrow_io

        data_v = [
            [1.0, 2.0, 3.0],
            [0.2, 0.4, 0.8],
        ]
        truth_data = TruthData(
            data_v,
            (tf.dtypes.float32, tf.dtypes.float32),
            (tf.TensorShape([]), tf.TensorShape([])),
        )
        batch = self.make_record_batch(truth_data)
        df = batch.to_pandas()
        dataset = arrow_io.ArrowDataset.from_pandas(df, preserve_index=True)
        # Add index column to test data to check results
        truth_data_with_index = TruthData(
            truth_data.data + [range(len(truth_data.data[0]))],
            truth_data.output_types + (tf.dtypes.int64,),
            truth_data.output_shapes + (tf.TensorShape([]),),
        )
        self.run_test_case(dataset, truth_data_with_index)
        # Test preserve_index again, selecting second column only
        # NOTE: need to select TruthData because `df` gets selected also
        truth_data_selected_with_index = TruthData(
            truth_data_with_index.data[1:],
            truth_data_with_index.output_types[1:],
            truth_data_with_index.output_shapes[1:],
        )
        dataset = arrow_io.ArrowDataset.from_pandas(
            df, columns=(1,), preserve_index=True
        )
        self.run_test_case(dataset, truth_data_selected_with_index)

    def test_arrow_feather_dataset(self):
        """test_arrow_feather_dataset"""
        import tensorflow_io.arrow as arrow_io
        from pyarrow.feather import write_feather

        # Feather files currently do not support columns of list types
        truth_data = TruthData(self.scalar_data, self.scalar_dtypes, self.scalar_shapes)
        batch = self.make_record_batch(truth_data)
        df = batch.to_pandas()
        # Create a tempfile that is deleted after tests run
        with tempfile.NamedTemporaryFile(delete=False) as f:
            write_feather(df, f, version=1)
        # test single file
        dataset = arrow_io.ArrowFeatherDataset(
            f.name,
            list(range(len(truth_data.output_types))),
            truth_data.output_types,
            truth_data.output_shapes,
        )
        self.run_test_case(dataset, truth_data)
        # test single file with 'file://' prefix
        dataset = arrow_io.ArrowFeatherDataset(
            f"file://{f.name}",
            list(range(len(truth_data.output_types))),
            truth_data.output_types,
            truth_data.output_shapes,
        )
        self.run_test_case(dataset, truth_data)
        # test multiple files
        dataset = arrow_io.ArrowFeatherDataset(
            [f.name, f.name],
            list(range(len(truth_data.output_types))),
            truth_data.output_types,
            truth_data.output_shapes,
        )
        truth_data_doubled = TruthData(
            [d * 2 for d in truth_data.data],
            truth_data.output_types,
            truth_data.output_shapes,
        )
        self.run_test_case(dataset, truth_data_doubled)
        # test construction from schema
        dataset = arrow_io.ArrowFeatherDataset.from_schema(f.name, batch.schema)
        self.run_test_case(dataset, truth_data)
        os.unlink(f.name)

    def test_arrow_socket_dataset(self):
        """test_arrow_socket_dataset"""
        import tensorflow_io.arrow as arrow_io

        truth_data = TruthData(
            self.scalar_data + self.list_data,
            self.scalar_dtypes + self.list_dtypes,
            self.scalar_shapes + self.list_shapes,
        )
        batch = self.make_record_batch(truth_data)
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.bind(("127.0.0.1", 0))
        sock.listen(1)
        host_addr, port = sock.getsockname()
        host = f"{host_addr}:{port}"

        def run_server(num_batches):
            conn, _ = sock.accept()
            outfile = conn.makefile(mode="wb")
            writer = pa.RecordBatchStreamWriter(outfile, batch.schema)
            for _ in range(num_batches):
                writer.write_batch(batch)
            writer.close()
            outfile.close()
            conn.close()
            sock.close()

        # test with multiple batches, construct from schema
        num_batches = 2
        server = threading.Thread(target=run_server, args=(num_batches,))
        server.start()
        dataset = arrow_io.ArrowStreamDataset.from_schema(host, batch.schema)
        truth_data_mult = TruthData(
            [d * num_batches for d in truth_data.data],
            truth_data.output_types,
            truth_data.output_shapes,
        )
        self.run_test_case(dataset, truth_data_mult)
        server.join()

    def test_arrow_unix_socket_dataset(self):
        """test_arrow_unix_socket_dataset"""
        import tensorflow_io.arrow as arrow_io

        if os.name == "nt":
            self.skipTest("Unix Domain Sockets not supported on Windows")
        truth_data = TruthData(
            self.scalar_data + self.list_data,
            self.scalar_dtypes + self.list_dtypes,
            self.scalar_shapes + self.list_shapes,
        )
        batch = self.make_record_batch(truth_data)
        host = os.path.join(tempfile.gettempdir(), "arrow_io_stream")
        # Make sure the socket does not already exist
        try:
            os.unlink(host)
        except OSError:
            if os.path.exists(host):
                raise
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        sock.bind(host)
        sock.listen(1)

        def run_server(num_batches):
            conn, _ = sock.accept()
            outfile = conn.makefile(mode="wb")
            writer = pa.RecordBatchStreamWriter(outfile, batch.schema)
            for _ in range(num_batches):
                writer.write_batch(batch)
            writer.close()
            outfile.close()
            conn.close()
            sock.close()

        # test with multiple batches, construct from schema
        num_batches = 2
        server = threading.Thread(target=run_server, args=(num_batches,))
        server.start()
        endpoint = f"unix://{host}"
        dataset = arrow_io.ArrowStreamDataset.from_schema(endpoint, batch.schema)
        truth_data_mult = TruthData(
            [d * num_batches for d in truth_data.data],
            truth_data.output_types,
            truth_data.output_shapes,
        )
        self.run_test_case(dataset, truth_data_mult)
        server.join()

    def test_multiple_stream_hosts(self):
        """test_multiple_stream_hosts"""
        import tensorflow_io.arrow as arrow_io

        if os.name == "nt":
            self.skipTest("Unix Domain Sockets not supported on Windows")
        truth_data = TruthData(
            self.scalar_data + self.list_data,
            self.scalar_dtypes + self.list_dtypes,
            self.scalar_shapes + self.list_shapes,
        )
        batch = self.make_record_batch(truth_data)
        hosts = [
            os.path.join(tempfile.gettempdir(), f"arrow_io_stream_{i}")
            for i in range(1, 3)
        ]

        def start_server(host):
            """start_server"""
            try:
                os.unlink(host)
            except OSError:
                if os.path.exists(host):
                    raise
            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
            sock.bind(host)
            sock.listen(1)

            def run_server(num_batches):
                """run_server"""
                conn, _ = sock.accept()
                outfile = conn.makefile(mode="wb")
                writer = pa.RecordBatchStreamWriter(outfile, batch.schema)
                for _ in range(num_batches):
                    writer.write_batch(batch)
                writer.close()
                outfile.close()
                conn.close()
                sock.close()

            # test with multiple batches, construct from schema
            server = threading.Thread(target=run_server, args=(1,))
            server.start()
            return server

        servers = [start_server(h) for h in hosts]
        endpoints = [f"unix://{h}" for h in hosts]
        dataset = arrow_io.ArrowStreamDataset.from_schema(endpoints, batch.schema)
        truth_data_mult = TruthData(
            [d * len(hosts) for d in truth_data.data],
            truth_data.output_types,
            truth_data.output_shapes,
        )
        self.run_test_case(dataset, truth_data_mult)
        for s in servers:
            s.join()

    def test_stream_from_pandas(self):
        """test_stream_from_pandas"""
        import tensorflow_io.arrow as arrow_io

        truth_data = TruthData(self.scalar_data, self.scalar_dtypes, self.scalar_shapes)
        batch = self.make_record_batch(truth_data)
        df = batch.to_pandas()
        batch_size = 2
        # Test preserve index False
        dataset = arrow_io.ArrowStreamDataset.from_pandas(
            df, batch_size=batch_size, preserve_index=False
        )
        self.run_test_case(dataset, truth_data, batch_size=batch_size)
        # Test preserve index True and select all but index columns
        truth_data = TruthData(
            truth_data.data + [range(len(truth_data.data[0]))],
            truth_data.output_types + (tf.dtypes.int64,),
            truth_data.output_shapes + (tf.TensorShape([]),),
        )
        dataset = arrow_io.ArrowStreamDataset.from_pandas(
            df, batch_size=batch_size, preserve_index=True
        )
        self.run_test_case(dataset, truth_data, batch_size=batch_size)

    def test_stream_from_pandas_remainder(self):
        """Test stream from Pandas that produces partial batch"""
        import tensorflow_io.arrow as arrow_io

        batch_size = len(self.scalar_data[0]) - 1
        truth_data = TruthData(self.scalar_data, self.scalar_dtypes, self.scalar_shapes)
        batch = self.make_record_batch(truth_data)
        df = batch.to_pandas()
        dataset = arrow_io.ArrowStreamDataset.from_pandas(
            df, batch_size=batch_size, preserve_index=False
        )
        self.run_test_case(dataset, truth_data, batch_size=batch_size)

    def test_stream_from_pandas_iter(self):
        """test_stream_from_pandas_iter"""
        import tensorflow_io.arrow as arrow_io

        batch_data = TruthData(self.scalar_data, self.scalar_dtypes, self.scalar_shapes)
        batch = self.make_record_batch(batch_data)
        df = batch.to_pandas()
        batch_size = 2
        num_iters = 3
        dataset = arrow_io.ArrowStreamDataset.from_pandas(
            (df for _ in range(num_iters)), batch_size=batch_size, preserve_index=False
        )
        truth_data = TruthData(
            [d * num_iters for d in batch_data.data],
            batch_data.output_types,
            batch_data.output_shapes,
        )
        self.run_test_case(dataset, truth_data, batch_size=batch_size)

    def test_stream_from_pandas_not_batched(self):
        """test_stream_from_pandas_not_batched"""
        import tensorflow_io.arrow as arrow_io

        truth_data = TruthData(self.scalar_data, self.scalar_dtypes, self.scalar_shapes)
        batch = self.make_record_batch(truth_data)
        df = batch.to_pandas()
        dataset = arrow_io.ArrowStreamDataset.from_pandas(df, preserve_index=False)
        self.run_test_case(dataset, truth_data)

    def test_stream_from_pandas_repeat(self):
        """test_stream_from_pandas_repeat"""
        import tensorflow_io.arrow as arrow_io

        batch_data = TruthData(self.scalar_data, self.scalar_dtypes, self.scalar_shapes)
        batch = self.make_record_batch(batch_data)
        df = batch.to_pandas()
        num_repeat = 10
        dataset = arrow_io.ArrowStreamDataset.from_pandas(
            df, batch_size=2, preserve_index=False
        ).repeat(num_repeat)
        # patch columns attr so run_test_case can use
        dataset.columns = list(range(len(batch_data.output_types)))
        truth_data = TruthData(
            [d * num_repeat for d in batch_data.data],
            batch_data.output_types,
            batch_data.output_shapes,
        )
        self.run_test_case(dataset, truth_data, batch_size=2)

    def test_bool_array_type(self):
        """NOTE: need to test this separately because to_pandas fails with
        ArrowNotImplementedError: Not implemented type for list in
        DataFrameBlock: bool
        see https://issues.apache.org/jira/browse/ARROW-4370
        """
        import tensorflow_io.arrow as arrow_io

        truth_data = TruthData(
            [[[False, False], [False, True], [True, False], [True, True]]],
            (tf.dtypes.bool,),
            (tf.TensorShape([None]),),
        )
        batch = self.make_record_batch(truth_data)
        dataset = arrow_io.ArrowDataset.from_record_batches(
            batch, truth_data.output_types, truth_data.output_shapes, columns=(0,)
        )
        self.run_test_case(dataset, truth_data)

    def test_incorrect_column_type(self):
        """Test that a column with incorrect dtype raises error"""
        import tensorflow_io.arrow as arrow_io

        truth_data = TruthData(self.scalar_data, self.scalar_dtypes, self.scalar_shapes)
        batch = self.make_record_batch(truth_data)
        dataset = arrow_io.ArrowDataset.from_record_batches(
            batch,
            tuple(tf.dtypes.int32 for _ in truth_data.output_types),
            truth_data.output_shapes,
        )
        with self.assertRaisesRegex(tf.errors.OpError, "Arrow type mismatch"):
            self.run_test_case(dataset, truth_data)

    def test_map_and_batch(self):
        """Test that using map then batch produces correct output. This will create
        a map_and_batch_dataset_op that calls GetNext after end_of_sequence=true
        """
        import tensorflow_io.arrow as arrow_io

        truth_data = TruthData(
            [list(range(10))], (tf.dtypes.int32,), (tf.TensorShape([]),)
        )
        batch = self.make_record_batch(truth_data)
        dataset = arrow_io.ArrowDataset.from_record_batches(
            batch, truth_data.output_types, truth_data.output_shapes
        )
        dataset = dataset.map(lambda x: x).batch(4)
        expected = truth_data.data[0]
        for result_tensors in dataset:
            results = result_tensors.numpy()
            for x in results:
                self.assertTrue(expected, "Dataset has more output than expected")
                self.assertEqual(x, expected[0])
                expected.pop(0)

    @pytest.mark.skip(reason="TODO")
    def test_tf_function(self):
        """Test that an ArrowDataset can be used in tf.function call"""
        import tensorflow_io.arrow as arrow_io

        if not tf.version.VERSION.startswith("2."):
            self.skipTest("Test requires TF2.0 for tf.function")
        truth_data = TruthData(
            [list(range(10)), [x * 1.1 for x in range(10)]],
            (tf.dtypes.int32, tf.dtypes.float64),
            (tf.TensorShape([]), tf.TensorShape([])),
        )

        @tf.function
        def create_arrow_dataset(serialized_batch):
            """Create an arrow dataset from input tensor"""
            dataset = arrow_io.ArrowDataset(
                serialized_batch,
                list(range(len(truth_data.output_types))),
                truth_data.output_types,
                truth_data.output_shapes,
            )
            return dataset

        batch = self.make_record_batch(truth_data)
        buf = io.BytesIO()
        writer = pa.RecordBatchFileWriter(buf, batch.schema)
        writer.write_batch(batch)
        writer.close()
        for row, results in enumerate(create_arrow_dataset(buf.getvalue())):
            value = [result.numpy() for result in results]
            self.assertEqual(value[0], truth_data.data[0][row])
            self.assertAlmostEqual(value[1], truth_data.data[1][row], 4)

    def test_batch_no_remainder(self):
        """Test batch_size that does not leave a remainder"""
        import tensorflow_io.arrow as arrow_io

        batch_size = len(self.scalar_data[0])
        num_batches = 2
        truth_data = TruthData(
            [d * num_batches for d in self.scalar_data],
            self.scalar_dtypes,
            self.scalar_shapes,
        )
        batch = self.make_record_batch(truth_data)
        df = batch.to_pandas()
        dataset = arrow_io.ArrowDataset.from_pandas(
            df, preserve_index=False, batch_size=batch_size
        )
        self.run_test_case(dataset, truth_data, batch_size=batch_size)

    def test_batch_remainder(self):
        """Test batch_size that does leave a remainder"""
        import tensorflow_io.arrow as arrow_io

        batch_size = len(self.scalar_data[0]) - 1
        truth_data = TruthData(self.scalar_data, self.scalar_dtypes, self.scalar_shapes)
        batch = self.make_record_batch(truth_data)
        df = batch.to_pandas()
        dataset = arrow_io.ArrowDataset.from_pandas(
            df, preserve_index=False, batch_size=batch_size
        )
        self.run_test_case(dataset, truth_data, batch_size=batch_size)

    def test_batch_drop_remainder(self):
        """Test batch_size that drops remainder data"""
        import tensorflow_io.arrow as arrow_io

        batch_size = len(self.scalar_data[0]) - 1
        truth_data = TruthData(self.scalar_data, self.scalar_dtypes, self.scalar_shapes)
        batch = self.make_record_batch(truth_data)
        df = batch.to_pandas()
        truth_data_drop_last = TruthData(
            [d[:-1] for d in truth_data.data],
            truth_data.output_types,
            truth_data.output_shapes,
        )
        dataset = arrow_io.ArrowDataset.from_pandas(
            df, preserve_index=False, batch_size=batch_size, batch_mode="drop_remainder"
        )
        self.run_test_case(dataset, truth_data_drop_last, batch_size=batch_size)

    def test_batch_mode_auto(self):
        """Test auto batch_mode to size to record batch number of rows"""
        import tensorflow_io.arrow as arrow_io

        num_batches = 2
        single_batch_data = TruthData(
            self.scalar_data, self.scalar_dtypes, self.scalar_shapes
        )
        batch = self.make_record_batch(single_batch_data)
        batches = [batch] * num_batches
        truth_data = TruthData(
            [d * num_batches for d in single_batch_data.data],
            single_batch_data.output_types,
            single_batch_data.output_shapes,
        )
        dataset = arrow_io.ArrowDataset.from_record_batches(
            batches,
            truth_data.output_types,
            truth_data.output_shapes,
            batch_mode="auto",
        )
        self.run_test_case(dataset, truth_data, batch_size=batch.num_rows)

    def test_batch_with_partials(self):
        """Test batch_size that divides an Arrow record batch into
        partial batches
        """
        import tensorflow_io.arrow as arrow_io

        num_batches = 3
        batch_size = int(len(self.scalar_data[0]) * 1.5)
        single_batch_data = TruthData(
            self.scalar_data, self.scalar_dtypes, self.scalar_shapes
        )
        batch = self.make_record_batch(single_batch_data)
        batches = [batch] * num_batches
        truth_data = TruthData(
            [d * num_batches for d in single_batch_data.data],
            single_batch_data.output_types,
            single_batch_data.output_shapes,
        )
        # Batches should divide input without remainder
        self.assertEqual(len(truth_data.data[0]) % batch_size, 0)
        dataset = arrow_io.ArrowDataset.from_record_batches(
            batches,
            truth_data.output_types,
            truth_data.output_shapes,
            batch_size=batch_size,
        )
        self.run_test_case(dataset, truth_data, batch_size=batch_size)

    def test_batch_with_partials_and_remainder(self):
        """Test batch_size that divides an Arrow record batch into
        partial batches and leaves remainder data
        """
        import tensorflow_io.arrow as arrow_io

        num_batches = 3
        batch_size = len(self.scalar_data[0]) + 1
        single_batch_data = TruthData(
            self.scalar_data, self.scalar_dtypes, self.scalar_shapes
        )
        batch = self.make_record_batch(single_batch_data)
        batches = [batch] * num_batches
        truth_data = TruthData(
            [d * num_batches for d in single_batch_data.data],
            single_batch_data.output_types,
            single_batch_data.output_shapes,
        )
        # Batches should divide input and leave a remainder
        self.assertNotEqual(len(truth_data.data[0]) % batch_size, 0)
        dataset = arrow_io.ArrowDataset.from_record_batches(
            batches,
            truth_data.output_types,
            truth_data.output_shapes,
            batch_size=batch_size,
        )
        self.run_test_case(dataset, truth_data, batch_size=batch_size)

    def test_batch_spans_mulitple_partials(self):
        """Test large batch_size that spans mulitple Arrow record batches"""
        import tensorflow_io.arrow as arrow_io

        num_batches = 6
        batch_size = int(len(self.scalar_data[0]) * 3)
        single_batch_data = TruthData(
            self.scalar_data, self.scalar_dtypes, self.scalar_shapes
        )
        batch = self.make_record_batch(single_batch_data)
        batches = [batch] * num_batches
        truth_data = TruthData(
            [d * num_batches for d in single_batch_data.data],
            single_batch_data.output_types,
            single_batch_data.output_shapes,
        )
        dataset = arrow_io.ArrowDataset.from_record_batches(
            batches,
            truth_data.output_types,
            truth_data.output_shapes,
            batch_size=batch_size,
        )
        self.run_test_case(dataset, truth_data, batch_size=batch_size)

    def test_batch_fixed_lists(self):
        """Test batching with fixed length list types"""
        import tensorflow_io.arrow as arrow_io

        batch_size = int(len(self.list_fixed_data[0]) / 2)
        truth_data = TruthData(
            self.list_fixed_data, self.list_fixed_dtypes, self.list_fixed_shapes
        )
        batch = self.make_record_batch(truth_data)
        dataset = arrow_io.ArrowDataset.from_record_batches(
            [batch],
            truth_data.output_types,
            truth_data.output_shapes,
            batch_size=batch_size,
        )
        self.run_test_case(dataset, truth_data, batch_size=batch_size)

    def test_batch_variable_length_list_batched(self):
        """Test batching with variable length lists raises error"""
        import tensorflow_io.arrow as arrow_io

        batch_size = len(self.list_var_data[1])
        truth_data = TruthData(
            self.list_var_data, self.list_var_dtypes, self.list_var_shapes
        )
        batch = self.make_record_batch(truth_data)
        dataset = arrow_io.ArrowDataset.from_record_batches(
            [batch],
            truth_data.output_types,
            truth_data.output_shapes,
            batch_size=batch_size,
        )
        with self.assertRaisesRegex(tf.errors.OpError, "variable.*unsupported"):
            self.run_test_case(dataset, truth_data, batch_size=batch_size)

    def test_batch_variable_length_list_unbatched(self):
        """Test unbatched variable length lists"""
        import tensorflow_io.arrow as arrow_io

        batch_size = None
        truth_data = TruthData(
            self.list_var_data, self.list_var_dtypes, self.list_var_shapes
        )
        batch = self.make_record_batch(truth_data)
        dataset = arrow_io.ArrowDataset.from_record_batches(
            [batch],
            truth_data.output_types,
            truth_data.output_shapes,
            batch_size=batch_size,
        )
        self.run_test_case(dataset, truth_data, batch_size=batch_size)

    def test_unsupported_batch_mode(self):
        """Test using an unsupported batch mode"""
        import tensorflow_io.arrow as arrow_io

        truth_data = TruthData(self.scalar_data, self.scalar_dtypes, self.scalar_shapes)
        with self.assertRaisesRegex(ValueError, "Unsupported batch_mode.*doh"):
            arrow_io.ArrowDataset.from_record_batches(
                [self.make_record_batch(truth_data)],
                truth_data.output_types,
                truth_data.output_shapes,
                batch_mode="doh",
            )

    def test_arrow_list_feather_columns(self):
        """test_arrow_list_feather_columns"""
        import tensorflow_io.arrow as arrow_io
        from pyarrow.feather import write_feather

        # Feather files currently do not support columns of list types
        truth_data = TruthData(self.scalar_data, self.scalar_dtypes, self.scalar_shapes)
        batch = self.make_record_batch(truth_data)
        df = batch.to_pandas()
        # Create a tempfile that is deleted after tests run
        with tempfile.NamedTemporaryFile(delete=False) as f:
            write_feather(df, f, version=1)
        # test single file
        # prefix "file://" to test scheme file system (e.g., s3, gcs, azfs, ignite)
        columns = arrow_io.list_feather_columns("file://" + f.name)
        for name, dtype in list(zip(batch.schema.names, batch.schema.types)):
            assert columns[name].name == name
            assert columns[name].dtype == dtype
            assert columns[name].shape == [4]
        # test memory
        with open(f.name, "rb") as ff:
            memory = ff.read()
        # when memory is provided filename doesn't matter:
        columns = arrow_io.list_feather_columns("file:///non_exist", memory=memory)
        for name, dtype in list(zip(batch.schema.names, batch.schema.types)):
            assert columns[name].name == name
            assert columns[name].dtype == dtype
            assert columns[name].shape == [4]
        os.unlink(f.name)

if __name__ == "__main__":
...
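Taken together, the tests above exercise one core round trip: build an Arrow RecordBatch/Table with pyarrow, wrap it with tfio.IOTensor.from_arrow or arrow_io.ArrowDataset.from_record_batches, and read the columns back as TensorFlow tensors. Below is a minimal, self-contained sketch of that pattern using only calls that appear in the test file; the column name "a" and the sample values are illustrative, not taken from the original tests.

import pyarrow as pa
import tensorflow as tf
import tensorflow_io as tfio
import tensorflow_io.arrow as arrow_io

# Build a one-column Arrow Table (illustrative data).
arr = pa.array([1, 2, 3, 4], type=pa.int64())
batch = pa.RecordBatch.from_arrays([arr], ["a"])
table = pa.Table.from_batches([batch])

# Read it eagerly as an IOTensor...
iot = tfio.IOTensor.from_arrow(table)
print(iot.columns)           # ['a']
print(iot("a").to_tensor())  # tf.Tensor([1 2 3 4], shape=(4,), dtype=int64)

# ...or stream it as a tf.data pipeline via ArrowDataset.
ds = arrow_io.ArrowDataset.from_record_batches(
    [batch], (tf.dtypes.int64,), (tf.TensorShape([]),)
)
for row in ds:
    print(row)  # one tensor per row

The output_types and output_shapes passed to from_record_batches must match the Arrow column types, which is exactly what test_incorrect_column_type above verifies ("Arrow type mismatch").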
test_tools.py
Source: test_tools.py
...
def _minimum_is_zero(array: np.ndarray) -> bool:
    return np.min(array) == 0

def _maximum_is_one(array: np.ndarray) -> bool:
    return np.max(array) == 1

@given(hnp.arrays(dtype=hnp.scalar_dtypes(), shape=hypothesis_dimension()))
def test_complex_2_real_imag_vector_len(array):
    assert_equal(len(complex_2_real_imag(array)), len(array) * 2)

@given(hnp.arrays(dtype=hnp.scalar_dtypes(), shape=hypothesis_dimension()))
def test_complex_2_real_imag_vector_equality_real(array):
    array_real_imag = complex_2_real_imag(array)
    assert_equal(array_real_imag[: len(array_real_imag) // 2], np.real(array))

@given(hnp.arrays(dtype=hnp.scalar_dtypes(), shape=hypothesis_dimension()))
def test_complex_2_real_imag_vector_equality_imag(array):
    array_real_imag = complex_2_real_imag(array)
    assert_equal(array_real_imag[len(array_real_imag) // 2 :], np.imag(array))

@given(
    hnp.arrays(
        dtype=hnp.scalar_dtypes(), shape=hypothesis_two_dimensional_array_shape()
    )
)
def test_complex_2_real_imag_array_len(array):
    array_real_imag = complex_2_real_imag(array)
    assert_equal(len(array_real_imag), len(array))
    assert_equal(array_real_imag.shape[1], array.shape[1] * 2)

@given(
    hnp.arrays(
        dtype=hnp.scalar_dtypes(), shape=hypothesis_two_dimensional_array_shape()
    )
)
def test_complex_2_real_imag_array_equality_real(array):
    array_real_imag = complex_2_real_imag(array)
    assert_equal(array_real_imag[:, : len(array_real_imag[0]) // 2], np.real(array))

@given(
    hnp.arrays(
        dtype=hnp.scalar_dtypes(), shape=hypothesis_two_dimensional_array_shape()
    )
)
def test_complex_2_real_imag_array_equality_imag(array):
    array_real_imag = complex_2_real_imag(array)
    assert_equal(array_real_imag[:, len(array_real_imag[0]) // 2 :], np.imag(array))

@given(hnp.arrays(dtype=hnp.scalar_dtypes(), shape=hypothesis_odd_dimension()))
def test_separate_real_imag_of_vector_wrong_len(array):
    with pytest.raises(
        ValueError,
        match="separate_real_imag_of_vector: vector of real and imaginary parts is "
        "expected to contain exactly as many real as imaginary parts but is of "
        r"odd length=.*",
    ):
        separate_real_imag_of_vector(array)

@given(hnp.arrays(dtype=hnp.scalar_dtypes(), shape=hypothesis_even_dimension()))
def test_separate_real_imag_of_vector_dimensions(vector):
    list_of_separated_real_imag = separate_real_imag_of_vector(vector)
    assert_equal(len(list_of_separated_real_imag), 2)
    assert_equal(
        _set_of_lens_of_list_entries(list_of_separated_real_imag),
        {len(vector) / 2},
    )

def _set_of_lens_of_list_entries(list_of_anything: List) -> Set[int]:
    return set(len(list_entry) for list_entry in list_of_anything)

@given(hnp.arrays(dtype=hnp.scalar_dtypes(), shape=hypothesis_even_dimension()))
def test_separate_real_imag_of_vector_first_half(vector):
    assert_equal(separate_real_imag_of_vector(vector)[0], vector[: len(vector) // 2])

@given(hnp.arrays(dtype=hnp.scalar_dtypes(), shape=hypothesis_even_dimension()))
def test_separate_real_imag_of_vector_second_half(vector):
    assert_equal(separate_real_imag_of_vector(vector)[1], vector[len(vector) // 2 :])

@given(
    hnp.arrays(
        dtype=hnp.scalar_dtypes(),
        shape=hypothesis_two_dimensional_array_shape(ensure_odd_second_dimension=True),
    )
)
def test_separate_real_imag_of_mc_samples_wrong_len(array):
    with pytest.raises(
        ValueError,
        match="separate_real_imag_of_mc_samples: vectors of real and imaginary "
        "parts are expected to contain exactly as many real as "
        r"imaginary parts but the first one is of odd length=.*",
    ):
        separate_real_imag_of_mc_samples(array)

@given(
    hnp.arrays(
        dtype=hnp.scalar_dtypes(),
        shape=hypothesis_two_dimensional_array_shape(ensure_even_second_dimension=True),
    )
)
def test_separate_real_imag_of_mc_samples_dimensions(array):
    list_of_separated_real_imag = separate_real_imag_of_mc_samples(array)
    assert_equal(len(list_of_separated_real_imag), 2)
    assert_equal(
        _set_of_shapes_of_ndarray_list(list_of_separated_real_imag),
        {(len(array), len(array[0]) / 2)},
    )

def _set_of_shapes_of_ndarray_list(
    list_of_anything: List[np.ndarray],
) -> Set[Tuple[int, int]]:
    return set(list_entry.shape for list_entry in list_of_anything)

@given(
    hnp.arrays(
        dtype=hnp.scalar_dtypes(),
        shape=hypothesis_two_dimensional_array_shape(ensure_even_second_dimension=True),
    )
)
def test_separate_real_imag_of_mc_samples_first_half(array):
    assert_equal(
        separate_real_imag_of_mc_samples(array)[0], array[:, : len(array[0]) // 2]
    )

@given(
    hnp.arrays(
        dtype=hnp.scalar_dtypes(),
        shape=hypothesis_two_dimensional_array_shape(ensure_even_second_dimension=True),
    )
)
def test_separate_real_imag_of_mc_samples_second_half(array):
    assert_equal(
        separate_real_imag_of_mc_samples(array)[1], array[:, len(array[0]) // 2 :]
    )

@given(
    hnp.arrays(
        dtype=hst.one_of(
            hnp.unsigned_integer_dtypes(), hnp.integer_dtypes(), hnp.floating_dtypes()
        ),
        shape=hypothesis_two_dimensional_array_shape(ensure_even_second_dimension=True),
    )
)
def test_real_imag_2_complex_array_shape(array):
    assert_equal(real_imag_2_complex(array).shape, (len(array), len(array[0]) // 2))

@given(
    hnp.arrays(
        dtype=hst.one_of(
            hnp.unsigned_integer_dtypes(), hnp.integer_dtypes(), hnp.floating_dtypes()
        ),
        shape=hypothesis_even_dimension(),
    )
)
def test_real_imag_2_complex_vector_len(array):
    assert_equal(len(real_imag_2_complex(array)), len(array) // 2)

@given(
    hnp.arrays(
        dtype=hst.one_of(
            hnp.unsigned_integer_dtypes(), hnp.integer_dtypes(), hnp.floating_dtypes()
        ),
        shape=hypothesis_two_dimensional_array_shape(ensure_even_second_dimension=True),
    )
)
def test_real_imag_2_complex_array_values(array):
    half_the_array_length = len(array[0]) // 2
    assert_equal(
        real_imag_2_complex(array),
        array[:, :half_the_array_length] + 1j * array[:, half_the_array_length:],
    )

@given(
    hnp.arrays(
        dtype=hst.one_of(
            hnp.unsigned_integer_dtypes(), hnp.integer_dtypes(), hnp.floating_dtypes()
        ),
        shape=hypothesis_even_dimension(),
    )
)
def test_real_imag_2_complex_vector_values(array):
    half_the_array_length = len(array) // 2
    assert_equal(
        real_imag_2_complex(array),
        array[:half_the_array_length] + 1j * array[half_the_array_length:],
    )

@given(hnp.arrays(dtype=hnp.scalar_dtypes(), shape=hypothesis_odd_dimension()))
def test_real_imag_2_complex_vector_wrong_len(vector):
    with pytest.raises(
        ValueError,
        match="separate_real_imag_of_vector: vector of real and imaginary parts is "
        "expected to contain exactly as many real as imaginary parts but is of "
        r"odd length=.*",
    ):
        real_imag_2_complex(vector)

@given(
    hnp.arrays(
        dtype=hnp.scalar_dtypes(),
        shape=hypothesis_two_dimensional_array_shape(ensure_odd_second_dimension=True),
    )
)
def test_real_imag_2_complex_array_wrong_len(array):
    with pytest.raises(
        ValueError,
        match="separate_real_imag_of_mc_samples: vectors of real and imaginary "
        "parts are expected to contain exactly as many real as "
        r"imaginary parts but the first one is of odd length=.*",
    ):
...