gpt4 book ai didi

python - Asyncio Pandas 与 Inplace

转载 作者:行者123 更新时间:2023-11-30 22:07:56 28 4
gpt4 key购买 nike

我只是read this introduction ,但在实现这两个示例时遇到问题(注释代码是第二个示例):

import asyncio
import pandas as pd
from openpyxl import load_workbook

async def loop_dfs(dfs):
async def clean_df(df):
df.drop(["column_1"], axis=1, inplace=True)
... a bunch of other inplace=True functions ...
return "Done"

# tasks = [clean_df(df) for (table, dfs) in dfs.items()]
# await asyncio.gather(*tasks)

tasks = [clean_df(df) for (table, df) in dfs.items()]
completed, pending = await asyncio.wait(tasks)


def main():
dfs = {
sn: pd.read_excel("excel.xlsx", sheet_name=sn)
for sn in load_workbook("excel.xlsx").sheetnames
}

# loop = asyncio.get_event_loop()
# loop.run_until_complete(loop_dfs(dfs))

loop = asyncio.get_event_loop()
try:
loop.run_until_complete(loop_dfs(dfs))
finally:
loop.close()

main()

我看到了一些关于 pandas 如何不支持 asyncio 的其他帖子,也许我只是错过了更大的图片,但如果我正在执行就地操作,那应该不重要,对吗? I saw recommendations for Dask但没有立即支持读取 Excel,我想我会先尝试这个,但我不断得到

RuntimeError: Event loop already running

最佳答案

I saw a few other posts about how pandas doesn't support asyncio, and maybe i'm just missing a bigger picture, but that shouldn't matter if i'm doing inplace operations right?

就地操作是那些 modify existing data 。这是一个效率问题,而您的目标似乎是并行化,这是一个完全不同的问题。

Pandas 不支持 asyncio,不仅因为它尚未实现,而且因为 Pandas 通常不执行 asyncio 支持良好的操作类型:网络和子进程 IO。 Pandas 函数要么使用 CPU,要么等待磁盘访问,这两者都不适合 asyncio。 Asyncio 允许使用看起来像普通同步代码的协程来表达网络通信。在协程内部,每个阻塞操作(例如网络读取)都会被等待,如果数据尚不可用,它会自动挂起整个任务。每次这样的暂停,系统都会切换到下一个任务,从而有效地创建一个协作多任务系统。

当尝试调用不支持 asyncio 的库(例如 pandas)时,表面上看起来一切正常,但您不会获得任何好处,并且代码将串行运行。例如:

async def loop_dfs(dfs):
async def clean_df(df):
...
tasks = [clean_df(df) for (table, df) in dfs.items()]
completed, pending = await asyncio.wait(tasks)

由于 clean_df 不包含 await 的单个实例,因此它只是名义上的协程 - 它永远不会真正暂停其执行以允许其他协程运行。因此 await asyncio.wait(tasks) 将连续运行任务,就像您编写的那样:

for table, df in dfs.items():
clean_df(df)

为了让事情并行运行(假设 pandas 在其操作期间偶尔释放 GIL),您应该将各个 CPU 绑定(bind)函数移交给线程池:

async def loop_dfs(dfs):
def clean_df(df): # note: ordinary def
...
loop = asyncio.get_event_loop()
tasks = [loop.run_in_executor(clean_df, df)
for (table, df) in dfs.items()]
completed, pending = await asyncio.wait(tasks)

如果您沿着这条路线走下去,那么您一开始就不需要 asyncio,只需使用 concurrent.futures 即可。例如:

def loop_dfs(dfs):  # note: ordinary def
def clean_df(df):
...
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = [executor.submit(clean_df, df)
for (table, df) in dfs.items()]
concurrent.futures.wait(futures)

figured i'd try this first but I keep getting RuntimeError: Event loop already running

该错误通常意味着您已在已为您运行 asyncio 的环境(例如 jupyter 笔记本)中启动了脚本。如果是这种情况,请确保使用普通的 python 运行脚本,或者查阅笔记本的文档,了解如何更改代码以将协程提交到已运行的事件循环。

关于python - Asyncio Pandas 与 Inplace,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52340476/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com