
python - Run functions in parallel using asyncio coroutines?

Reposted. Author: 行者123. Updated: 2023-12-04 15:07:05

I have the following code, which reads data from a database (read_db) and writes it to a Parquet file (data.to_parquet). Both I/O operations take a while to run.

import logging

def main():
    id = 1
    while id < 1000:
        logging.info(f'reading - id: {id}')
        data = read_db(id)  # returns a dataframe

        logging.info(f'saving - id: {id}')
        data.to_parquet(f'{id}.parquet')
        logging.info(f'saved - id: {id}')

        id += 1

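It is slow, so I want read_db(n+1) and to_parquet(n) to run concurrently. I still need each step to complete in id order, though: read_db(n+1) must run after read_db(n), and data.to_parquet(n+1) after data.to_parquet(n). Here is the async version: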
import asyncio
import logging
from functools import partial, wraps

def async_wrap(f):
    # Wrap a blocking function so it runs in an executor and can be awaited.
    @wraps(f)
    async def run(*args, loop=None, executor=None, **kwargs):
        if loop is None:
            loop = asyncio.get_event_loop()
        p = partial(f, *args, **kwargs)
        return await loop.run_in_executor(executor, p)
    return run

async def main():
    read_db_async = async_wrap(read_db)
    id = 1
    while id < 1000:
        logging.info(f'reading - id: {id}')
        data = await read_db_async(id)  # returns a dataframe

        logging.info(f'saving - id: {id}')
        to_parquet_async = async_wrap(data.to_parquet)
        await to_parquet_async(f'{id}.parquet')
        logging.info(f'saved - id: {id}')

        id += 1

asyncio.run(main())
I expected to see some out-of-order logs:
reading - id: 1
saving - id: 1 (saving 1 and reading 2 run in parallel)
reading - id: 2
saved - id: 1
saving - id: 2
reading - id: 3
saved - id: 2
.....
However, the actual log is the same as with the synchronous code:
reading - id: 1
saving - id: 1
saved - id: 1
reading - id: 2
saving - id: 2
saved - id: 2
reading - id: 3
.....

Best answer

You can make read_db(n+1) and to_parquet(n) run concurrently using gather or an equivalent:

async def main():
    read_db_async = async_wrap(read_db)
    prev_to_parquet = asyncio.sleep(0)  # no-op

    for id in range(1, 1000):
        # Read id while the previous id's write is still in progress.
        data, _ = await asyncio.gather(read_db_async(id), prev_to_parquet)
        to_parquet_async = async_wrap(data.to_parquet)
        prev_to_parquet = to_parquet_async(f'{id}.parquet')

    # Flush the last pending write.
    await prev_to_parquet
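
To see the overlap without a real database, here is a minimal self-contained sketch of the same gather pattern. Everything in it other than asyncio and functools is a hypothetical stand-in, not part of the original code: FakeFrame replaces the dataframe, make_read_db builds a fake read_db, time.sleep simulates the blocking I/O, and events collects the log messages in memory instead of going through logging.

```python
import asyncio
import time
from functools import partial, wraps


def async_wrap(f):
    """Run a blocking function in the event loop's default executor."""
    @wraps(f)
    async def run(*args, **kwargs):
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, partial(f, *args, **kwargs))
    return run


class FakeFrame:
    """Hypothetical stand-in for the dataframe returned by read_db."""

    def __init__(self, id, events):
        self.id = id
        self.events = events

    def to_parquet(self, path):
        # Simulated slow write; nothing is written to disk.
        self.events.append(f'saving - id: {self.id}')
        time.sleep(0.1)
        self.events.append(f'saved - id: {self.id}')


def make_read_db(events):
    """Build a fake blocking read_db that records its calls in events."""
    def read_db(id):
        events.append(f'reading - id: {id}')
        time.sleep(0.1)  # simulated slow query
        return FakeFrame(id, events)
    return read_db


async def pipeline(n, events):
    read_db_async = async_wrap(make_read_db(events))
    prev_to_parquet = asyncio.sleep(0)  # no-op for the first iteration
    for id in range(1, n + 1):
        # The read for this id overlaps with the write for the previous id.
        data, _ = await asyncio.gather(read_db_async(id), prev_to_parquet)
        prev_to_parquet = async_wrap(data.to_parquet)(f'{id}.parquet')
    await prev_to_parquet  # flush the last pending write


def run_demo(n=3):
    events = []
    asyncio.run(pipeline(n, events))
    return events


if __name__ == '__main__':
    print('\n'.join(run_demo()))
```

In this sketch, reading id 2 is logged before id 1 is reported as saved, while the reads and the writes each stay in id order among themselves. The relative order of "reading - id: n+1" and "saving - id: n" depends on thread scheduling, so it can vary between runs.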

Regarding "python - Run functions in parallel using asyncio coroutines?", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/65911158/
