- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在尝试使用 Pandas to_parquet 保存一个非常大的数据集,当超过某个限制时,它似乎失败了,无论是“pyarrow”还是“fastparquet”。我使用以下代码重现了我遇到的错误,并且很高兴听到有关如何克服该问题的想法:
使用 Pyarrow:
low = 3
high = 8
for n in np.logspace(low, high, high-low+1):
t0 = time()
df = pd.DataFrame.from_records([(f'ind_{x}', ''.join(['x']*50)) for x in range(int(n))], columns=['a', 'b']).set_index('a')
df.to_parquet(tmp_file, engine='pyarrow', compression='gzip')
pd.read_parquet(tmp_file, engine='pyarrow')
print(f'10^{np.log10(int(n))} read-write took {time()-t0} seconds')
10^3.0 read-write took 0.012851715087890625 seconds
10^4.0 read-write took 0.05722832679748535 seconds
10^5.0 read-write took 0.46846866607666016 seconds
10^6.0 read-write took 4.4494054317474365 seconds
10^7.0 read-write took 43.0602171421051 seconds
---------------------------------------------------------------------------
ArrowIOError Traceback (most recent call last)
<ipython-input-51-cad917a26b91> in <module>()
5 df = pd.DataFrame.from_records([(f'ind_{x}', ''.join(['x']*50)) for x in range(int(n))], columns=['a', 'b']).set_index('a')
6 df.to_parquet(tmp_file, engine='pyarrow', compression='gzip')
----> 7 pd.read_parquet(tmp_file, engine='pyarrow')
8 print(f'10^{np.log10(int(n))} read-write took {time()-t0} seconds')
~/.conda/envs/anaconda3/lib/python3.6/site-packages/pandas/io/parquet.py in read_parquet(path, engine, columns, **kwargs)
255
256 impl = get_engine(engine)
--> 257 return impl.read(path, columns=columns, **kwargs)
~/.conda/envs/anaconda3/lib/python3.6/site-packages/pandas/io/parquet.py in read(self, path, columns, **kwargs)
128 kwargs['use_pandas_metadata'] = True
129 return self.api.parquet.read_table(path, columns=columns,
--> 130 **kwargs).to_pandas()
131
132 def _validate_write_lt_070(self, df):
~/.conda/envs/anaconda3/lib/python3.6/site-packages/pyarrow/parquet.py in read_table(source, columns, nthreads, metadata, use_pandas_metadata)
939 pf = ParquetFile(source, metadata=metadata)
940 return pf.read(columns=columns, nthreads=nthreads,
--> 941 use_pandas_metadata=use_pandas_metadata)
942
943
~/.conda/envs/anaconda3/lib/python3.6/site-packages/pyarrow/parquet.py in read(self, columns, nthreads, use_pandas_metadata)
148 columns, use_pandas_metadata=use_pandas_metadata)
149 return self.reader.read_all(column_indices=column_indices,
--> 150 nthreads=nthreads)
151
152 def scan_contents(self, columns=None, batch_size=65536):
_parquet.pyx in pyarrow._parquet.ParquetReader.read_all()
error.pxi in pyarrow.lib.check_status()
ArrowIOError: Arrow error: Invalid: BinaryArray cannot contain more than 2147483646 bytes, have 2147483650
low = 3
high = 8
for n in np.logspace(low, high, high-low+1):
t0 = time()
df = pd.DataFrame.from_records([(f'ind_{x}', ''.join(['x']*50)) for x in range(int(n))], columns=['a', 'b']).set_index('a')
df.to_parquet(tmp_file, engine='fastparquet', compression='gzip')
pd.read_parquet(tmp_file, engine='fastparquet')
print(f'10^{np.log10(int(n))} read-write took {time()-t0} seconds')
10^3.0 read-write took 0.17770028114318848 seconds
10^4.0 read-write took 0.06351733207702637 seconds
10^5.0 read-write took 0.46896958351135254 seconds
10^6.0 read-write took 5.464379549026489 seconds
10^7.0 read-write took 50.26520347595215 seconds
---------------------------------------------------------------------------
OverflowError Traceback (most recent call last)
<ipython-input-49-234a889ae790> in <module>()
4 t0 = time()
5 df = pd.DataFrame.from_records([(f'ind_{x}', ''.join(['x']*50)) for x in range(int(n))], columns=['a', 'b']).set_index('a')
----> 6 df.to_parquet(tmp_file, engine='fastparquet', compression='gzip')
7 pd.read_parquet(tmp_file, engine='fastparquet')
8 print(f'10^{np.log10(int(n))} read-write took {time()-t0} seconds')
~/.conda/envs/anaconda3/lib/python3.6/site-packages/pandas/core/frame.py in to_parquet(self, fname, engine, compression, **kwargs)
1647 from pandas.io.parquet import to_parquet
1648 to_parquet(self, fname, engine,
-> 1649 compression=compression, **kwargs)
1650
1651 @Substitution(header='Write out the column names. If a list of strings '
~/.conda/envs/anaconda3/lib/python3.6/site-packages/pandas/io/parquet.py in to_parquet(df, path, engine, compression, **kwargs)
225 """
226 impl = get_engine(engine)
--> 227 return impl.write(df, path, compression=compression, **kwargs)
228
229
~/.conda/envs/anaconda3/lib/python3.6/site-packages/pandas/io/parquet.py in write(self, df, path, compression, **kwargs)
198 with catch_warnings(record=True):
199 self.api.write(path, df,
--> 200 compression=compression, **kwargs)
201
202 def read(self, path, columns=None, **kwargs):
~/.conda/envs/anaconda3/lib/python3.6/site-packages/fastparquet/writer.py in write(filename, data, row_group_offsets, compression, file_scheme, open_with, mkdirs, has_nulls, write_index, partition_on, fixed_text, append, object_encoding, times)
846 if file_scheme == 'simple':
847 write_simple(filename, data, fmd, row_group_offsets,
--> 848 compression, open_with, has_nulls, append)
849 elif file_scheme in ['hive', 'drill']:
850 if append:
~/.conda/envs/anaconda3/lib/python3.6/site-packages/fastparquet/writer.py in write_simple(fn, data, fmd, row_group_offsets, compression, open_with, has_nulls, append)
715 else None)
716 rg = make_row_group(f, data[start:end], fmd.schema,
--> 717 compression=compression)
718 if rg is not None:
719 fmd.row_groups.append(rg)
~/.conda/envs/anaconda3/lib/python3.6/site-packages/fastparquet/writer.py in make_row_group(f, data, schema, compression)
612 comp = compression
613 chunk = write_column(f, data[column.name], column,
--> 614 compression=comp)
615 rg.columns.append(chunk)
616 rg.total_byte_size = sum([c.meta_data.total_uncompressed_size for c in
~/.conda/envs/anaconda3/lib/python3.6/site-packages/fastparquet/writer.py in write_column(f, data, selement, compression)
545 data_page_header=dph, crc=None)
546
--> 547 write_thrift(f, ph)
548 f.write(bdata)
549
~/.conda/envs/anaconda3/lib/python3.6/site-packages/fastparquet/thrift_structures.py in write_thrift(fobj, thrift)
49 pout = TCompactProtocol(fobj)
50 try:
---> 51 thrift.write(pout)
52 fail = False
53 except TProtocolException as e:
~/.conda/envs/anaconda3/lib/python3.6/site-packages/fastparquet/parquet_thrift/parquet/ttypes.py in write(self, oprot)
1028 def write(self, oprot):
1029 if oprot._fast_encode is not None and self.thrift_spec is not None:
-> 1030 oprot.trans.write(oprot._fast_encode(self, [self.__class__, self.thrift_spec]))
1031 return
1032 oprot.writeStructBegin('PageHeader')
OverflowError: int out of range
最佳答案
看来你用 Pyarrow 写成功但不能读,用 fastparquet 写失败,因此没有读。我建议您使用 Pyarrow 写入数据并使用 fastparquet 逐块读取,遍历行组:
from fastparquet import ParquetFile
df.to_parquet(tmp_file, engine='pyarrow', compression='gzip')
pf = ParquetFile(tmp_file)
for df in pf.iter_row_groups():
print(df.head(n=10))
关于pandas to_parquet 在大型数据集上失败,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50782252/
我正在尝试使用 Pandas to_parquet 保存一个非常大的数据集,当超过某个限制时,它似乎失败了,无论是“pyarrow”还是“fastparquet”。我使用以下代码重现了我遇到的错误,并
是否可以使用 Pandas 的 DataFrame.to_parquet功能将写入拆分为一些近似所需大小的多个文件? 我有一个非常大的 DataFrame (100M x 100),并且正在使用 df
当我使用 dask=1.2.2 和 pyarrow 0.11.1 时,我没有观察到这种行为。更新后(dask=2.10.1 和 pyarrow=0.15.1),当我使用带有给定 partition_o
要将 Parquet 文件读入多个分区,应使用行组进行存储(请参阅 How to read a single large parquet file into multiple partitions u
我有一个非常宽的数据框(20,000 列),主要由 Pandas 中的 float64 列组成。我想将这些列转换为 float32 并写入 Parquet 格式。我这样做是因为这些文件的下游用户是内存
我们公司要求对 S3 中的所有静态数据进行加密。通常当我们上传 s3 对象时,我们会做这样的事情: aws s3 cp a.txt s3://b/test --sse 我正在玩 dask.datafr
我有以下工作流程。 def read_file(path, indx): df = pd.read_parquet(path) df.index = [indx] * len(df)
我有以下工作流程。 def read_file(path, indx): df = pd.read_parquet(path) df.index = [indx] * len(df)
我有一个 Pandas 数据框,我正在尝试将其作为 Parquet 文件保存到 S3 中: dftest = pd.DataFrame({'field': [1,2,3]}) dftest.to_pa
如何强制 Pandas DataFrame 保留 None值,即使使用 astype() ? 细节 自 pd.DataFrame构造函数不提供化合物 dtype参数,我使用以下函数修复类型( to_p
我正在尝试为 AWS Lambda 创建代码以将 csv 转换为 parquet。我可以使用 Pyarrow 做到这一点,但它的大小太大(约 200 MB 未压缩),因此我无法在 Lambda 的部署
我是一名优秀的程序员,十分优秀!