gpt4 book ai didi

python-3.x - 如何在超时的情况下迭代异步迭代器?

转载 作者:行者123 更新时间:2023-12-04 18:01:13 25 4
gpt4 key购买 nike

我认为在代码方面更容易理解:

try:
async for item in timeout(something(), timeout=60):
await do_something_useful(item)
except asyncio.futures.TimeoutError:
await refresh()
我希望 async for最多运行60秒。

最佳答案

AsyncTimedIterable可以是您的代码中timeout()的实现:

class _AsyncTimedIterator:

__slots__ = ('_iterator', '_timeout', '_sentinel')

def __init__(self, iterable, timeout, sentinel):
self._iterator = iterable.__aiter__()
self._timeout = timeout
self._sentinel = sentinel

async def __anext__(self):
try:
return await asyncio.wait_for(self._iterator.__anext__(), self._timeout)
except asyncio.TimeoutError:
return self._sentinel


class AsyncTimedIterable:

__slots__ = ('_factory', )

def __init__(self, iterable, timeout=None, sentinel=None):
self._factory = lambda: _AsyncTimedIterator(iterable, timeout, sentinel)

def __aiter__(self):
return self._factory()

(原始答案)

或使用此类替换您的 timeout()函数:
class AsyncTimedIterable:
def __init__(self, iterable, timeout=None, sentinel=None):
class AsyncTimedIterator:
def __init__(self):
self._iterator = iterable.__aiter__()

async def __anext__(self):
try:
return await asyncio.wait_for(self._iterator.__anext__(),
timeout)
except asyncio.TimeoutError:
return sentinel

self._factory = AsyncTimedIterator

def __aiter__(self):
return self._factory()

关于python-3.x - 如何在超时的情况下迭代异步迭代器?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50241696/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com