- c - 在位数组中找到第一个零
- linux - Unix 显示有关匹配两种模式之一的文件的信息
- 正则表达式替换多个文件
- linux - 隐藏来自 xtrace 的命令
我必须逐行处理一个巨大的 pandas.DataFrame
(几十 GB),其中每行操作都相当长(几十毫秒)。所以我有了将帧拆分为 block 并使用 multiprocessing
并行处理每个 block 的想法。这确实加快了任务的速度,但内存消耗是一场噩梦。
虽然每个子进程原则上应该只消耗一小块数据,但它需要(几乎)与包含原始 DataFrame
的原始父进程一样多的内存。即使删除父进程中使用过的部分也无济于事。
我写了一个最小的例子来复制这种行为。它所做的唯一一件事就是创建一个带有随机数的大型 DataFrame
,将其分成最多 100 行的小块,并在多处理期间简单地打印一些关于 DataFrame
的信息(此处通过大小为 4 的 mp.Pool
。
并行执行的主函数:
def just_wait_and_print_len_and_idx(df):
"""Waits for 5 seconds and prints df length and first and last index"""
# Extract some info
idx_values = df.index.values
first_idx, last_idx = idx_values[0], idx_values[-1]
length = len(df)
pid = os.getpid()
# Waste some CPU cycles
time.sleep(1)
# Print the info
print('First idx {}, last idx {} and len {} '
'from process {}'.format(first_idx, last_idx, length, pid))
将 DataFrame
分成小块的辅助生成器:
def df_chunking(df, chunksize):
"""Splits df into chunks, drops data of original df inplace"""
count = 0 # Counter for chunks
while len(df):
count += 1
print('Preparing chunk {}'.format(count))
# Return df chunk
yield df.iloc[:chunksize].copy()
# Delete data in place because it is no longer needed
df.drop(df.index[:chunksize], inplace=True)
以及主要例程:
def main():
# Job parameters
n_jobs = 4 # Poolsize
size = (10000, 1000) # Size of DataFrame
chunksize = 100 # Maximum size of Frame Chunk
# Preparation
df = pd.DataFrame(np.random.rand(*size))
pool = mp.Pool(n_jobs)
print('Starting MP')
# Execute the wait and print function in parallel
pool.imap(just_wait_and_print_len_and_idx, df_chunking(df, chunksize))
pool.close()
pool.join()
print('DONE')
标准输出是这样的:
Starting MP
Preparing chunk 1
Preparing chunk 2
First idx 0, last idx 99 and len 100 from process 9913
First idx 100, last idx 199 and len 100 from process 9914
Preparing chunk 3
First idx 200, last idx 299 and len 100 from process 9915
Preparing chunk 4
...
DONE
主进程需要大约 120MB 的内存。但是,池的子进程需要相同数量的内存,尽管它们只包含原始 DataFame
的 1%(大小为 100 的 block 与原始长度为 10000 的 block ) .为什么?
我该怎么办?尽管我进行了分块,Python (3) 是否会将整个 DataFrame
发送到每个子进程?这是 pandas
内存管理的问题还是 multiprocessing
和数据 pickling 的错误?谢谢!
如果您想自己尝试,只需复制和粘贴整个脚本:
import multiprocessing as mp
import pandas as pd
import numpy as np
import time
import os
def just_wait_and_print_len_and_idx(df):
"""Waits for 5 seconds and prints df length and first and last index"""
# Extract some info
idx_values = df.index.values
first_idx, last_idx = idx_values[0], idx_values[-1]
length = len(df)
pid = os.getpid()
# Waste some CPU cycles
time.sleep(1)
# Print the info
print('First idx {}, last idx {} and len {} '
'from process {}'.format(first_idx, last_idx, length, pid))
def df_chunking(df, chunksize):
"""Splits df into chunks, drops data of original df inplace"""
count = 0 # Counter for chunks
while len(df):
count += 1
print('Preparing chunk {}'.format(count))
# Return df chunk
yield df.iloc[:chunksize].copy()
# Delete data in place because it is no longer needed
df.drop(df.index[:chunksize], inplace=True)
def main():
# Job parameters
n_jobs = 4 # Poolsize
size = (10000, 1000) # Size of DataFrame
chunksize = 100 # Maximum size of Frame Chunk
# Preparation
df = pd.DataFrame(np.random.rand(*size))
pool = mp.Pool(n_jobs)
print('Starting MP')
# Execute the wait and print function in parallel
pool.imap(just_wait_and_print_len_and_idx, df_chunking(df, chunksize))
pool.close()
pool.join()
print('DONE')
if __name__ == '__main__':
main()
最佳答案
好的,所以我在 Sebastian Opałczyński 在评论中的提示后弄明白了。
问题在于子进程是从父进程派生出来的,因此它们都包含对原始 DataFrame
的引用。然而,框架是在原始进程中操作的,因此写时复制行为会缓慢地杀死整个过程,最终会在达到物理内存的限制时结束。
有一个简单的解决方案:我使用 multiprocessing
的新上下文功能代替 pool = mp.Pool(n_jobs)
:
ctx = mp.get_context('spawn')
pool = ctx.Pool(n_jobs)
这保证了 Pool
进程是刚刚生成的,而不是从父进程派生出来的。因此,它们都无法访问原始 DataFrame
,并且它们都只需要父级内存的一小部分。
请注意,mp.get_context('spawn')
仅适用于 Python 3.4 及更新版本。
关于python - Pandas 和多处理内存管理 : Splitting a DataFrame into Multiple Chunks,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41240067/
我正在尝试设计我的输入:文件。以下 SO 问题让我完成了 95% 的任务。区别在于我使用的是 HTML5 multiple=multiple 属性。 How to style "input file"
我一直在进行一项实验,其中多个调查参与者使用可穿戴技术聆听多首音乐来跟踪多条信息,两个例子是 BPM(心率)和 T(体温)。 目标是衡量每首音乐(以用户反馈为特征)对人类情感的影响。 目前,所有数据都
我使用 jquery 添加/删除输入 我使用append为日期/收入添加多个Tr 我还使用另一个附加来添加多个 td 以获取同一日期 Tr 中的收入 我添加多个日期输入,并在此表中添加多个收入输入 我
在 Android 中,有一种方法可以为项目中的所有模块生成签名的 APK。例如。我有以下项目 Project -- Library Module -- Module 1 -- Modul
我有一个用于网站展示的系统。 展览数据可能来自差异表中的多个数据。 喜欢这个设计: Table [ExhibitionType] used for differentiate category. Ta
关闭。这个问题是opinion-based .它目前不接受答案。 想要改进这个问题? 更新问题,以便 editing this post 可以用事实和引用来回答它. 关闭 8 年前。 Improve
我正在使用 UILocalnotification...收到通知时,当应用程序处于事件模式时我打开 viewcontroller...但是如果同时收到多个通知...我如何打开多个 viewcontro
我遇到的问题是一个策略浏览器游戏,它有 7 种类型的值。问题如下: 我在 $_POST 中获得了 7 个不同的值,包括从索引 unit_1 到索引 unit_7。这 7 个值是 0 到 20 之间的整
这个问题已经有答案了: Search Large Text File for Thousands of strings (3 个回答) 已关闭10 年前。 我想在多个文件上“grep”多个正则表达式。
我经常对如何在我的应用程序中解决这个问题感到矛盾。我使用了很多选项,包括: 一个通用的多选 - 这是我最不喜欢和最很少使用的选项。我发现可用性非常糟糕,一个简单的误点击就会毁了你所有的辛勤工作。 “自
以下是 couchbase 中的示例文档之一。 { "name":"abc", "friends":["a","b","c"], "bestfriends":["x","y","z"] }
我有 4 张 table 。 表组 | ID | NAME | 1 Premium 2 Silver 表用户 | ID | group_id | NAME | 1
我正在开发一个使用第三方服务(Facebook、Google 等)对用户进行身份验证的应用程序。我为每个用户提供一个内部 ID(uuid v4),该 ID 与他们的第 3 方 ID 相关联。现在,我的
我是 bicep 新手,一直在努力实现 Bicep 脚本来部署具有许多主题和订阅的 Azure 服务总线。 我添加的每个主题都有可变数量的订阅(例如,通知主题可能有 3 个订阅,但分析主题可能有 2
我是 bicep 新手,一直在努力实现 Bicep 脚本来部署具有许多主题和订阅的 Azure 服务总线。 我添加的每个主题都有可变数量的订阅(例如,通知主题可能有 3 个订阅,但分析主题可能有 2
我必须创建一个大型数据库。它将保存来自 100 多个设备的数据,并不断更新数据库。每 10 秒,每个设备都会更新数据库中的一行。是为每个设备数据建立一个单独的表还是将数据与设备 ID 放在同一个表中更
我需要在 Activity 开始时显示“正在加载”进度对话框,然后在加载完成后显示一些内容。在我的 onresume 中,我有类似这样的代码: loadThread = true; Thread sh
我有一个 html 表单 当我提交表单时,假设对于 id = 1,数量为 5 或 对于 id = 3,数量为 8。如何在java脚本或jquery中获取这些值并将这些信息提交到服务器?我
我正在创建一个 Mozilla 扩展程序,通过单击“转换按钮”(标签:转换)将网页内容转换为其他语言它的标签被转换为英文,以便单击该按钮(标签:英文)内容被转换为原始形式 我尝试为每个选项卡设置属性“
我正在尝试根据 进行搜索 我通过运行代码从 select 中获取值: for($i=0;$i= '$age_from' AND users.user_age = '$age_from' AND u
我是一名优秀的程序员,十分优秀!