gpt4 book ai didi

python - Tornado 发生器在列表中的任何 future 恢复

转载 作者:行者123 更新时间:2023-11-28 22:50:54 26 4
gpt4 key购买 nike

tornado(或 asyncio)中是否有行为/模式等待 any 而不是 list 中的所有 Future?

yield any_of([future1, future2, future3])

假设 future2 准备好了那么结果应该是:

[None, <result>, None]

最佳答案

更新: Tornado 现在有tornado.gen.WaitIterator ,根据其文档中的示例使用它,而不是下面我的想法。

您可以创建一个继承自 Future 的 Any 类,并包装一个 futures 列表。 Any 类等到它的一个 futures 解决,然后给你结果列表:

import time
from tornado import gen
from tornado.ioloop import IOLoop
from tornado.concurrent import Future


@gen.coroutine
def delayed_msg(seconds, msg):
yield gen.Task(IOLoop.current().add_timeout,
time.time() + seconds)
raise gen.Return(msg)


class Any(Future):
def __init__(self, futures):
super(Any, self).__init__()
self.futures = futures
for future in futures:
future.add_done_callback(self.done_callback)

def done_callback(self, future):
try:
self.set_result(self.make_result())
except Exception as e:
self.set_exception(e)

def make_result(self):
"""A list of results: None for each pending future, a result for
each resolved future. Raises an exception for the first future
that has an exception.
"""
return [f.result() if f.done() else None
for f in self.futures]

def clear(self):
"""Break reference cycle with any pending futures."""
self.futures = None


@gen.coroutine
def f():
start = time.time()
future1 = delayed_msg(2, '2')
future2 = delayed_msg(3, '3')
future3 = delayed_msg(1, '1')
results = yield Any([future1, future2, future3])
end = time.time()
print "finished in %.1f sec: %r" % (end - start, results)

results = yield Any([future1, future2])
end = time.time()
print "finished in %.1f sec: %r" % (end - start, results)

IOLoop.current().run_sync(f)

如预期的那样,打印出:

finished in 1.0 sec: [None, None, '1']
finished in 2.0 sec: ['2', None]

但是您可以看到这种方法存在一些复杂性。一方面,如果您想在第一个 future 解决后等待 其余 future ,构建仍未决 future 的列表会很复杂。我想你可以这样做:

results = yield Any(f for f in [future1, future2, future3] if not f.done())

不漂亮,甚至不正确!存在竞争条件。如果 yield Any(...) 的连续执行之间解决了 future ,那么您将永远不会收到它的结果。第一个 yield 没有得到 future 的结果,因为它仍在等待中,但是第二个 yield 也没有得到它的结果,因为到那时 future 是“完成”,并且它不包含在传递给 Any 的列表中。

另一个复杂的问题是 Any 指的是每个 future ,它指的是回调,回调又指回 Any。为了及时进行垃圾回收,您应该调用 Any.clear()。

此外,您无法区分未决的 future 和解决为 None 的 future 。您需要一个不同于 None 的特殊标记值来表示未决的 future 。

最后的并发症是最严重的。如果解决了多个 future 并且其中一些有异常,则 Any 没有明显的方式将所有这些信息传达给您。将异常和结果混合在一个列表中是有悖常理的。

我认为有一种更简单的方法。我们可以让 Any 只返回第一个解决的 future ,而不是结果列表:

class Any(Future):
def __init__(self, futures):
super(Any, self).__init__()
for future in futures:
future.add_done_callback(self.done_callback)

def done_callback(self, future):
self.set_result(future)

引用循环消失了,异常处理问题得到了回答:Any 类将整个 future 返回给您,而不是它的结果或异常。您可以根据需要检查它。在解决了一些问题之后等待剩余的 future 也很容易:

@gen.coroutine
def f():
start = time.time()
future1 = delayed_msg(2, '2')
future2 = delayed_msg(3, '3')
future3 = delayed_msg(1, '1')

futures = set([future1, future2, future3])
while futures:
resolved = yield Any(futures)
end = time.time()
print "finished in %.1f sec: %r" % (end - start, resolved.result())
futures.remove(resolved)

根据需要,打印:

finished in 1.0 sec: '1'
finished in 2.0 sec: '2'
finished in 3.0 sec: '3'

我们可以通过添加一个新的全局函数来测试异常处理行为:

@gen.coroutine
def delayed_exc(seconds, msg):
yield gen.Task(IOLoop.current().add_timeout,
time.time() + seconds)
raise Exception(msg)

并生成它而不是 delayed_msg:

@gen.coroutine
def f():
start = time.time()
future1 = delayed_msg(2, '2')
future2 = delayed_exc(3, '3') # Exception!
future3 = delayed_msg(1, '1')

futures = set([future1, future2, future3])
while futures:
resolved = yield Any(futures)
end = time.time()
try:
outcome = resolved.result()
except Exception as e:
outcome = e

print "finished in %.1f sec: %r" % (end - start, outcome)
futures.remove(resolved)

现在,脚本将打印“1”,然后是“2”,然后是“Exception('3',)”。

关于python - Tornado 发生器在列表中的任何 future 恢复,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/22269474/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com