gpt4 book ai didi

python - 将 asyncio 与多处理结合起来会出现什么样的问题(如果有的话)?

转载 作者:行者123 更新时间:2023-11-28 19:04:59 26 4
gpt4 key购买 nike

几乎每个人在第一次看到 Python 中的线程时都会意识到,GIL 让真正想要并行处理的人的生活变得悲惨 - 或者至少给它一个机会。

我目前正在考虑实现类似 Reactor 模式的东西。实际上,我想在类似线程的情况下监听传入的套接字连接,当有人尝试连接时,接受该连接并将其传递给类似线程的另一个线程进行处理。

我(还)不确定我可能面临什么样的负载。我知道目前对传入消息设置了 2MB 的上限。从理论上讲,我们每秒可以获得数千次(尽管我不知道实际上我们是否见过这样的事情)。处理一条消息所花费的时间并不非常重要,但显然越快越好。

我正在研究 Reactor 模式,并使用 multiprocessing 库开发了一个小示例(至少在测试中)似乎工作得很好。然而,现在/很快我们就会有 asyncio可用的库,它将为我处理事件循环。

通过组合 asynciomultiprocessing 有什么东西可以咬我吗?

最佳答案

您应该能够安全地组合 asynciomultiprocessing 而不会遇到太多麻烦,尽管您不应该直接使用 multiprocessingasyncio(以及任何其他基于事件循环的异步框架)的主要错误是阻塞事件循环。如果您尝试直接使用 multiprocessing,任何时候您阻塞等待子进程,您都会阻塞事件循环。显然,这是不好的。

避免这种情况的最简单方法是使用 BaseEventLoop.run_in_executorconcurrent.futures.ProcessPoolExecutor 中执行函数. ProcessPoolExecutor 是使用multiprocessing.Process 实现的进程池,但是asyncio 内置支持在不阻塞事件循环的情况下执行其中的函数.这是一个简单的例子:

import time
import asyncio
from concurrent.futures import ProcessPoolExecutor

def blocking_func(x):
time.sleep(x) # Pretend this is expensive calculations
return x * 5

@asyncio.coroutine
def main():
#pool = multiprocessing.Pool()
#out = pool.apply(blocking_func, args=(10,)) # This blocks the event loop.
executor = ProcessPoolExecutor()
out = yield from loop.run_in_executor(executor, blocking_func, 10) # This does not
print(out)

if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main())

对于大多数情况,仅此功能就足够了。如果您发现自己需要 multiprocessing 中的其他构造,例如 QueueEventManager 等,可以使用名为 aioprocessing 的第三方库(完全公开:我写的),它提供了所有 multiprocessing 数据结构的 asyncio 兼容版本。这是一个演示示例:

import time
import asyncio
import aioprocessing
import multiprocessing

def func(queue, event, lock, items):
with lock:
event.set()
for item in items:
time.sleep(3)
queue.put(item+5)
queue.close()

@asyncio.coroutine
def example(queue, event, lock):
l = [1,2,3,4,5]
p = aioprocessing.AioProcess(target=func, args=(queue, event, lock, l))
p.start()
while True:
result = yield from queue.coro_get()
if result is None:
break
print("Got result {}".format(result))
yield from p.coro_join()

@asyncio.coroutine
def example2(queue, event, lock):
yield from event.coro_wait()
with (yield from lock):
yield from queue.coro_put(78)
yield from queue.coro_put(None) # Shut down the worker

if __name__ == "__main__":
loop = asyncio.get_event_loop()
queue = aioprocessing.AioQueue()
lock = aioprocessing.AioLock()
event = aioprocessing.AioEvent()
tasks = [
asyncio.async(example(queue, event, lock)),
asyncio.async(example2(queue, event, lock)),
]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

关于python - 将 asyncio 与多处理结合起来会出现什么样的问题(如果有的话)?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48035079/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com