
python - Parallelizing a series of generators


Suppose I have Python stream-processing code like the following:

def F1(stream):
    for x in stream:
        yield f1(x)

def F2(stream):
    for x in stream:
        yield f2(x)

def F3(stream):
    for x in stream:
        yield f3(x)

def F4(stream):
    for x in stream:
        yield f4(x)


for x in F4(F3(F2(F1(range(1000000))))):
    print(x)

This is roughly equivalent to range 1000000 | F1 | F2 | F3 | F4 in Unix (assuming a range command existed), except that in Unix each process in the pipeline runs in parallel.

Is there a simple way to parallelize the Python code in the same way?
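(For comparison: if f1 through f4 are pure, picklable per-item functions, one simple option is to parallelize across items rather than across stages, for example with multiprocessing.Pool.imap. The sketch below is illustrative only; composed is a made-up helper, and f1..f4 are assumed to be defined at module level.)

from multiprocessing import Pool

def composed(x):
    # Apply the four per-item functions in sequence to one element.
    return f4(f3(f2(f1(x))))

if __name__ == '__main__':
    with Pool() as pool:
        # imap streams results lazily and preserves input order.
        for y in pool.imap(composed, range(1000000), chunksize=1000):
            print(y)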

Best answer

You need pipes and a bit of black magic, and Python has both.

from multiprocessing import Process, Pipe


def F1(stream):
    for x in stream:
        yield str(x) + 'a'

def F2(stream):
    for x in stream:
        yield x + 'b'

def F3(stream):
    for x in stream:
        yield x + 'c'

def F4(stream):
    for x in stream:
        yield x + 'd'


class PIPE_EOF:
    # Sentinel sent through a pipe to mark the end of the stream.
    pass

class IterableConnection(object):
    # Wraps the receiving end of a Pipe so it can be consumed as an iterator.
    def __init__(self, pipe):
        self.pipe = pipe

    def __iter__(self):
        return self

    def __next__(self):
        try:
            ret = self.pipe.recv()
            if ret == PIPE_EOF:
                raise StopIteration
            return ret
        except EOFError:
            raise StopIteration

    def next(self):
        # Python 2 compatibility
        return self.__next__()


def parallel_generator_chain(*args, **kwargs):
    if 'data' in kwargs:
        data = kwargs['data']
    else:
        raise RuntimeError('Missing "data" argument.')

    def decorator(func, _input, _output):
        # Run one stage: feed _input through func and push the results into _output.
        def wrapper(*args, **kwargs):
            for item in func(_input):
                _output.send(item)
            _output.send(PIPE_EOF)
        return wrapper

    # Build the chain: each stage runs in its own process and feeds the next via a Pipe.
    for func in args:
        in_end, out_end = Pipe(duplex=False)
        in_end = IterableConnection(in_end)
        func = decorator(func, data, out_end)
        p = Process(target=func)  # note: a closure target like this relies on the fork start method
        p.start()
        data = in_end

    # The receiving end of the last pipe is the final result stream.
    for output in data:
        yield output


if 'xrange' not in globals():
    xrange = range  # Python 3: use range in place of Python 2's xrange


if __name__ == '__main__':
    for x in parallel_generator_chain(xrange, F1, F2, F3, F4, data=100000000):
        print(x)

    #for x in F4(F3(F2(F1(range(1000000))))):
    #    print(x)
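A variant of the same idea, sketched here for illustration only, swaps the Pipe plumbing for multiprocessing.Queue with a sentinel value. The names _feed, _stage and queue_chain are made up, it reuses F1..F4 from the answer above, and it assumes None never occurs in the data stream. Bounded queues give some backpressure, so a fast stage cannot run arbitrarily far ahead of a slow one, and because every process target is a module-level function it should also work with the spawn start method.

from multiprocessing import Process, Queue

_SENTINEL = None  # assumes None never appears as a real data item

def _feed(data, q_out):
    # Push the source iterable into the first queue, then signal end of stream.
    for item in data:
        q_out.put(item)
    q_out.put(_SENTINEL)

def _stage(func, q_in, q_out):
    # Drain q_in as an iterator, run it through one generator stage, fill q_out.
    def drain():
        while True:
            item = q_in.get()
            if item is _SENTINEL:
                return
            yield item
    for result in func(drain()):
        q_out.put(result)
    q_out.put(_SENTINEL)

def queue_chain(data, *funcs, maxsize=1024):
    # Each stage runs in its own process; bounded queues connect adjacent stages.
    q = Queue(maxsize)
    Process(target=_feed, args=(data, q)).start()
    for func in funcs:
        q_next = Queue(maxsize)
        Process(target=_stage, args=(func, q, q_next)).start()
        q = q_next
    while True:
        item = q.get()
        if item is _SENTINEL:
            return
        yield item

if __name__ == '__main__':
    for x in queue_chain(range(1000), F1, F2, F3, F4):
        print(x)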

Regarding python - parallelizing a series of generators, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/20711687/
