gpt4 book ai didi

python - 如何在 python 中设计异步管道模式

转载 作者:太空狗 更新时间:2023-10-30 00:02:05 27 4
gpt4 key购买 nike

我正在尝试设计一个可以轻松制作数据处理管道的异步管道。管道由几个函数组成。输入数据从管道的一端进入,从另一端出来。

我想按照以下方式设计管道:

  1. 可以在管道中插入附加功能
  2. 管道中已有的功能可以弹出。

这是我想出的:

import asyncio

@asyncio.coroutine
def add(x):
return x + 1

@asyncio.coroutine
def prod(x):
return x * 2

@asyncio.coroutine
def power(x):
return x ** 3

def connect(funcs):
def wrapper(*args, **kwargs):
data_out = yield from funcs[0](*args, **kwargs)
for func in funcs[1:]:
data_out = yield from func(data_out)
return data_out
return wrapper

pipeline = connect([add, prod, power])
input = 1
output = asyncio.get_event_loop().run_until_complete(pipeline(input))
print(output)

这当然有效,但问题是,如果我想向此管道添加另一个函数(或从中弹出一个函数),我必须再次反汇编并重新连接每个函数。

我想知道是否有更好的方案或设计模式来创建这样的流水线?

最佳答案

我以前做过类似的事情,只使用 multiprocessing图书馆。它有点手动,但它使您能够按照您在问题中的要求轻松创建和修改管道。

这个想法是创建可以存在于多处理池中的函数,它们唯一的参数是一个输入队列和一个输出队列。您通过传递不同的队列将阶段联系在一起。每个阶段在其输入队列上接收一些工作,再做一些工作,然后通过其输出队列将结果传递到下一个阶段。

工作人员不断尝试从他们的队列中获取一些东西,当他们得到一些东西时,他们就开始工作并将结果传递给下一个阶段。所有工作都以通过管道传递“毒丸”结束,导致所有阶段退出:

这个例子只是在多个工作阶段构建一个字符串:

import multiprocessing as mp                                              

POISON_PILL = "STOP"

def stage1(q_in, q_out):

while True:

# get either work or a poison pill from the previous stage (or main)
val = q_in.get()

# check to see if we got the poison pill - pass it along if we did
if val == POISON_PILL:
q_out.put(val)
return

# do stage 1 work
val = val + "Stage 1 did some work.\n"

# pass the result to the next stage
q_out.put(val)

def stage2(q_in, q_out):

while True:

val = q_in.get()
if val == POISON_PILL:
q_out.put(val)
return

val = val + "Stage 2 did some work.\n"
q_out.put(val)

def main():

pool = mp.Pool()
manager = mp.Manager()

# create managed queues
q_main_to_s1 = manager.Queue()
q_s1_to_s2 = manager.Queue()
q_s2_to_main = manager.Queue()

# launch workers, passing them the queues they need
results_s1 = pool.apply_async(stage1, (q_main_to_s1, q_s1_to_s2))
results_s2 = pool.apply_async(stage2, (q_s1_to_s2, q_s2_to_main))

# Send a message into the pipeline
q_main_to_s1.put("Main started the job.\n")

# Wait for work to complete
print(q_s2_to_main.get()+"Main finished the job.")

q_main_to_s1.put(POISON_PILL)

pool.close()
pool.join()

return

if __name__ == "__main__":
main()

代码产生这个输出:

Main started the job.
Stage 1 did some work.
Stage 2 did some work.
Main finished the job.

您可以轻松地在管道中放置更多阶段,或者只需更改哪些函数获取哪些队列即可重新排列它们。我对 asyncio 不是很熟悉模块,所以我不能说使用多处理库会失去哪些功能,但这种方法实现和理解起来非常简单,所以我喜欢它的简单性。

关于python - 如何在 python 中设计异步管道模式,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35834662/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com