gpt4 book ai didi

python - 简化字符串修改函数的嵌套asyncio操作

转载 作者:行者123 更新时间:2023-12-05 09:29:48 24 4
gpt4 key购买 nike

我有一个看起来像这样的异步代码:

  • 有一个第三方函数对字符串执行一些操作并返回修改后的字符串,对于这个问题的目的,它类似于non_async_func

  • 我有一个 async def async_func_single 函数,它环绕着执行单个操作的 non_async_func

  • 然后是另一个 async def async_func_batch 函数,它嵌套环绕 async_func_single 以对一批数据执行该函数。

代码有点工作,但我想知道更多关于为什么/如何,我的问题是

  • 是否有必要创建 async_func_single 并用 async_func_batch 环绕它?

  • 我可以直接在 async_func_batch 中输入一批数据来调用 non_async_func 吗?

  • 我有一个 per_chunk 函数,可以批量输入数据,是否有任何异步操作/函数可以避免使用预批处理我要发送到 async_func_batch?

import nest_asyncio
nest_asyncio.apply()

import asyncio
from itertools import zip_longest

from loremipsum import get_sentences

def per_chunk(iterable, n=1, fillvalue=None):
args = [iter(iterable)] * n
return zip_longest(*args, fillvalue=fillvalue)

def non_async_func(text):
return text[::-1]

async def async_func_single(text):
# Perform some string operation.
return non_async_func(text)

async def async_func_batch(batch):
tasks = [async_func_single(text) for text in batch]
return await asyncio.gather(*tasks)

# Create some random inputs
thousand_texts = get_sentences(1000)

# Loop through 20 sentence at a time.
for batch in per_chunk(thousand_texts, n=20):
loop = asyncio.get_event_loop()
results = loop.run_until_complete(async_func_batch(batch))
for i, o in zip(thousand_texts, results):
print(i, o)

最佳答案

请注意,将您的函数标记为“async def”而不是“def”不会使它们自动异步 - 您可以拥有同步的“async def”函数。异步函数和同步函数之间的区别在于,异步函数定义了等待另一个异步函数或等待异步 IO 操作的位置(使用“await”)。

另请注意,asyncio 并不神奇 - 它基本上是一个调度程序,根据正在“等待”的功能/操作是否已完成来安排要运行的异步功能。而且,由于调度程序和异步函数都在单个线程上运行,因此在任何给定时刻,只能运行一个异步函数。

所以,回到你的代码,你的“async_func_single”函数唯一做的就是调用一个同步函数,因此,尽管被标记为“async def”,它仍然是一个同步函数。同样的逻辑适用于“async_func_batch”函数——传递给“asyncio.gather”的“async_func_single”任务都是同步的,所以“asyncio.gather”只是同步运行每个任务(所以它没有提供任何好处通过一个简单的 for 循环等待每个任务),所以“async_func_batch”又是一个同步函数。因为您只是调用同步函数,所以 asyncio 不会为您的程序带来任何好处。

如果您想要同时运行多个同步函数,则不要使用异步函数。您需要在并行进程/线程中运行它们:

import sys
import itertools
import concurrent.futures

from loremipsum import get_sentences

executor = concurrent.futures.ProcessPoolExecutor(workers=sys.cpu_count())

def per_chunk(iterable, n=1):
while True:
chunk = tuple(itertools.islice(iterable, n))
if chunk:
yield chunk
else:
break

def non_async_func(text):
return text[::-1]

def process_batches(batches):
futures = [
executor.submit(non_async_func, batch)
for batch in batches
]
concurrent.futures.wait(futures)

thousand_texts = get_sentences(1000)
process_batches(per_chunk(thousand_texts, n=20))

如果您仍想使用异步函数来处理批处理,则 asyncio 提供围绕并发 futures 的异步包装器:

async def process_batches(batches):
event_loop = asyncio.get_running_loop()
futures = [
event_loop.run_in_executor(executor, non_async_func, batch)
for batch in batches
]
await asyncio.wait(futures)

thousand_texts = get_sentences(1000)
asyncio.run(process_batches(per_chunk(thousand_texts, n=20)))

但它没有任何优势,除非您有其他可以在等待时运行的异步函数。

关于python - 简化字符串修改函数的嵌套asyncio操作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/70337497/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com