
python - pandas reading from disk during a concurrent process pool


I've written a CLI tool to generate simulations, and I'd like to generate about 10k (roughly 10 minutes) for each cut of my roughly 200 datasets. I have functions that do this fine in a for loop, but when I converted it to concurrent.futures.ProcessPoolExecutor() I realized that multiple processes can't read the same pandas dataframe.

Here is the smallest example I could come up with:

import concurrent.futures
import pandas as pd

def example():
    # This is a static table with basic information like distributions
    df = pd.read_parquet("batch/data/mappings.pq")
    # Then there's a bunch of etl, even reading in a few other static tables
    return sum(df.shape)

def main():
    results = []
    with concurrent.futures.ProcessPoolExecutor() as pool:
        futr_results = [pool.submit(example) for _ in range(100)]
        done_results = concurrent.futures.as_completed(futr_results)
        for _ in futr_results:
            results.append(next(done_results).result())

    return results

if __name__ == "__main__":
    print(main())

Error:

<jemalloc>: background thread creation failed (11)
terminate called after throwing an instance of 'std::system_error'
  what():  Resource temporarily unavailable
Traceback (most recent call last):
  File "batch/testing.py", line 19, in <module>
    main()
  File "batch/testing.py", line 14, in main
    results.append(next(done_results).result())
  File "/home/a114383/miniconda3/envs/hailsims/lib/python3.7/concurrent/futures/_base.py", line 425, in result
    return self.__get_result()
  File "/home/a114383/miniconda3/envs/hailsims/lib/python3.7/concurrent/futures/_base.py", line 384, in __get_result
    raise self._exception
concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.

I'm hoping there's a quick and dirty way to read these (I'm guessing without holding a reference?), otherwise it looks like I'll need to create all the parameters up front rather than fetching them on the fly.

Best Answer

Three things I would try:

  • Pandas has an option to use either PyArrow or FastParquet when reading parquet files. Try switching to the other one - this seems like it may be a bug. For example:

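A minimal sketch of forcing each engine explicitly (assuming both pyarrow and fastparquet are installed; the path is the one from the question):

import pandas as pd

# Force a specific parquet engine instead of pandas' default ("auto")
df = pd.read_parquet("batch/data/mappings.pq", engine="pyarrow")
df = pd.read_parquet("batch/data/mappings.pq", engine="fastparquet")
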
  • Try forcing pandas to open the file in read-only mode, to prevent conflicts caused by the file being locked:

pd.read_parquet(open("batch/data/mappings.pq", "rb"))
# Also try "r" instead of "rb", not sure if pandas expects string or binary data
  • Try loading the file into a StringIO/BytesIO buffer and then handing that to pandas - this avoids any interaction between pandas and the file itself:
import io

# either this (binary)
data = io.BytesIO(open("batch/data/mappings.pq", "rb").read())
# or this (string)
data = io.StringIO(open("batch/data/mappings.pq", "r").read())

pd.read_parquet(data)
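
Applied to the example() worker from the question, the third suggestion would look roughly like this (just a sketch that assumes the parquet file from the question exists; it has not been verified against the original error):

import concurrent.futures
import io

import pandas as pd

def example():
    # Read the raw bytes ourselves, then let pandas parse the in-memory buffer
    with open("batch/data/mappings.pq", "rb") as fh:
        data = io.BytesIO(fh.read())
    df = pd.read_parquet(data)
    return sum(df.shape)

if __name__ == "__main__":
    with concurrent.futures.ProcessPoolExecutor() as pool:
        futures = [pool.submit(example) for _ in range(100)]
        print([f.result() for f in concurrent.futures.as_completed(futures)])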

Regarding python - pandas reading from disk during a concurrent process pool, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/59495151/
