gpt4 book ai didi

iterator - 如何用迭代器包装 asyncio

转载 作者:行者123 更新时间:2023-12-01 00:13:58 28 4
gpt4 key购买 nike

我有以下简化代码:

async def asynchronous_function(*args, **kwds):
statement = await prepare(query)
async with conn.transaction():
async for record in statement.cursor():
??? yield record ???

...

class Foo:

def __iter__(self):
records = ??? asynchronous_function ???
yield from records

...

x = Foo()
for record in x:
...

我不知道怎么填 ???以上。我想产生记录数据,但是如何包装asyncio代码真的不是很明显。

最佳答案

虽然 asyncio 确实旨在全面使用,但有时根本不可能立即将大型软件(及其所有依赖项)转换为异步。幸运的是,有一些方法可以将遗留同步代码与新编写的 asyncio 部分结合起来。一个简单的方法是在专用线程中运行事件循环,并使用 asyncio.run_coroutine_threadsafe 向它提交任务。

使用这些低级工具,您可以编写通用适配器将任何异步迭代器转换为同步迭代器。例如:

import asyncio, threading, queue

# create an asyncio loop that runs in the background to
# serve our asyncio needs
loop = asyncio.get_event_loop()
threading.Thread(target=loop.run_forever, daemon=True).start()

def wrap_async_iter(ait):
"""Wrap an asynchronous iterator into a synchronous one"""
q = queue.Queue()
_END = object()

def yield_queue_items():
while True:
next_item = q.get()
if next_item is _END:
break
yield next_item
# After observing _END we know the aiter_to_queue coroutine has
# completed. Invoke result() for side effect - if an exception
# was raised by the async iterator, it will be propagated here.
async_result.result()

async def aiter_to_queue():
try:
async for item in ait:
q.put(item)
finally:
q.put(_END)

async_result = asyncio.run_coroutine_threadsafe(aiter_to_queue(), loop)
return yield_queue_items()

那么你的代码只需要调用 wrap_async_iter将异步迭代器包装成同步迭代器:

async def mock_records():
for i in range(3):
yield i
await asyncio.sleep(1)

for record in wrap_async_iter(mock_records()):
print(record)

在你的情况下 Foo.__iter__将使用 yield from wrap_async_iter(asynchronous_function(...)) .

关于iterator - 如何用迭代器包装 asyncio,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55132673/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com