gpt4 book ai didi

python - tee'd 生成器上的多处理

转载 作者:太空狗 更新时间:2023-10-30 01:32:24 25 4
gpt4 key购买 nike

考虑以下脚本,我在其中测试了两种对 itertools.tee 获得的生成器执行某些计算的方法:

#!/usr/bin/env python3

from sys import argv
from itertools import tee
from multiprocessing import Process

def my_generator():
for i in range(5):
print(i)
yield i

def double(x):
return 2 * x

def compute_double_sum(iterable):
s = sum(map(double, iterable))
print(s)

def square(x):
return x * x

def compute_square_sum(iterable):
s = sum(map(square, iterable))
print(s)

g1, g2 = tee(my_generator(), 2)

try:
processing_type = argv[1]
except IndexError:
processing_type = "no_multi"

if processing_type == "multi":
p1 = Process(target=compute_double_sum, args=(g1,))
p2 = Process(target=compute_square_sum, args=(g2,))
print("p1 starts")
p1.start()
print("p2 starts")
p2.start()
p1.join()
print("p1 finished")
p2.join()
print("p2 finished")
else:
compute_double_sum(g1)
compute_square_sum(g2)

这是我在“正常”模式下运行脚本时获得的结果:

$ ./test_tee.py 
0
1
2
3
4
20
30

这里是并行模式:

$ ./test_tee.py multi
p1 starts
p2 starts
0
1
2
3
4
20
0
1
2
3
4
30
p1 finished
p2 finished

初始生成器显然以某种方式“复制”并执行了两次。

我想避免这种情况,因为在我的实际应用程序中,这似乎会在我用来制作初始生成器(https://github.com/pysam-developers/pysam/issues/397)的外部库之一中引发错误,并且仍然能够在在相同的生成值上并行。

有没有办法实现我想要的?

最佳答案

我在这里找到了一些替代方法:https://stackoverflow.com/a/26873783/1878788 .

在这种方法中,我们不再使用生成器。我们只是复制它生成的项目并将它们提供给一个复合函数,该函数仅在一个进程中对生成的项目进行并行处理,但我们通过使用 Pool(这就是所谓的 map /减少方法?):

#!/usr/bin/env python3

from itertools import starmap
from multiprocessing import Pool
from functools import reduce
from operator import add

def my_generator():
for i in range(5):
print(i)
yield i

def double(x):
return 2 * x

def square(x):
return x * x

def double_and_square(args_list):
return (double(*args_list[0]), square(*args_list[1]))

def sum_tuples(tup1, tup2):
return tuple(starmap(add, zip(tup1, tup2)))

with Pool(processes=5) as pool:
results_generator = pool.imap_unordered(double_and_square, (((arg,), (arg,)) for arg in my_generator()))

print(reduce(sum_tuples, results_generator))

这适用于玩具示例。我现在必须弄清楚如何在实际应用案例中类似地组织我的计算。

我尝试使用高阶函数 (make_funcs_applier) 对此进行概括以生成复合函数 (apply_funcs),但出现以下错误:

AttributeError: Can't pickle local object  'make_funcs_applier.<locals>.apply_funcs'

更普遍的尝试

根据评论中的建议,我尝试改进上述解决方案以提高可重用性:

#!/usr/bin/env python3
"""This script tries to work around some limitations of multiprocessing."""

from itertools import repeat, starmap
from multiprocessing import Pool
from functools import reduce
from operator import add

# Doesn't work because local functions can't be pickled:
# def make_tuple_func(funcs):
# def tuple_func(args_list):
# return tuple(func(args) for func, args in zip(funcs, args_list))
# return tuple_func
#
# test_tuple_func = make_tuple_func((plus_one, double, square))

class FuncApplier(object):
"""This kind of object can be used to group functions and call them on a
tuple of arguments."""
__slots__ = ("funcs", )

def __init__(self, funcs):
self.funcs = funcs

def __len__(self):
return len(self.funcs)

def __call__(self, args_list):
return tuple(func(args) for func, args in zip(self.funcs, args_list))

def fork_args(self, args_list):
"""Takes an arguments list and repeat them in a n-tuple."""
return tuple(repeat(args_list, len(self)))


def sum_tuples(*tuples):
"""Element-wise sum of tuple items."""
return tuple(starmap(add, zip(*tuples)))


# Can't define these functions in main:
# They wouldn't be pickleable.
def plus_one(x):
return x + 1

def double(x):
return 2 * x

def square(x):
return x * x

def main():
def my_generator():
for i in range(5):
print(i)
yield i


test_tuple_func = FuncApplier((plus_one, double, square))

with Pool(processes=5) as pool:
results_generator = pool.imap_unordered(
test_tuple_func,
(test_tuple_func.fork_args(args_list) for args_list in my_generator()))
print("sum of x+1:\t%s\nsum of 2*x:\t%s\nsum of x*x:\t%s" % reduce(
sum_tuples, results_generator))
exit(0)

if __name__ == "__main__":
exit(main())

测试它:

$ ./test_fork.py 
0
1
2
3
4
sum of x+1: 15
sum of 2*x: 20
sum of x*x: 30

对我来说仍然存在一些烦人的限制,因为我倾向于经常在我的代码中定义本地函数。

关于python - tee'd 生成器上的多处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41873531/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com