
How to execute two "aggregate" functions (like sum) concurrently, feeding them from the same iterator?(如何同时执行两个“聚合”函数(如SUM),从同一个迭代器提供它们?)




Imagine we have an iterator, say iter(range(1, 1000)), and two functions, each accepting an iterator as its only parameter, say sum() and max(). In the SQL world we would call them aggregate functions.



Is there any way to obtain results of both without buffering the iterator output?




To do this, we would need to pause and resume aggregate function execution in order to feed both of them the same values without storing them. Perhaps there is a way to express this using async constructs, without sleeps?


More replies

sum(a,b,c) is the same as sum(a,sum(b,c)), likewise for max. Can we assume that that's always the case? Then just apply the aggregator functions for each element in the iterator.


@tobias_k Nice catch! I can't speak for the OP, but assuming that does sound like a bit of a stretch because then you're really working with binary functions (+ and the binary max), and not with aggregate functions. The question refers to aggregate functions in general, describing them as "accepting an iterator as the only parameter", only using sum and max as examples. In that context I would argue that an answer needs to work for aggregates that cannot be reduced to a stateless series of applications of a binary function (e.g. an aggregate that returns the median of the sequence).


@user4815162342 I thought it would be a nice and simple way that works with O(1) memory and without threads, but you are right; average would be another example. (Also, it's pretty slow.)


@tobias_k "Also, it's pretty slow." I tried it out of curiosity, and for sum and max and range(10000) it clocks at 4.9 ms on my machine, way faster than the solutions from my answer (except the initial ones that buffer everything).

@user4815162342 I just compared it to buffering the entire iterator with list or tee. Anyway, I posted it as an answer, maybe it's useful in some cases. At least it's simpler than the thread-based approaches.


Recommended answers

Let's consider how to apply two aggregate functions to the same iterator, which we can only exhaust once. The initial attempt (which hardcodes sum and max for brevity, but is trivially generalizable to an arbitrary number of aggregate functions) might look like this:




def max_and_sum_buffer(it):
    content = list(it)
    p = sum(content)
    m = max(content)
    return p, m


This implementation has the downside that it stores all the generated elements in memory at once, despite both functions being perfectly capable of stream processing. The question anticipates this cop-out and explicitly requests the result to be produced without buffering the iterator output. Is it possible to do this?




Serial execution: itertools.tee



It certainly seems possible. After all, Python iterators are external, so every iterator is already capable of suspending itself. How hard can it be to provide an adapter that splits an iterator into two new iterators that provide the same content? Indeed, this is exactly the description of itertools.tee, which appears perfectly suited to parallel iteration:




def max_and_sum_tee(it):
    it1, it2 = itertools.tee(it)
    p = sum(it1)  # XXX
    m = max(it2)
    return p, m


The above produces the correct result, but doesn't work the way we'd like it to. The trouble is that we're not iterating in parallel. Aggregate functions like sum and max never suspend - each insists on consuming all of the iterator content before producing the result. So sum will exhaust it1 before max has had a chance to run at all. Exhausting elements of it1 while leaving it2 alone will cause those elements to be accumulated inside an internal FIFO shared between the two iterators. That's unavoidable here - since max(it2) must see the same elements, tee has no choice but to accumulate them. (For more interesting details on tee, refer to this post.)

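To make the buffering concrete, here is a small sketch (the counting wrapper is purely illustrative and not part of the answer): by the time sum() returns, tee has already pulled the entire source and queued every element for it2.


import itertools

pulled = 0

def traced(source):
    # illustrative wrapper that counts how many items tee pulls from the source
    global pulled
    for elem in source:
        pulled += 1
        yield elem

it1, it2 = itertools.tee(traced(range(1, 1000)))
total = sum(it1)
print(pulled)          # 999 -- sum() forced tee to drain the whole source
largest = max(it2)     # max() is then served from tee's internal FIFO
print(total, largest)  # 499500 999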



In other words, there is no difference between this implementation and the first one, except that the first one at least makes the buffering explicit. To eliminate buffering, sum and max must run in parallel, not one after the other.




Threads: concurrent.futures



Let's see what happens if we run the aggregate functions in separate threads, still using tee to duplicate the original iterator:




def max_and_sum_threads_simple(it):
    it1, it2 = itertools.tee(it)

    with concurrent.futures.ThreadPoolExecutor(2) as executor:
        sum_future = executor.submit(lambda: sum(it1))
        max_future = executor.submit(lambda: max(it2))

    return sum_future.result(), max_future.result()


Now sum and max actually run in parallel (as much as the GIL permits), threads being managed by the excellent concurrent.futures module. It has a fatal flaw, however: for tee not to buffer the data, sum and max must process their items at exactly the same rate. If one is even a little bit faster than the other, they will drift apart, and tee will buffer all intermediate elements. Since there is no way to predict how fast each will run, the amount of buffering is both unpredictable and has the nasty worst case of buffering everything.




To ensure that no buffering occurs, tee must be replaced with a custom generator that buffers nothing and blocks until all the consumers have observed the previous value before proceeding to the next one. As before, each consumer runs in its own thread, but now the calling thread is busy running a producer, a loop that actually iterates over the source iterator and signals that a new value is available. Here is an implementation:




def max_and_sum_threads(it):
    STOP = object()
    next_val = None
    consumed = threading.Barrier(2 + 1)  # 2 consumers + 1 producer
    val_id = 0
    got_val = threading.Condition()

    def send(val):
        nonlocal next_val, val_id
        consumed.wait()
        with got_val:
            next_val = val
            val_id += 1
            got_val.notify_all()

    def produce():
        for elem in it:
            send(elem)
        send(STOP)

    def consume():
        last_val_id = -1
        while True:
            consumed.wait()
            with got_val:
                got_val.wait_for(lambda: val_id != last_val_id)
                if next_val is STOP:
                    return
                yield next_val
                last_val_id = val_id

    with concurrent.futures.ThreadPoolExecutor(2) as executor:
        sum_future = executor.submit(lambda: sum(consume()))
        max_future = executor.submit(lambda: max(consume()))
        produce()

    return sum_future.result(), max_future.result()


This is quite some amount of code for something so simple conceptually, but it is necessary for correct operation.




produce() loops over the outside iterator and sends the items to the consumers, one value at a time. It uses a barrier, a convenient synchronization primitive added in Python 3.2, to wait until all consumers are done with the old value before overwriting it with the new one in next_val. Once the new value is actually ready, a condition is broadcast. consume() is a generator that transmits the produced values as they arrive, until detecting STOP. The code can be generalized to run any number of aggregate functions in parallel by creating the consumers in a loop and adjusting their number when creating the barrier.



The downside of this implementation is that it requires creation of threads (possibly alleviated by making the thread pool global) and a lot of very careful synchronization at each iteration pass. This synchronization destroys performance - this version is almost 2000 times slower than the single-threaded tee, and 475 times slower than the simple but non-deterministic threaded version.




Still, as long as threads are used, there is no avoiding synchronization in some form. To completely eliminate synchronization, we must abandon threads and switch to cooperative multi-tasking. The question is: is it possible to suspend execution of ordinary synchronous functions like sum and max in order to switch between them?



Fibers: greenlet



It turns out that the greenlet third-party extension module enables exactly that. Greenlets are an implementation of fibers, lightweight micro-threads that switch between each other explicitly. This is sort of like Python generators, which use yield to suspend, except greenlets offer a much more flexible suspension mechanism, allowing one to choose who to suspend to.

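As a warm-up, here is a minimal sketch of explicit switching, separate from the answer's code and assuming the greenlet package is installed:


import greenlet  # third-party: pip install greenlet

def child():
    print("in child")
    # explicitly choose where to resume: hand control (and a value) back to main
    main_glet.switch("hello from child")

main_glet = greenlet.getcurrent()
child_glet = greenlet.greenlet(child)
value = child_glet.switch()   # run child until it switches back to us
print(value)                  # -> hello from child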



This makes it fairly easy to port the threaded version of max_and_sum to greenlets:




def max_and_sum_greenlet(it):
    STOP = object()
    consumers = None

    def send(val):
        for g in consumers:
            g.switch(val)

    def produce():
        for elem in it:
            send(elem)
        send(STOP)

    def consume():
        g_produce = greenlet.getcurrent().parent
        while True:
            val = g_produce.switch()
            if val is STOP:
                return
            yield val

    sum_result = []
    max_result = []
    gsum = greenlet.greenlet(lambda: sum_result.append(sum(consume())))
    gsum.switch()
    gmax = greenlet.greenlet(lambda: max_result.append(max(consume())))
    gmax.switch()
    consumers = (gsum, gmax)
    produce()

    return sum_result[0], max_result[0]


The logic is the same, but with less code. As before, produce produces values retrieved from the source iterator, but its send doesn't bother with synchronization, as it doesn't need to when everything is single-threaded. Instead, it explicitly switches to every consumer in turn to do its thing, with the consumer dutifully switching right back. After going through all consumers, the producer is ready for the next iteration pass.




Results are retrieved using an intermediate single-element list because greenlet doesn't provide access to the return value of the target function (and neither does threading.Thread, which is why we opted for concurrent.futures above).

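For illustration only (not part of the answer), the same single-element-list workaround with a bare threading.Thread looks like this:


import threading

result = []
t = threading.Thread(target=lambda: result.append(sum(range(10))))
t.start()
t.join()
print(result[0])   # 45 -- Thread exposes no return value, hence the list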



There are downsides to using greenlets, though. First, they don't come with the standard library; you need to install the greenlet extension. Then, greenlet is inherently non-portable because the stack-switching code is not supported by the OS and the compiler and can be considered somewhat of a hack (although an extremely clever one). A Python targeting WebAssembly, the JVM, or GraalVM would be very unlikely to support greenlet. This is not a pressing issue, but it's definitely something to keep in mind for the long haul.



Coroutines: asyncio



As of Python 3.5, Python provides native coroutines. Unlike greenlets, and similar to generators, coroutines are distinct from regular functions and must be defined using async def. Coroutines can't be easily executed from synchronous code; they must instead be processed by a scheduler which drives them to completion. The scheduler is also known as an event loop because its other job is to receive IO events and pass them to appropriate callbacks and coroutines. In the standard library, this is the role of the asyncio module.
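A minimal illustration of that division of labor, unrelated to the problem at hand: the coroutine object does nothing until the event loop drives it.


import asyncio

async def greet():
    await asyncio.sleep(0)   # a coroutine may suspend at any await point
    return "hello"

coro = greet()               # creating the coroutine object runs no code yet
loop = asyncio.get_event_loop()
print(loop.run_until_complete(coro))   # the event loop drives it to completion -> hello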



Before implementing an asyncio-based max_and_sum, we must first resolve a hurdle. Unlike greenlet, asyncio is only able to suspend execution of coroutines, not of arbitrary functions. So we need to replace sum and max with coroutines that do essentially the same thing. This is as simple as implementing them in the obvious way, only replacing for with async for, enabling the async iterator to suspend the coroutine while waiting for the next value to arrive:




async def asum(it):
    s = 0
    async for elem in it:
        s += elem
    return s

async def amax(it):
    NONE_YET = object()
    largest = NONE_YET
    async for elem in it:
        if largest is NONE_YET or elem > largest:
            largest = elem
    if largest is NONE_YET:
        raise ValueError("amax() arg is an empty sequence")
    return largest

# or, using https://github.com/vxgmichel/aiostream
#
#from aiostream.stream import accumulate
#def asum(it):
#    return accumulate(it, initializer=0)
#def amax(it):
#    return accumulate(it, max)


One could reasonably ask if providing a new pair of aggregate functions is cheating; after all, the previous solutions were careful to use existing sum and max built-ins. The answer will depend on the exact interpretation of the question, but I would argue that the new functions are allowed because they are in no way specific to the task at hand. They do the exact same thing the built-ins do, but consuming async iterators. I suspect that the only reason such functions don't already exist somewhere in the standard library is due to coroutines and async iterators being a relatively new feature.




With that out of the way, we can proceed to write max_and_sum as a coroutine:




async def max_and_sum_asyncio(it):
    loop = asyncio.get_event_loop()
    STOP = object()

    next_val = loop.create_future()
    consumed = loop.create_future()
    used_cnt = 2  # number of consumers

    async def produce():
        for elem in it:
            next_val.set_result(elem)
            await consumed
        next_val.set_result(STOP)

    async def consume():
        nonlocal next_val, consumed, used_cnt
        while True:
            val = await next_val
            if val is STOP:
                return
            yield val
            used_cnt -= 1
            if not used_cnt:
                consumed.set_result(None)
                consumed = loop.create_future()
                next_val = loop.create_future()
                used_cnt = 2
            else:
                await consumed

    s, m, _ = await asyncio.gather(asum(consume()), amax(consume()),
                                   produce())
    return s, m


Although this version is based on switching between coroutines inside a single thread, just like the one using greenlet, it looks different. asyncio doesn't provide explicit switching of coroutines, it bases task switching on the await suspension/resumption primitive. The target of await can be another coroutine, but also an abstract "future", a value placeholder which will be filled in later by some other coroutine. Once the awaited value becomes available, the event loop automatically resumes execution of the coroutine, with the await expression evaluating to the provided value. So instead of produce switching to consumers, it suspends itself by awaiting a future that will arrive once all the consumers have observed the produced value.

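Here is a stripped-down sketch of that future-based handoff, outside the context of the answer's code:


import asyncio

async def demo():
    loop = asyncio.get_event_loop()
    fut = loop.create_future()        # placeholder for a value not yet available

    async def filler():
        fut.set_result(42)            # fulfilling the future resumes whoever awaits it

    asyncio.ensure_future(filler())   # scheduled; runs once demo() suspends
    value = await fut                 # suspend here until set_result() is called
    print(value)                      # -> 42

asyncio.get_event_loop().run_until_complete(demo())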



consume() is an asynchronous generator, which is like an ordinary generator, except it creates an async iterator, which our aggregate coroutines are already prepared to accept by using async for. An async iterator's equivalent of __next__ is called __anext__ and is a coroutine, allowing the coroutine that exhausts the async iterator to suspend while waiting for the new value to arrive. When a running async generator suspends on an await, that is observed by async for as a suspension of the implicit __anext__ invocation. consume() does exactly that when it waits for the values provided by produce and, as they become available, transmits them to aggregate coroutines like asum and amax. Waiting is realized using the next_val future, which carries the next element from it. Awaiting that future inside consume() suspends the async generator, and with it the aggregate coroutine.

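To see the __anext__ machinery in isolation, here is a tiny sketch (nothing in it is specific to the answer):


import asyncio

async def agen():
    for i in range(2):
        yield i   # an async generator; each step is driven through __anext__

async def demo():
    ag = agen()
    # async for is roughly sugar for awaiting __anext__ until StopAsyncIteration
    print(await ag.__anext__())   # 0
    print(await ag.__anext__())   # 1
    try:
        await ag.__anext__()
    except StopAsyncIteration:
        print("exhausted")

asyncio.get_event_loop().run_until_complete(demo())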



The advantage of this approach compared to greenlets' explicit switching is that it makes it much easier to combine coroutines that don't know of each other into the same event loop. For example, one could have two instances of max_and_sum running in parallel (in the same thread), or run a more complex aggregate function that invoked further async code to do calculations.

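For instance, a quick sketch of the first point, assuming asum, amax, and max_and_sum_asyncio defined above are in scope (and asyncio is imported):


async def two_at_once():
    # two independent max_and_sum_asyncio calls sharing one event loop
    return await asyncio.gather(
        max_and_sum_asyncio(iter(range(10))),
        max_and_sum_asyncio(iter(range(100))),
    )

print(asyncio.get_event_loop().run_until_complete(two_at_once()))
# -> [(45, 9), (4950, 99)]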



The following convenience function shows how to run the above from non-asyncio code:




def max_and_sum_asyncio_sync(it):
    # trivially instantiate the coroutine and execute it in the
    # default event loop
    coro = max_and_sum_asyncio(it)
    return asyncio.get_event_loop().run_until_complete(coro)


Performance



Measuring and comparing performance of these approaches to parallel execution can be misleading because sum and max do almost no processing, which over-stresses the overhead of parallelization. Treat these as you would treat any microbenchmarks, with a large grain of salt. Having said that, let's look at the numbers anyway!




Measurements were produced using Python 3.6. The functions were run only once and given range(10000), their time measured by subtracting time.time() before and after the execution.
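The measurement scheme was as simple as it sounds; roughly like the following sketch (illustrative only, not the exact harness used for the numbers below):


import time

def measure(fn, *args):
    # time a single call by subtracting time.time() before and after
    start = time.time()
    result = fn(*args)
    return result, (time.time() - start) * 1000  # milliseconds

# e.g. measure(max_and_sum_tee, iter(range(10000))) for the variants defined above
print(measure(sum, range(10000)))


Here are the results: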




  • max_and_sum_buffer and max_and_sum_tee: 0.66 ms - almost exactly the same time for both, with the tee version being a bit faster.


  • max_and_sum_threads_simple: 2.7 ms. This timing means very little because of non-deterministic buffering, so this might be measuring the time to start two threads and the synchronization internally performed by Python.



  • max_and_sum_threads: 1.29 seconds, by far the slowest option, ~2000 times slower than the fastest one. This horrible result is likely caused by a combination of the multiple synchronizations performed at each step of the iteration and their interaction with the GIL.



  • max_and_sum_greenlet: 25.5 ms, slow compared to the initial version, but much faster than the threaded version. With a sufficiently complex aggregate function, one can imagine using this version in production.



  • max_and_sum_asyncio: 351 ms, almost 14 times slower than the greenlet version. This is a disappointing result because asyncio coroutines are more lightweight than greenlets, and switching between them should be much faster than switching between fibers. It is likely that the overhead of running the coroutine scheduler and the event loop (which in this case is overkill given that the code does no IO) is destroying the performance on this micro-benchmark.



  • max_and_sum_asyncio using uvloop: 125 ms. This is more than twice the speed of regular asyncio, but still almost 5x slower than greenlet.





Running the examples under PyPy doesn't bring significant speedup, in fact most of the examples run slightly slower, even after running them several times to ensure JIT warmup. The asyncio function requires a rewrite not to use async generators (since PyPy as of this writing implements Python 3.5), and executes in somewhat under 100ms. This is comparable to CPython+uvloop performance, i.e. better, but not dramatic compared to greenlet.




If it holds for your aggregate functions that f(a,b,c,...) == f(a, f(b, f(c, ...))),then you could just cycle through your functions and feed them one element at a time, each time combining them with the result of the previous application, like reduce would do, e.g. like this:




def aggregate(iterator, *functions):
    first = next(iterator)
    result = [first] * len(functions)
    for item in iterator:
        for i, f in enumerate(functions):
            result[i] = f((result[i], item))
    return result


This is considerably slower (about 10-20 times) than just materializing the iterator in a list and applying the aggregate function on the list as a whole, or using itertools.tee (which basically does the same thing, internally), but it has the benefit of using no additional memory.




Note, however, that while this works well for functions like sum, min or max, it does not work for other aggregating functions, e.g. finding the mean or median element of an iterator, as mean(a, b, c) != mean(a, mean(b, c)). (For mean, you could of course just get the sum and divide it by the number of elements, but computing e.g. the median taking just one element at a time will be more challenging.)

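A quick numeric illustration of why mean does not fit this scheme, together with the sum-and-count workaround mentioned above (running_mean is just an illustrative helper, not part of the answer):


from statistics import mean

# mean is not associative as a pairwise fold:
print(mean([1, 2, 9]))           # 4
print(mean([1, mean([2, 9])]))   # 3.25 -- not the same

# but it can still be computed in O(1) memory from two foldable aggregates:
def running_mean(iterator):
    total = count = 0
    for x in iterator:
        total += x
        count += 1
    return total / count

print(running_mean(iter(range(1, 1000))))   # 500.0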



I was trying to solve the same problem and understand asyncio a little better, and I came up with an answer that I believe is the same as user4815162342's, but uses an asyncio.Queue per consumer instead of creating two futures per iteration. Of course, it still has the downside of needing to re-write all your aggregation functions as async def coroutines.


import asyncio


# Provided an iterable and an integer, return N iterables that will each yield
# the values of the input iterable. All iterables must be iterated
# simultaneously.
def tee_async(it, N):
    # larger queues would provide less context switching, but use more memory.
    qs = [asyncio.Queue(1) for _ in range(N)]
    # value to send to the queue to signal end of iteration
    STOP = object()

    # we have a sync iterator, we need an async iterator
    # use a queue to pass values from the sync iterator to here
    # implement an async iterator here to pass the values to the consumer coroutine
    # bug warning: this hangs if one of the consumers does not finish iterating
    async def QueueIterator(q: asyncio.Queue):
        while True:
            i = await q.get()
            print(f"got {i}")
            if i is STOP:
                break
            yield i

    # A coroutine to consume the input iterator
    async def IteratorConsumer():
        # note: the iterator might advance to the next value before the
        # consumers have started with the previous value. IE the first aggregator
        # will work on it[0] AFTER we have loaded it[1] here.
        for i in it:
            for j, q in enumerate(qs):
                print(f"putting {i} in {j}")
                await q.put(i)
        for j, q in enumerate(qs):
            print(f"putting END_ITER in {j}")
            await q.put(STOP)

    asyncio.create_task(IteratorConsumer())
    return [QueueIterator(q) for q in qs]


# Provided an iterable and a list of aggregation coroutines, send each iterated
# value to each aggregation coroutine and return a list of the return values.
async def aggregate(it, aggregators):
    tees = tee_async(it, len(aggregators))
    tasks = []

    # if using recent python, try: async with asyncio.TaskGroup() as tg:
    tasks = [asyncio.create_task(a(t)) for a, t in zip(aggregators, tees)]

    return [await t for t in tasks]


# Provided an iterable and a list of aggregation coroutines, send each iterated
# value to each aggregation coroutine and return a list of the return values.
def aggregate_without_storing_everything(it, aggregators):
    return asyncio.run(aggregate(it, aggregators))


async def amax(it):
    max = None
    async for i in it:
        if max is None or i > max:
            max = i
    return max


async def acount(it):
    c = 0
    async for i in it:
        c += 1
    return c


def main():
    print(aggregate_without_storing_everything(range(3), [amax, acount]))


if __name__ == "__main__":
    main()

I benchmarked this against the max_and_sum_asyncio_sync function; the queue-based approach here is significantly slower until the queue size is greater than 5. However, both are incredibly slow. The point of doing this is that the iteration would not fit in memory, but in my tests an iteration that would not fit in memory would take days to run. This is not a good approach in practice.


More replies

An excellent answer, but why roll back the edit to use the API of the concurrent.futures executor correctly?

@wim Thanks for the suggestion, but both versions are in fact correct. In this case a lambda was used intentionally to make the code consistent with the later version that changes lambda: sum(it1) to lambda: sum(consume()), and where the transformation to the positional argument wouldn't work.

