作者热门文章
- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我有一组 CPU 密集型进程,它们有时相互依赖才能继续进行。所以像
def run():
while True:
do stuff
wake up some other process
wait for some other process to wake me up
do stuff
在每个进程中我想使用异步,这样我总是可以拥有
run
的实例在其他人等待被唤醒时运行。看着
asyncio
docs,我看到的“高级 API”部分中唯一的 IPC 选项使用套接字。我更愿意使用管道,看起来我可以使用低级 API 来做,但是该文档充满了警告,如果您只是在编写应用程序,那么使用它是错误的。有人可以在这里做惯用的事情吗? (而且,速度是一个重要因素,所以如果有一些不太惯用但性能更高的东西,我也想知道这个选项。)
最佳答案
我想提一下aioprocessing图书馆,因为我在我的一个项目中成功使用了它。它为multiprocessing
提供了一个anync 接口(interface)。包括 IPC 的原语,例如 Process
, Pipe
, Lock
, Queue
等等。它使用线程池来做到这一点:
...
@staticmethod
def coro_maker(func):
def coro_func(self, *args, loop=None, **kwargs):
return self.run_in_executor(
getattr(self, func), *args, loop=loop, **kwargs
)
return coro_func
但老实说,很大程度上取决于要解决的问题,取决于并发执行的任务,因为异步方法中的密集 IPC 本身由于事件循环、线程池等的开销而不如同步方法有效。有时最好使所有 IPC 操作同步并将其全部放在单独的线程中。同样,这完全取决于问题和环境。下面是一个远非全面的基准,但它可以给出正在解决的问题的大致图片(缓冲区的密集交换)。
Sync SimpleQueue: 1.4309470653533936
AioSimpleQueue: 12.32670259475708
AioQueue: 14.342737436294556
AioPipe: 11.747064590454102
subprocess pipe stream: 7.344956159591675
socket stream: 4.360717058181763
# main.py
import sys
import time
import asyncio
import aioprocessing as ap
import multiprocessing as mp
import proc
count = 5*10**4
data = b'*'*100
async def sync_simple_queue_func():
out_ = mp.SimpleQueue()
in_ = mp.SimpleQueue()
p = ap.AioProcess(target=proc.start_sync_queue_func, args=(out_, in_))
p.start()
begin_ts = time.time()
for i in range(count):
out_.put(data)
res = in_.get()
print('Sync SimpleQueue: ', time.time() - begin_ts)
out_.put(None)
async def simple_queue_func():
out_ = ap.AioSimpleQueue()
in_ = ap.AioSimpleQueue()
p = ap.AioProcess(target=proc.start_queue_func, args=(out_, in_))
p.start()
begin_ts = time.time()
for i in range(count):
await out_.coro_put(data)
res = await in_.coro_get()
print('AioSimpleQueue: ', time.time() - begin_ts)
await out_.coro_put(None)
async def queue_func():
out_ = ap.AioQueue()
in_ = ap.AioQueue()
p = ap.AioProcess(target=proc.start_queue_func, args=(out_, in_))
p.start()
begin_ts = time.time()
for i in range(count):
await out_.coro_put(data)
res = await in_.coro_get()
print('AioQueue: ', time.time() - begin_ts)
await out_.coro_put(None)
async def pipe_func():
main_, child_ = ap.AioPipe()
p = ap.AioProcess(target=proc.start_pipe_func, args=(child_,))
p.start()
begin_ts = time.time()
for i in range(count):
await main_.coro_send(data)
res = await main_.coro_recv()
print('AioPipe: ', time.time() - begin_ts)
await main_.coro_send(None)
await p.coro_join()
server = None
async def handle_child(reader, writer):
begin_ts = time.time()
for i in range(count):
writer.write(data)
res = await reader.read(len(data))
print('socket stream: ', time.time() - begin_ts)
writer.close()
async def socket_func():
global server
addr = ('127.0.0.1', 8888)
server = await asyncio.start_server(handle_child, *addr)
p = ap.AioProcess(target=proc.start_socket_func, args=(addr,))
p.start()
async with server:
await server.serve_forever()
async def subprocess_func():
prog = await asyncio.create_subprocess_shell(
'python proc.py',
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE)
begin_ts = time.time()
for i in range(count):
prog.stdin.write(data)
res = await prog.stdout.read(len(data))
print('subprocess pipe stream: ', time.time() - begin_ts)
prog.stdin.close()
async def main():
await sync_simple_queue_func()
await simple_queue_func()
await queue_func()
await pipe_func()
await subprocess_func()
await socket_func()
asyncio.run(main())
# proc.py
import asyncio
import sys
import aioprocessing as ap
async def sync_queue_func(in_, out_):
while True:
n = in_.get()
if n is None:
return
out_.put(n)
async def queue_func(in_, out_):
while True:
n = await in_.coro_get()
if n is None:
return
await out_.coro_put(n)
async def pipe_func(child):
while True:
n = await child.coro_recv()
if n is None:
return
await child.coro_send(n)
data = b'*' * 100
async def socket_func(addr):
reader, writer = await asyncio.open_connection(*addr)
while True:
n = await reader.read(len(data))
if not n:
break
writer.write(n)
def start_sync_queue_func(in_, out_):
asyncio.run(sync_queue_func(in_, out_))
def start_queue_func(in_, out_):
asyncio.run(queue_func(in_, out_))
def start_pipe_func(child):
asyncio.run(pipe_func(child))
def start_socket_func(addr):
asyncio.run(socket_func(addr))
async def connect_stdin_stdout():
loop = asyncio.get_event_loop()
reader = asyncio.StreamReader()
protocol = asyncio.StreamReaderProtocol(reader)
dummy = asyncio.Protocol()
await loop.connect_read_pipe(lambda: protocol, sys.stdin) # sets read_transport
w_transport, _ = await loop.connect_write_pipe(lambda: dummy, sys.stdout)
writer = asyncio.StreamWriter(w_transport, protocol, reader, loop)
return reader, writer
async def main():
reader, writer = await connect_stdin_stdout()
while True:
res = await reader.read(len(data))
if not res:
break
writer.write(res)
if __name__ == "__main__":
asyncio.run(main())
关于python - 使用异步的进程间通信?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63142281/
根据 Android docs ,activity生命周期如下: onCreate() onStart() onResume() onPause() onStop() onDestroy() 问题是,
我有一门类(class)有很多专栏,但这个问题只需要其中三个: ---------------------------------------- | start_date | start_time
给定在同一个 Tomcat 6 上运行的两个 Web 应用程序。如果您从一个应用程序到另一个应用程序进行 http 调用,Tomcat 是否会“短路”此调用,或者它会在调用之前一直在 interweb
我是一名优秀的程序员,十分优秀!