Best Python code snippet using fMBT_python
test_to_csv.py
Source:test_to_csv.py
# NOTE(review): this excerpt starts at line 20 of the original listing; the
# preceding lines (presumably the module's import block) are not shown here.
MIXED_FLOAT_DTYPES = ['float16', 'float32', 'float64']
MIXED_INT_DTYPES = ['uint8', 'uint16', 'uint32', 'uint64', 'int8', 'int16',
                    'int32', 'int64']


class TestDataFrameToCSV(TestData):
    """Round-trip and formatting tests for ``DataFrame.to_csv``."""

    def read_csv(self, path, **kwargs):
        """Read ``path`` with ``pd.read_csv`` using this suite's defaults.

        Defaults are ``index_col=0`` and ``parse_dates=True``; any keyword
        in ``kwargs`` overrides them.
        """
        params = dict(index_col=0, parse_dates=True)
        params.update(**kwargs)

        return pd.read_csv(path, **params)

    def test_from_csv_deprecation(self):
        """``DataFrame.from_csv`` warns but still reads the written frame."""
        # see gh-17812
        with ensure_clean('__tmp_from_csv_deprecation__') as path:
            self.tsframe.to_csv(path)

            with tm.assert_produces_warning(FutureWarning):
                depr_recons = DataFrame.from_csv(path)
                assert_frame_equal(self.tsframe, depr_recons)

    def test_to_csv_from_csv1(self):
        """Basic writes, then round trips with/without the index column."""
        with ensure_clean('__tmp_to_csv_from_csv1__') as path:
            self.frame['A'][:5] = np.nan

            # these only need to run without raising
            self.frame.to_csv(path)
            self.frame.to_csv(path, columns=['A', 'B'])
            self.frame.to_csv(path, header=False)
            self.frame.to_csv(path, index=False)

            # test roundtrip
            self.tsframe.to_csv(path)
            recons = self.read_csv(path)
            assert_frame_equal(self.tsframe, recons)

            self.tsframe.to_csv(path, index_label='index')
            recons = self.read_csv(path, index_col=None)

            assert len(recons.columns) == len(self.tsframe.columns) + 1

            # no index
            self.tsframe.to_csv(path, index=False)
            recons = self.read_csv(path, index_col=None)
            assert_almost_equal(self.tsframe.values, recons.values)

            # corner case
            dm = DataFrame({'s1': Series(lrange(3), lrange(3)),
                            's2': Series(lrange(2), lrange(2))})
            dm.to_csv(path)

            recons = self.read_csv(path)
            assert_frame_equal(dm, recons)

    def test_to_csv_from_csv2(self):
        """Round trips with duplicate index labels and column aliases."""
        with ensure_clean('__tmp_to_csv_from_csv2__') as path:

            # duplicate index
            df = DataFrame(np.random.randn(3, 3), index=['a', 'a', 'b'],
                           columns=['x', 'y', 'z'])
            df.to_csv(path)
            result = self.read_csv(path)
            assert_frame_equal(result, df)

            midx = MultiIndex.from_tuples(
                [('A', 1, 2), ('A', 1, 2), ('B', 1, 2)])
            df = DataFrame(np.random.randn(3, 3), index=midx,
                           columns=['x', 'y', 'z'])

            df.to_csv(path)
            result = self.read_csv(path, index_col=[0, 1, 2],
                                   parse_dates=False)
            assert_frame_equal(result, df, check_names=False)

            # column aliases
            col_aliases = Index(['AA', 'X', 'Y', 'Z'])
            self.frame2.to_csv(path, header=col_aliases)

            rs = self.read_csv(path)
            xp = self.frame2.copy()
            xp.columns = col_aliases
            assert_frame_equal(xp, rs)

            # a header list of the wrong length must be rejected
            pytest.raises(ValueError, self.frame2.to_csv, path,
                          header=['AA', 'X'])

    def test_to_csv_from_csv3(self):
        """Appending a second frame with ``mode='a'`` reads back as concat."""
        with ensure_clean('__tmp_to_csv_from_csv3__') as path:
            df1 = DataFrame(np.random.randn(3, 1))
            df2 = DataFrame(np.random.randn(3, 1))

            df1.to_csv(path)
            df2.to_csv(path, mode='a', header=False)
            xp = pd.concat([df1, df2])
            rs = pd.read_csv(path, index_col=0)
            # column labels come back as strings; compare them as ints
            rs.columns = lmap(int, rs.columns)
            xp.columns = lmap(int, xp.columns)
            assert_frame_equal(xp, rs)

    def test_to_csv_from_csv4(self):
        """Timedelta data and index survive a CSV round trip."""
        with ensure_clean('__tmp_to_csv_from_csv4__') as path:
            # GH 10833 (TimedeltaIndex formatting)
            dt = pd.Timedelta(seconds=1)
            df = pd.DataFrame({'dt_data': [i * dt for i in range(3)]},
                              index=pd.Index([i * dt for i in range(3)],
                                             name='dt_index'))
            df.to_csv(path)

            result = pd.read_csv(path, index_col='dt_index')
            result.index = pd.to_timedelta(result.index)
            # TODO: remove renaming when GH 10875 is solved
            result.index = result.index.rename('dt_index')
            result['dt_data'] = pd.to_timedelta(result['dt_data'])

            assert_frame_equal(df, result, check_index_type=True)

    def test_to_csv_from_csv5(self):
        """Timezone-aware columns round-trip after re-localising."""
        # tz, 8260
        with ensure_clean('__tmp_to_csv_from_csv5__') as path:

            self.tzframe.to_csv(path)
            result = pd.read_csv(path, index_col=0, parse_dates=['A'])

            converter = lambda c: to_datetime(result[c]).dt.tz_convert(
                'UTC').dt.tz_convert(self.tzframe[c].dt.tz)
            result['B'] = converter('B')
            result['C'] = converter('C')
            assert_frame_equal(result, self.tzframe)

    def test_to_csv_cols_reordering(self):
        """``columns=`` in a different order is honoured across chunks."""
        # GH3454
        import pandas as pd

        chunksize = 5
        N = int(chunksize * 2.5)

        df = mkdf(N, 3)
        cs = df.columns
        cols = [cs[2], cs[0]]

        with ensure_clean() as path:
            df.to_csv(path, columns=cols, chunksize=chunksize)
            rs_c = pd.read_csv(path, index_col=0)

        assert_frame_equal(df[cols], rs_c, check_names=False)

    def test_to_csv_new_dupe_cols(self):
        """Duplicate column labels are written correctly, with selection."""
        import pandas as pd

        def _check_df(df, cols=None):
            # Write ``df`` (optionally a column subset) and compare what is
            # read back, column by column.
            with ensure_clean() as path:
                df.to_csv(path, columns=cols, chunksize=chunksize)
                rs_c = pd.read_csv(path, index_col=0)

                # we wrote them in a different order
                # so compare them in that order
                if cols is not None:

                    if df.columns.is_unique:
                        rs_c.columns = cols
                    else:
                        indexer, missing = df.columns.get_indexer_non_unique(
                            cols)
                        rs_c.columns = df.columns.take(indexer)

                    for c in cols:
                        obj_df = df[c]
                        obj_rs = rs_c[c]
                        if isinstance(obj_df, Series):
                            assert_series_equal(obj_df, obj_rs)
                        else:
                            assert_frame_equal(
                                obj_df, obj_rs, check_names=False)

                # wrote in the same order
                else:
                    rs_c.columns = df.columns
                    assert_frame_equal(df, rs_c, check_names=False)

        chunksize = 5
        N = int(chunksize * 2.5)

        # dupe cols
        df = mkdf(N, 3)
        df.columns = ['a', 'a', 'b']
        _check_df(df, None)

        # dupe cols with selection
        cols = ['b', 'a']
        _check_df(df, cols)

    @pytest.mark.slow
    def test_to_csv_dtnat(self):
        """Datetime columns containing NaT round-trip across chunks."""
        # GH3437
        from pandas import NaT

        def make_dtnat_arr(n, nnat=None):
            # Build ``n`` timestamps and overwrite ``nnat`` random positions
            # (10% of ``n`` by default) with NaT.
            if nnat is None:
                nnat = int(n * 0.1)  # 10%
            s = list(date_range('2000', freq='5min', periods=n))
            if nnat:
                for i in np.random.randint(0, len(s), nnat):
                    s[i] = NaT
                i = np.random.randint(100)
                s[-i] = NaT
                s[i] = NaT
            return s

        chunksize = 1000
        # N=35000
        s1 = make_dtnat_arr(chunksize + 5)
        s2 = make_dtnat_arr(chunksize + 5, 0)

        with ensure_clean('1.csv') as pth:
            df = DataFrame(dict(a=s1, b=s2))
            df.to_csv(pth, chunksize=chunksize)

            recons = self.read_csv(pth)._convert(datetime=True,
                                                 coerce=True)
            assert_frame_equal(df, recons, check_names=False,
                               check_less_precise=True)

    @pytest.mark.slow
    def test_to_csv_moar(self):
        """Round trips over many shapes, index types and level counts."""

        def _do_test(df, r_dtype=None, c_dtype=None,
                     rnlvl=None, cnlvl=None, dupe_col=False):
            # Write ``df`` in chunks, read it back, coerce both index and
            # columns to a comparable dtype, then compare the frames.
            kwargs = dict(parse_dates=False)
            if cnlvl:
                if rnlvl is not None:
                    kwargs['index_col'] = lrange(rnlvl)
                kwargs['header'] = lrange(cnlvl)
            else:
                kwargs['header'] = 0

            with ensure_clean('__tmp_to_csv_moar__') as path:
                df.to_csv(path, encoding='utf8', chunksize=chunksize)
                recons = self.read_csv(path, **kwargs)

            def _to_uni(x):
                if not isinstance(x, compat.text_type):
                    return x.decode('utf8')
                return x

            if dupe_col:
                # read_csv disambiguates the columns by
                # labeling them dupe.1, dupe.2, etc. monkey patch columns
                recons.columns = df.columns
            if rnlvl and not cnlvl:
                # rebuild the row MultiIndex from the leading data columns
                delta_lvl = [recons.iloc[
                    :, i].values for i in range(rnlvl - 1)]
                ix = MultiIndex.from_arrays([list(recons.index)] + delta_lvl)
                recons.index = ix
                recons = recons.iloc[:, rnlvl - 1:]

            type_map = dict(i='i', f='f', s='O', u='O', dt='O', p='O')
            if r_dtype:
                if r_dtype == 'u':  # unicode
                    r_dtype = 'O'
                    recons.index = np.array(lmap(_to_uni, recons.index),
                                            dtype=r_dtype)
                    df.index = np.array(lmap(_to_uni, df.index), dtype=r_dtype)
                elif r_dtype == 'dt':  # datetime
                    r_dtype = 'O'
                    recons.index = np.array(lmap(Timestamp, recons.index),
                                            dtype=r_dtype)
                    df.index = np.array(
                        lmap(Timestamp, df.index), dtype=r_dtype)
                elif r_dtype == 'p':  # period
                    r_dtype = 'O'
                    recons.index = np.array(
                        list(map(Timestamp, to_datetime(recons.index))),
                        dtype=r_dtype)
                    df.index = np.array(
                        list(map(Timestamp, df.index.to_timestamp())),
                        dtype=r_dtype)
                else:
                    r_dtype = type_map.get(r_dtype)
                    recons.index = np.array(recons.index, dtype=r_dtype)
                    df.index = np.array(df.index, dtype=r_dtype)
            if c_dtype:
                if c_dtype == 'u':  # unicode
                    c_dtype = 'O'
                    recons.columns = np.array(lmap(_to_uni, recons.columns),
                                              dtype=c_dtype)
                    df.columns = np.array(
                        lmap(_to_uni, df.columns), dtype=c_dtype)
                elif c_dtype == 'dt':  # datetime
                    c_dtype = 'O'
                    recons.columns = np.array(lmap(Timestamp, recons.columns),
                                              dtype=c_dtype)
                    df.columns = np.array(
                        lmap(Timestamp, df.columns), dtype=c_dtype)
                elif c_dtype == 'p':  # period
                    c_dtype = 'O'
                    recons.columns = np.array(
                        lmap(Timestamp, to_datetime(recons.columns)),
                        dtype=c_dtype)
                    df.columns = np.array(
                        lmap(Timestamp, df.columns.to_timestamp()),
                        dtype=c_dtype)
                else:
                    c_dtype = type_map.get(c_dtype)
                    recons.columns = np.array(recons.columns, dtype=c_dtype)
                    df.columns = np.array(df.columns, dtype=c_dtype)

            assert_frame_equal(df, recons, check_names=False,
                               check_less_precise=True)

        N = 100
        chunksize = 1000

        # (the original ran this first loop twice, verbatim; once suffices)
        for ncols in [4]:
            base = int((chunksize // ncols or 1) or 1)
            for nrows in [2, 10, N - 1, N, N + 1, N + 2, 2 * N - 2,
                          2 * N - 1, 2 * N, 2 * N + 1, 2 * N + 2,
                          base - 1, base, base + 1]:
                _do_test(mkdf(nrows, ncols, r_idx_type='dt',
                              c_idx_type='s'), 'dt', 's')

        for r_idx_type, c_idx_type in [('i', 'i'), ('s', 's'), ('u', 'dt'),
                                       ('p', 'p')]:
            for ncols in [1, 2, 3, 4]:
                base = int((chunksize // ncols or 1) or 1)
                for nrows in [2, 10, N - 1, N, N + 1, N + 2, 2 * N - 2,
                              2 * N - 1, 2 * N, 2 * N + 1, 2 * N + 2,
                              base - 1, base, base + 1]:
                    _do_test(mkdf(nrows, ncols, r_idx_type=r_idx_type,
                                  c_idx_type=c_idx_type),
                             r_idx_type, c_idx_type)

        for ncols in [1, 2, 3, 4]:
            base = int((chunksize // ncols or 1) or 1)
            for nrows in [10, N - 2, N - 1, N, N + 1, N + 2, 2 * N - 2,
                          2 * N - 1, 2 * N, 2 * N + 1, 2 * N + 2,
                          base - 1, base, base + 1]:
                _do_test(mkdf(nrows, ncols))

        for nrows in [10, N - 2, N - 1, N, N + 1, N + 2]:
            df = mkdf(nrows, 3)
            cols = list(df.columns)
            cols[:2] = ["dupe", "dupe"]
            cols[-2:] = ["dupe", "dupe"]
            ix = list(df.index)
            ix[:2] = ["rdupe", "rdupe"]
            ix[-2:] = ["rdupe", "rdupe"]
            df.index = ix
            df.columns = cols
            _do_test(df, dupe_col=True)

        _do_test(DataFrame(index=lrange(10)))
        _do_test(mkdf(chunksize // 2 + 1, 2, r_idx_nlevels=2), rnlvl=2)
        for ncols in [2, 3, 4]:
            base = int(chunksize // ncols)
            for nrows in [10, N - 2, N - 1, N, N + 1, N + 2, 2 * N - 2,
                          2 * N - 1, 2 * N, 2 * N + 1, 2 * N + 2,
                          base - 1, base, base + 1]:
                _do_test(mkdf(nrows, ncols, r_idx_nlevels=2), rnlvl=2)
                _do_test(mkdf(nrows, ncols, c_idx_nlevels=2), cnlvl=2)
                _do_test(mkdf(nrows, ncols, r_idx_nlevels=2, c_idx_nlevels=2),
                         rnlvl=2, cnlvl=2)

    def test_to_csv_from_csv_w_some_infs(self):
        """Round trip with an all-NaN column and a mixed inf/NaN column."""
        # test roundtrip with inf, -inf, nan, as full columns and mix
        self.frame['G'] = np.nan
        f = lambda x: [np.inf, np.nan][np.random.rand() < .5]
        self.frame['H'] = self.frame.index.map(f)

        with ensure_clean() as path:
            self.frame.to_csv(path)
            recons = self.read_csv(path)

            # TODO to_csv drops column name
            assert_frame_equal(self.frame, recons, check_names=False)
            assert_frame_equal(np.isinf(self.frame),
                               np.isinf(recons), check_names=False)

    def test_to_csv_from_csv_w_all_infs(self):
        """Round trip with all-``inf`` and all-``-inf`` columns."""
        # test roundtrip with inf, -inf, nan, as full columns and mix
        self.frame['E'] = np.inf
        self.frame['F'] = -np.inf

        with ensure_clean() as path:
            self.frame.to_csv(path)
            recons = self.read_csv(path)

            # TODO to_csv drops column name
            assert_frame_equal(self.frame, recons, check_names=False)
            assert_frame_equal(np.isinf(self.frame),
                               np.isinf(recons), check_names=False)

    def test_to_csv_no_index(self):
        """``index=False`` still works after a column has been appended."""
        # GH 3624, after appending columns, to_csv fails
        with ensure_clean('__tmp_to_csv_no_index__') as path:
            df = DataFrame({'c1': [1, 2, 3], 'c2': [4, 5, 6]})
            df.to_csv(path, index=False)
            result = read_csv(path)
            assert_frame_equal(df, result)
            df['c3'] = Series([7, 8, 9], dtype='int64')
            df.to_csv(path, index=False)
            result = read_csv(path)
            assert_frame_equal(df, result)

    def test_to_csv_with_mix_columns(self):
        """Mixed int/str labels passed as ``columns`` give the same output."""
        # gh-11637: incorrect output when a mix of integer and string column
        # names passed as columns parameter in to_csv

        df = DataFrame({0: ['a', 'b', 'c'],
                        1: ['aa', 'bb', 'cc']})
        df['test'] = 'txt'
        assert df.to_csv() == df.to_csv(columns=[0, 1, 'test'])

    def test_to_csv_headers(self):
        """``header=[...]`` renames columns with and without the index."""
        # GH6186, the presence or absence of `index` incorrectly
        # causes to_csv to have different header semantics.
        from_df = DataFrame([[1, 2], [3, 4]], columns=['A', 'B'])
        to_df = DataFrame([[1, 2], [3, 4]], columns=['X', 'Y'])
        with ensure_clean('__tmp_to_csv_headers__') as path:
            from_df.to_csv(path, header=['X', 'Y'])
            recons = self.read_csv(path)

            assert_frame_equal(to_df, recons)

            from_df.to_csv(path, index=False, header=['X', 'Y'])
            recons = self.read_csv(path)

            # read_csv (index_col=0) consumed 'X' as the index; put it back
            recons.reset_index(inplace=True)
            assert_frame_equal(to_df, recons)

    def test_to_csv_multiindex(self):
        """Round trips with MultiIndex rows and/or columns."""
        frame = self.frame
        old_index = frame.index
        arrays = np.arange(len(old_index) * 2).reshape(2, -1)
        new_index = MultiIndex.from_arrays(arrays, names=['first', 'second'])
        frame.index = new_index

        with ensure_clean('__tmp_to_csv_multiindex__') as path:

            frame.to_csv(path, header=False)
            frame.to_csv(path, columns=['A', 'B'])

            # round trip
            frame.to_csv(path)

            df = self.read_csv(path, index_col=[0, 1],
                               parse_dates=False)

            # TODO to_csv drops column name
            assert_frame_equal(frame, df, check_names=False)
            assert frame.index.names == df.index.names

            # needed if setUp becomes a class method
            self.frame.index = old_index

            # try multiindex with dates
            tsframe = self.tsframe
            old_index = tsframe.index
            new_index = [old_index, np.arange(len(old_index))]
            tsframe.index = MultiIndex.from_arrays(new_index)

            tsframe.to_csv(path, index_label=['time', 'foo'])
            recons = self.read_csv(path, index_col=[0, 1])

            # TODO to_csv drops column name
            assert_frame_equal(tsframe, recons, check_names=False)

            # do not load index
            tsframe.to_csv(path)
            recons = self.read_csv(path, index_col=None)
            assert len(recons.columns) == len(tsframe.columns) + 2

            # no index
            tsframe.to_csv(path, index=False)
            recons = self.read_csv(path, index_col=None)
            assert_almost_equal(recons.values, self.tsframe.values)

            # needed if setUp becomes class method
            self.tsframe.index = old_index

        with ensure_clean('__tmp_to_csv_multiindex__') as path:
            # GH3571, GH1651, GH3141

            def _make_frame(names=None):
                # 3x3 int64 frame with two-level columns; ``names=True``
                # labels the levels 'first' and 'second'.
                if names is True:
                    names = ['first', 'second']
                return DataFrame(np.random.randint(0, 10, size=(3, 3)),
                                 columns=MultiIndex.from_tuples(
                                     [('bah', 'foo'),
                                      ('bah', 'bar'),
                                      ('ban', 'baz')], names=names),
                                 dtype='int64')

            # column & index are multi-index
            df = mkdf(5, 3, r_idx_nlevels=2, c_idx_nlevels=4)
            df.to_csv(path)
            result = read_csv(path, header=[0, 1, 2, 3],
                              index_col=[0, 1])
            assert_frame_equal(df, result)

            # column is mi
            df = mkdf(5, 3, r_idx_nlevels=1, c_idx_nlevels=4)
            df.to_csv(path)
            result = read_csv(
                path, header=[0, 1, 2, 3], index_col=0)
            assert_frame_equal(df, result)

            # dup column names?
            df = mkdf(5, 3, r_idx_nlevels=3, c_idx_nlevels=4)
            df.to_csv(path)
            result = read_csv(path, header=[0, 1, 2, 3],
                              index_col=[0, 1, 2])
            assert_frame_equal(df, result)

            # writing with no index
            df = _make_frame()
            df.to_csv(path, index=False)
            result = read_csv(path, header=[0, 1])
            assert_frame_equal(df, result)

            # we lose the names here
            df = _make_frame(True)
            df.to_csv(path, index=False)
            result = read_csv(path, header=[0, 1])
            assert com._all_none(*result.columns.names)
            result.columns.names = df.columns.names
            assert_frame_equal(df, result)

            # tupleize_cols=True and index=False
            df = _make_frame(True)
            with tm.assert_produces_warning(FutureWarning):
                df.to_csv(path, tupleize_cols=True, index=False)

            with tm.assert_produces_warning(FutureWarning,
                                            check_stacklevel=False):
                result = read_csv(path, header=0,
                                  tupleize_cols=True,
                                  index_col=None)
            result.columns = df.columns
            assert_frame_equal(df, result)

            # whatsnew example
            df = _make_frame()
            df.to_csv(path)
            result = read_csv(path, header=[0, 1],
                              index_col=[0])
            assert_frame_equal(df, result)

            df = _make_frame(True)
            df.to_csv(path)
            result = read_csv(path, header=[0, 1],
                              index_col=[0])
            assert_frame_equal(df, result)

            # column & index are multi-index (compatibility)
            df = mkdf(5, 3, r_idx_nlevels=2, c_idx_nlevels=4)
            with tm.assert_produces_warning(FutureWarning):
                df.to_csv(path, tupleize_cols=True)

            with tm.assert_produces_warning(FutureWarning,
                                            check_stacklevel=False):
                result = read_csv(path, header=0, index_col=[0, 1],
                                  tupleize_cols=True)
            result.columns = df.columns
            assert_frame_equal(df, result)

            # invalid options
            df = _make_frame(True)
            df.to_csv(path)

            for i in [6, 7]:
                msg = 'len of {i}, but only 5 lines in file'.format(i=i)
                with pytest.raises(ParserError, match=msg):
                    read_csv(path, header=lrange(i), index_col=0)

            # write with cols
            msg = 'cannot specify cols with a MultiIndex'
            with pytest.raises(TypeError, match=msg):
                df.to_csv(path, columns=['foo', 'bar'])

        with ensure_clean('__tmp_to_csv_multiindex__') as path:
            # empty
            tsframe[:0].to_csv(path)
            recons = self.read_csv(path)

            exp = tsframe[:0]
            exp.index = []

            tm.assert_index_equal(recons.columns, exp.columns)
            assert len(recons) == 0

    def test_to_csv_float32_nanrep(self):
        """``na_rep`` is applied to NaN in a float32 frame."""
        df = DataFrame(np.random.randn(1, 4).astype(np.float32))
        df[1] = np.nan

        with ensure_clean('__tmp_to_csv_float32_nanrep__.csv') as path:
            df.to_csv(path, na_rep=999)

            with open(path) as f:
                lines = f.readlines()
                # field 0 is the index, so column 1 is the third field
                assert lines[1].split(',')[2] == '999'

    def test_to_csv_withcommas(self):
        """Field values containing commas survive a round trip."""
        # Commas inside fields should be correctly escaped when saving as CSV.
        df = DataFrame({'A': [1, 2, 3], 'B': ['5,6', '7,8', '9,0']})

        with ensure_clean('__tmp_to_csv_withcommas__.csv') as path:
            df.to_csv(path)
            df2 = self.read_csv(path)
            assert_frame_equal(df2, df)

    def test_to_csv_mixed(self):
        """Round trip of a frame mixing float/int/bool/object/date columns."""

        def create_cols(name):
            return ["%s%03d" % (name, i) for i in range(5)]

        df_float = DataFrame(np.random.randn(
            100, 5), dtype='float64', columns=create_cols('float'))
        df_int = DataFrame(np.random.randn(100, 5),
                           dtype='int64', columns=create_cols('int'))
        df_bool = DataFrame(True, index=df_float.index,
                            columns=create_cols('bool'))
        df_object = DataFrame('foo', index=df_float.index,
                              columns=create_cols('object'))
        df_dt = DataFrame(Timestamp('20010101'),
                          index=df_float.index, columns=create_cols('date'))

        # add in some nans
        df_float.loc[30:50, 1:3] = np.nan

        # ## this is a bug in read_csv right now ####
        # df_dt.loc[30:50,1:3] = np.nan

        df = pd.concat([df_float, df_int, df_bool, df_object, df_dt], axis=1)

        # dtype
        # builtin ``bool``/``object`` replace the deprecated ``np.bool`` and
        # ``np.object`` aliases, which were plain aliases of the builtins
        dtypes = dict()
        for n, dtype in [('float', np.float64), ('int', np.int64),
                         ('bool', bool), ('object', object)]:
            for c in create_cols(n):
                dtypes[c] = dtype

        with ensure_clean() as filename:
            df.to_csv(filename)
            rs = read_csv(filename, index_col=0, dtype=dtypes,
                          parse_dates=create_cols('date'))
            assert_frame_equal(rs, df)

    def test_to_csv_dups_cols(self):
        """Frames with duplicate column labels round-trip."""
        df = DataFrame(np.random.randn(1000, 30), columns=lrange(
            15) + lrange(15), dtype='float64')

        with ensure_clean() as filename:
            df.to_csv(filename)  # single dtype, fine
            result = read_csv(filename, index_col=0)
            result.columns = df.columns
            assert_frame_equal(result, df)

        df_float = DataFrame(np.random.randn(1000, 3), dtype='float64')
        df_int = DataFrame(np.random.randn(1000, 3), dtype='int64')
        df_bool = DataFrame(True, index=df_float.index, columns=lrange(3))
        df_object = DataFrame('foo', index=df_float.index, columns=lrange(3))
        df_dt = DataFrame(Timestamp('20010101'),
                          index=df_float.index, columns=lrange(3))
        df = pd.concat([df_float, df_int, df_bool, df_object,
                        df_dt], axis=1, ignore_index=True)

        # labels 0, 1, 2 repeated once per dtype group
        df.columns = [0, 1, 2] * 5

        with ensure_clean() as filename:
            df.to_csv(filename)
            result = read_csv(filename, index_col=0)

            # date cols
            for i in ['0.4', '1.4', '2.4']:
                result[i] = to_datetime(result[i])

            result.columns = df.columns
            assert_frame_equal(result, df)

        # GH3457
        from pandas.util.testing import makeCustomDataframe as mkdf

        N = 10
        df = mkdf(N, 3)
        df.columns = ['a', 'a', 'b']

        with ensure_clean() as filename:
            df.to_csv(filename)

            # read_csv will rename the dups columns
            result = read_csv(filename, index_col=0)
            result = result.rename(columns={'a.1': 'a'})
            assert_frame_equal(result, df)

    def test_to_csv_chunking(self):
        """The written file is independent of ``chunksize``."""
        aa = DataFrame({'A': lrange(100000)})
        aa['B'] = aa.A + 1.0
        aa['C'] = aa.A + 2.0
        aa['D'] = aa.A + 3.0

        for chunksize in [10000, 50000, 100000]:
            with ensure_clean() as filename:
                aa.to_csv(filename, chunksize=chunksize)
                rs = read_csv(filename, index_col=0)
                assert_frame_equal(rs, aa)

    @pytest.mark.slow
    def test_to_csv_wide_frame_formatting(self):
        """A single-row frame with 100010 columns round-trips."""
        # Issue #8621
        df = DataFrame(np.random.randn(1, 100010), columns=None, index=None)
        with ensure_clean() as filename:
            df.to_csv(filename, header=False, index=False)
            rs = read_csv(filename, header=None)
            assert_frame_equal(rs, df)

    def test_to_csv_bug(self):
        """A frame built from a column of a parsed frame round-trips."""
        f1 = StringIO('a,1.0\nb,2.0')
        df = self.read_csv(f1, header=None)
        newdf = DataFrame({'t': df[df.columns[0]]})

        with ensure_clean() as path:
            newdf.to_csv(path)

            recons = read_csv(path, index_col=0)
            # don't check_names as t != 1
            assert_frame_equal(recons, newdf, check_names=False)

    def test_to_csv_unicode(self):
        """A non-ASCII column label round-trips with UTF-8 encoding."""
        df = DataFrame({u('c/\u03c3'): [1, 2, 3]})
        with ensure_clean() as path:

            df.to_csv(path, encoding='UTF-8')
            df2 = read_csv(path, index_col=0, encoding='UTF-8')
            assert_frame_equal(df, df2)

            df.to_csv(path, encoding='UTF-8', index=False)
            df2 = read_csv(path, index_col=None, encoding='UTF-8')
            assert_frame_equal(df, df2)

    def test_to_csv_unicode_index_col(self):
        """Non-ASCII index and column labels round-trip through a buffer."""
        buf = StringIO('')
        df = DataFrame(
            [[u("\u05d0"), "d2", "d3", "d4"], ["a1", "a2", "a3", "a4"]],
            columns=[u("\u05d0"),
                     u("\u05d1"), u("\u05d2"), u("\u05d3")],
            index=[u("\u05d0"), u("\u05d1")])

        df.to_csv(buf, encoding='UTF-8')
        buf.seek(0)

        df2 = read_csv(buf, index_col=0, encoding='UTF-8')
        assert_frame_equal(df, df2)

    def test_to_csv_stringio(self):
        """Writing to a ``StringIO`` buffer round-trips."""
        buf = StringIO()
        self.frame.to_csv(buf)
        buf.seek(0)
        recons = read_csv(buf, index_col=0)
        # TODO to_csv drops column name
        assert_frame_equal(recons, self.frame, check_names=False)

    def test_to_csv_float_format(self):
        """``float_format`` controls the precision that is written."""
        df = DataFrame([[0.123456, 0.234567, 0.567567],
                        [12.32112, 123123.2, 321321.2]],
                       index=['A', 'B'], columns=['X', 'Y', 'Z'])

        with ensure_clean() as filename:

            df.to_csv(filename, float_format='%.2f')

            rs = read_csv(filename, index_col=0)
            xp = DataFrame([[0.12, 0.23, 0.57],
                            [12.32, 123123.20, 321321.20]],
                           index=['A', 'B'], columns=['X', 'Y', 'Z'])
            assert_frame_equal(rs, xp)

    def test_to_csv_unicodewriter_quoting(self):
        """``QUOTE_NONNUMERIC`` quotes strings but not numbers."""
        df = DataFrame({'A': [1, 2, 3], 'B': ['foo', 'bar', 'baz']})

        buf = StringIO()
        df.to_csv(buf, index=False, quoting=csv.QUOTE_NONNUMERIC,
                  encoding='utf-8')

        result = buf.getvalue()
        expected_rows = ['"A","B"',
                         '1,"foo"',
                         '2,"bar"',
                         '3,"baz"']
        expected = tm.convert_rows_list_to_csv_str(expected_rows)
        assert result == expected

    def test_to_csv_quote_none(self):
        """``QUOTE_NONE`` leaves embedded double quotes untouched."""
        # GH4328
        df = DataFrame({'A': ['hello', '{"hello"}']})
        for encoding in (None, 'utf-8'):
            buf = StringIO()
            df.to_csv(buf, quoting=csv.QUOTE_NONE,
                      encoding=encoding, index=False)

            result = buf.getvalue()
            expected_rows = ['A',
                             'hello',
                             '{"hello"}']
            expected = tm.convert_rows_list_to_csv_str(expected_rows)
            assert result == expected

    def test_to_csv_index_no_leading_comma(self):
        """``index_label=False`` omits the leading header comma."""
        df = DataFrame({'A': [1, 2, 3], 'B': [4, 5, 6]},
                       index=['one', 'two', 'three'])

        buf = StringIO()
        df.to_csv(buf, index_label=False)

        expected_rows = ['A,B',
                         'one,1,4',
                         'two,2,5',
                         'three,3,6']
        expected = tm.convert_rows_list_to_csv_str(expected_rows)
        assert buf.getvalue() == expected

    def test_to_csv_line_terminators(self):
        """Explicit and default line terminators are written verbatim."""
        # see gh-20353
        df = DataFrame({'A': [1, 2, 3], 'B': [4, 5, 6]},
                       index=['one', 'two', 'three'])

        with ensure_clean() as path:
            # case 1: CRLF as line terminator
            df.to_csv(path, line_terminator='\r\n')
            expected = b',A,B\r\none,1,4\r\ntwo,2,5\r\nthree,3,6\r\n'

            with open(path, mode='rb') as f:
                assert f.read() == expected

        with ensure_clean() as path:
            # case 2: LF as line terminator
            df.to_csv(path, line_terminator='\n')
            expected = b',A,B\none,1,4\ntwo,2,5\nthree,3,6\n'

            with open(path, mode='rb') as f:
                assert f.read() == expected

        with ensure_clean() as path:
            # case 3: The default line terminator(=os.linesep)(gh-21406)
            df.to_csv(path)
            os_linesep = os.linesep.encode('utf-8')
            expected = (b',A,B' + os_linesep + b'one,1,4' + os_linesep +
                        b'two,2,5' + os_linesep + b'three,3,6' + os_linesep)

            with open(path, mode='rb') as f:
                assert f.read() == expected

    def test_to_csv_from_csv_categorical(self):
        """Categorical data is written exactly like the plain values."""
        # CSV with categoricals should result in the same output
        # as when one would add a "normal" Series/DataFrame.
        s = Series(pd.Categorical(["a", "b", "b", "a", "a", "c", "c", "c"]))
        s2 = Series(["a", "b", "b", "a", "a", "c", "c", "c"])
        res = StringIO()

        s.to_csv(res, header=False)
        exp = StringIO()

        s2.to_csv(exp, header=False)
        assert res.getvalue() == exp.getvalue()

        df = DataFrame({"s": s})
        df2 = DataFrame({"s": s2})

        res = StringIO()
        df.to_csv(res)

        exp = StringIO()
        df2.to_csv(exp)

        assert res.getvalue() == exp.getvalue()

    def test_to_csv_path_is_none(self):
        """``path_or_buf=None`` returns the CSV text as a ``str``."""
        # GH 8215
        # Make sure we return string for consistency with
        # Series.to_csv()
        csv_str = self.frame.to_csv(path_or_buf=None)
        assert isinstance(csv_str, str)
        recons = pd.read_csv(StringIO(csv_str), index_col=0)
        assert_frame_equal(self.frame, recons)

    # The non-ASCII literals below were mojibake in the listing; they are
    # restored to text that is encodable in the paired codec.
    @pytest.mark.parametrize('df,encoding', [
        (DataFrame([[0.123456, 0.234567, 0.567567],
                    [12.32112, 123123.2, 321321.2]],
                   index=['A', 'B'], columns=['X', 'Y', 'Z']), None),
        # GH 21241, 21118
        (DataFrame([['abc', 'def', 'ghi']], columns=['X', 'Y', 'Z']), 'ascii'),
        (DataFrame(5 * [[123, u"你好", u"世界"]],
                   columns=['X', 'Y', 'Z']), 'gb2312'),
        (DataFrame(5 * [[123, u"Γειά σου", u"Κόσμε"]],
                   columns=['X', 'Y', 'Z']), 'cp737')
    ])
    def test_to_csv_compression(self, df, encoding, compression):
        """Compressed output round-trips by path and by file handle.

        ``compression`` is not parametrized here; presumably it is supplied
        by a fixture defined outside this excerpt.
        """
        with ensure_clean() as filename:

            df.to_csv(filename, compression=compression, encoding=encoding)
            # test the round trip - to_csv -> read_csv
            result = read_csv(filename, compression=compression,
                              index_col=0, encoding=encoding)
            assert_frame_equal(df, result)

            # test the round trip using file handle - to_csv -> read_csv
            f, _handles = _get_handle(filename, 'w', compression=compression,
                                      encoding=encoding)
            with f:
                df.to_csv(f, encoding=encoding)
            result = pd.read_csv(filename, compression=compression,
                                 encoding=encoding, index_col=0, squeeze=True)
            assert_frame_equal(df, result)

            # explicitly make sure file is compressed
            with tm.decompress_file(filename, compression) as fh:
                text = fh.read().decode(encoding or 'utf8')
                for col in df.columns:
                    assert col in text

            with tm.decompress_file(filename, compression) as fh:
                assert_frame_equal(df, read_csv(fh,
                                                index_col=0,
                                                encoding=encoding))

    def test_to_csv_date_format(self):
        """``date_format`` is applied to index, data, columns and NaT."""
        with ensure_clean('__tmp_to_csv_date_format__') as path:
            dt_index = self.tsframe.index
            datetime_frame = DataFrame(
                {'A': dt_index, 'B': dt_index.shift(1)}, index=dt_index)
            datetime_frame.to_csv(path, date_format='%Y%m%d')

            # Check that the data was put in the specified format
            test = read_csv(path, index_col=0)

            datetime_frame_int = datetime_frame.applymap(
                lambda x: int(x.strftime('%Y%m%d')))
            datetime_frame_int.index = datetime_frame_int.index.map(
                lambda x: int(x.strftime('%Y%m%d')))

            assert_frame_equal(test, datetime_frame_int)

            datetime_frame.to_csv(path, date_format='%Y-%m-%d')

            # Check that the data was put in the specified format
            test = read_csv(path, index_col=0)
            datetime_frame_str = datetime_frame.applymap(
                lambda x: x.strftime('%Y-%m-%d'))
            datetime_frame_str.index = datetime_frame_str.index.map(
                lambda x: x.strftime('%Y-%m-%d'))

            assert_frame_equal(test, datetime_frame_str)

            # Check that columns get converted
            datetime_frame_columns = datetime_frame.T
            datetime_frame_columns.to_csv(path, date_format='%Y%m%d')

            test = read_csv(path, index_col=0)

            datetime_frame_columns = datetime_frame_columns.applymap(
                lambda x: int(x.strftime('%Y%m%d')))
            # Columns don't get converted to ints by read_csv
            datetime_frame_columns.columns = (
                datetime_frame_columns.columns
                .map(lambda x: x.strftime('%Y%m%d')))

            assert_frame_equal(test, datetime_frame_columns)

            # test NaTs
            nat_index = to_datetime(
                ['NaT'] * 10 + ['2000-01-01', '1/1/2000', '1-1-2000'])
            nat_frame = DataFrame({'A': nat_index}, index=nat_index)
            nat_frame.to_csv(path, date_format='%Y-%m-%d')

            test = read_csv(path, parse_dates=[0, 1], index_col=0)

            assert_frame_equal(test, nat_frame)

    def test_to_csv_with_dst_transitions(self):
        """Timezone-aware indexes spanning DST changes can be written."""
        with ensure_clean('csv_date_format_with_dst') as path:
            # make sure we are not failing on transitions
            times = pd.date_range("2013-10-26 23:00", "2013-10-27 01:00",
                                  tz="Europe/London",
                                  freq="H",
                                  ambiguous='infer')

            for i in [times, times + pd.Timedelta('10s')]:
                time_range = np.array(range(len(i)), dtype='int64')
                df = DataFrame({'A': time_range}, index=i)
                df.to_csv(path, index=True)
                # we have to reconvert the index as we
                # don't parse the tz's
                result = read_csv(path, index_col=0)
                result.index = to_datetime(result.index, utc=True).tz_convert(
                    'Europe/London')
                assert_frame_equal(result, df)

        # GH11619
        idx = pd.date_range('2015-01-01', '2015-12-31',
                            freq='H', tz='Europe/Paris')
        df = DataFrame({'values': 1, 'idx': idx},
                       index=idx)
        with ensure_clean('csv_date_format_with_dst') as path:
            df.to_csv(path, index=True)
            result = read_csv(path, index_col=0)
            result.index = to_datetime(result.index, utc=True).tz_convert(
                'Europe/Paris')
            result['idx'] = to_datetime(result['idx'], utc=True).astype(
                'datetime64[ns, Europe/Paris]')
            assert_frame_equal(result, df)

        # assert working
        df.astype(str)

        with ensure_clean('csv_date_format_with_dst') as path:
            df.to_pickle(path)
            result = pd.read_pickle(path)
            assert_frame_equal(result, df)

    def test_to_csv_quoting(self):
        """Exact output for each ``csv.QUOTE_*`` mode and for escapechar."""
        df = DataFrame({
            'c_bool': [True, False],
            'c_float': [1.0, 3.2],
            'c_int': [42, np.nan],
            'c_string': ['a', 'b,c'],
        })

        expected_rows = [',c_bool,c_float,c_int,c_string',
                         '0,True,1.0,42.0,a',
                         '1,False,3.2,,"b,c"']
        expected = tm.convert_rows_list_to_csv_str(expected_rows)

        result = df.to_csv()
        assert result == expected

        result = df.to_csv(quoting=None)
        assert result == expected

        expected_rows = [',c_bool,c_float,c_int,c_string',
                         '0,True,1.0,42.0,a',
                         '1,False,3.2,,"b,c"']
        expected = tm.convert_rows_list_to_csv_str(expected_rows)

        result = df.to_csv(quoting=csv.QUOTE_MINIMAL)
        assert result == expected

        expected_rows = ['"","c_bool","c_float","c_int","c_string"',
                         '"0","True","1.0","42.0","a"',
                         '"1","False","3.2","","b,c"']
        expected = tm.convert_rows_list_to_csv_str(expected_rows)

        result = df.to_csv(quoting=csv.QUOTE_ALL)
        assert result == expected

        # see gh-12922, gh-13259: make sure changes to
        # the formatters do not break this behaviour
        expected_rows = ['"","c_bool","c_float","c_int","c_string"',
                         '0,True,1.0,42.0,"a"',
                         '1,False,3.2,"","b,c"']
        expected = tm.convert_rows_list_to_csv_str(expected_rows)
        result = df.to_csv(quoting=csv.QUOTE_NONNUMERIC)
        assert result == expected

        msg = "need to escape, but no escapechar set"
        with pytest.raises(csv.Error, match=msg):
            df.to_csv(quoting=csv.QUOTE_NONE)

        with pytest.raises(csv.Error, match=msg):
            df.to_csv(quoting=csv.QUOTE_NONE, escapechar=None)

        expected_rows = [',c_bool,c_float,c_int,c_string',
                         '0,True,1.0,42.0,a',
                         '1,False,3.2,,b!,c']
        expected = tm.convert_rows_list_to_csv_str(expected_rows)
        result = df.to_csv(quoting=csv.QUOTE_NONE,
                           escapechar='!')
        assert result == expected

        # with escapechar='f' the 'f' in "c_float" is itself escaped,
        # hence "c_ffloat" in the expected header
        expected_rows = [',c_bool,c_ffloat,c_int,c_string',
                         '0,True,1.0,42.0,a',
                         '1,False,3.2,,bf,c']
        expected = tm.convert_rows_list_to_csv_str(expected_rows)
        result = df.to_csv(quoting=csv.QUOTE_NONE,
                           escapechar='f')
        assert result == expected

        # see gh-3503: quoting Windows line terminators
        # presents with encoding?
        text_rows = ['a,b,c',
                     '1,"test \r\n",3']
        text = tm.convert_rows_list_to_csv_str(text_rows)
        df = pd.read_csv(StringIO(text))

        buf = StringIO()
        df.to_csv(buf, encoding='utf-8', index=False)
        assert buf.getvalue() == text

        # xref gh-7791: make sure the quoting parameter is passed through
        # with multi-indexes
        df = pd.DataFrame({'a': [1, 2], 'b': [3, 4], 'c': [5, 6]})
        df = df.set_index(['a', 'b'])

        expected_rows = ['"a","b","c"',
                         '"1","3","5"',
                         '"2","4","6"']
        expected = tm.convert_rows_list_to_csv_str(expected_rows)
        assert df.to_csv(quoting=csv.QUOTE_ALL) == expected

    def test_period_index_date_overflow(self):
        """A PeriodIndex with a year-3005 date (and NaT) is written."""
        # see gh-15982

        dates = ["1990-01-01", "2000-01-01", "3005-01-01"]
        index = pd.PeriodIndex(dates, freq="D")

        df = pd.DataFrame([4, 5, 6], index=index)
        result = df.to_csv()

        expected_rows = [',0',
                         '1990-01-01,4',
                         '2000-01-01,5',
                         '3005-01-01,6']
        expected = tm.convert_rows_list_to_csv_str(expected_rows)
        assert result == expected

        date_format = "%m-%d-%Y"
        result = df.to_csv(date_format=date_format)

        expected_rows = [',0',
                         '01-01-1990,4',
                         '01-01-2000,5',
                         '01-01-3005,6']
        expected = tm.convert_rows_list_to_csv_str(expected_rows)
        assert result == expected

        # Overflow with pd.NaT
        dates = ["1990-01-01", pd.NaT, "3005-01-01"]
        index = pd.PeriodIndex(dates, freq="D")

        df = pd.DataFrame([4, 5, 6], index=index)
        result = df.to_csv()

        expected_rows = [',0',
                         '1990-01-01,4',
                         ',5',
                         '3005-01-01,6']
        expected = tm.convert_rows_list_to_csv_str(expected_rows)
        assert result == expected

    def test_multi_index_header(self):
        """A flat ``header`` list replaces MultiIndex column labels."""
        # see gh-5539
        columns = pd.MultiIndex.from_tuples([("a", 1), ("a", 2),
                                             ("b", 1), ("b", 2)])
        df = pd.DataFrame([[1, 2, 3, 4], [5, 6, 7, 8]])
        df.columns = columns

        header = ["a", "b", "c", "d"]
        result = df.to_csv(header=header)

        expected_rows = [',a,b,c,d',
                         '0,1,2,3,4',
                         '1,5,6,7,8']
        expected = tm.convert_rows_list_to_csv_str(expected_rows)
        assert result == expected

    def test_gz_lineend(self):
        """Write a gzip-compressed CSV and read its decoded text back."""
        # GH 25311
        df = pd.DataFrame({'a': [1, 2]})
        expected_rows = ['a', '1', '2']
        expected = tm.convert_rows_list_to_csv_str(expected_rows)
        with ensure_clean('__test_gz_lineend.csv.gz') as path:
            df.to_csv(path, index=False)
            with tm.decompress_file(path, compression='gzip') as f:
                result = f.read().decode('utf-8')
        # NOTE(review): the listing is cut off here ("..."). Presumably the
        # test goes on to compare `result` with `expected`; restore the
        # remaining lines from the upstream file rather than guessing.
read_csv_file.py
Source:read_csv_file.py
1## Import libraries2import csv3import sys4## Read input csv file (returns a list of rows)5def read_csv_file(input_csv_file: str, output_format='list'):6 ## Open the CSV file7 with open(input_csv_file, 'r', encoding='UTF8') as input_file:8 # Prevent possible errors due to large columns (beyond 131072 characters)9 try:10 if output_format == 'list':11 input_csv_file_lines = list(csv.reader(input_file))12 elif output_format == 'dictionary':13 input_csv_file_dictionary = csv.DictReader(input_file)14 input_csv_file_lines = []15 for row in input_csv_file_dictionary:16 input_csv_file_lines.append(dict(row))17 else:18 input_csv_file_lines = csv.reader(input_file)19 except:20 print("Presence of too large cells!!!")21 field_size_limit = sys.maxsize22 while True:23 try:24 csv.field_size_limit(field_size_limit)25 break26 except:27 field_size_limit = int(field_size_limit / 10)28 if output_format == 'list':29 input_csv_file_lines = list(csv.reader(input_file))30 elif output_format == 'dictionary':31 input_csv_file_dictionary = csv.DictReader(input_file)32 input_csv_file_lines = []33 for row in input_csv_file_dictionary:34 input_csv_file_lines.append(dict(row))35 else:36 input_csv_file_lines = csv.reader(input_file)37 ## Bring the row lengths on par38 if output_format == 'list':39 csv_column_header = input_csv_file_lines[0]40 csv_column_number = len(csv_column_header)41 for r in range(len(input_csv_file_lines)):42 if len(input_csv_file_lines[r]) < csv_column_number:43 for cdiff in range(csv_column_number - len(input_csv_file_lines[r])):44 input_csv_file_lines[r].append(None)45 else:46 pass47 # return...
csv_value_replacer.py
Source:csv_value_replacer.py
1# Import libraries2from functions.libraries import *3from functions.create_replacing_map import *4from functions.replace_csv_values import *5from functions.read_csv_file import *6from functions.write_csv_file import *7import sys8# Input map CSV file (with replacing map --> "Old value" ; "New value")9Tk().withdraw()10messagebox.showinfo(title='Select map CSV file', message='Select the CSV file with the "old"-"new" map for value replacement')11Tk().withdraw()12input_csv_file_with_map = filedialog.askopenfilename(filetypes=[('CSV files', '.csv')])13Tk().withdraw()14messagebox.showinfo(title='CSV file selected', message="The CSV file selected is '%s'" % (input_csv_file_with_map))15# Try to read map only if a file is selected16if input_csv_file_with_map != "":17 # Open CSV with mapping18 input_csv_file_with_map_lines = read_csv_file(input_csv_file_with_map)19 # Create the map20 mapping_dictionary_array = create_replacing_map(input_csv_file_with_map_lines)21else :22 input_csv_file_with_map_lines = []23 mapping_dictionary_array = []24# Input CSV file (to be replaced)25Tk().withdraw()26messagebox.showinfo(title='Select CSV file', message='Select the CSV file with values to be replaced')27Tk().withdraw()28input_csv_file = filedialog.askopenfilename(filetypes=[('CSV files', '.csv')])29Tk().withdraw()30messagebox.showinfo(title='CSV file selected', message="The CSV file selected is '%s'" % (input_csv_file))31# Run only if a file is selected32if input_csv_file_with_map != "":33 # Read the input CSV34 input_csv_file_lines = read_csv_file(input_csv_file)35 # Generate the output36 output_csv_file_lines = replace_csv_values(input_csv_file_lines, mapping_dictionary_array, add_new_column_if_match_is_missing=True)37 # Write the output file onto the input file 38 write_csv_file(output_csv_file_lines, input_csv_file)39 # Success40 Tk().withdraw()...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!