- c - 在位数组中找到第一个零
- linux - Unix 显示有关匹配两种模式之一的文件的信息
- 正则表达式替换多个文件
- linux - 隐藏来自 xtrace 的命令
考虑以下脚本,我在其中测试了两种对 itertools.tee
获得的生成器执行某些计算的方法:
#!/usr/bin/env python3
from sys import argv
from itertools import tee
from multiprocessing import Process
def my_generator():
for i in range(5):
print(i)
yield i
def double(x):
return 2 * x
def compute_double_sum(iterable):
s = sum(map(double, iterable))
print(s)
def square(x):
return x * x
def compute_square_sum(iterable):
s = sum(map(square, iterable))
print(s)
g1, g2 = tee(my_generator(), 2)
try:
processing_type = argv[1]
except IndexError:
processing_type = "no_multi"
if processing_type == "multi":
p1 = Process(target=compute_double_sum, args=(g1,))
p2 = Process(target=compute_square_sum, args=(g2,))
print("p1 starts")
p1.start()
print("p2 starts")
p2.start()
p1.join()
print("p1 finished")
p2.join()
print("p2 finished")
else:
compute_double_sum(g1)
compute_square_sum(g2)
这是我在“正常”模式下运行脚本时获得的结果:
$ ./test_tee.py
0
1
2
3
4
20
30
这里是并行模式:
$ ./test_tee.py multi
p1 starts
p2 starts
0
1
2
3
4
20
0
1
2
3
4
30
p1 finished
p2 finished
初始生成器显然以某种方式“复制”并执行了两次。
我想避免这种情况,因为在我的实际应用程序中,这似乎会在我用来制作初始生成器(https://github.com/pysam-developers/pysam/issues/397)的外部库之一中引发错误,并且仍然能够在在相同的生成值上并行。
有没有办法实现我想要的?
最佳答案
我在这里找到了一些替代方法:https://stackoverflow.com/a/26873783/1878788 .
在这种方法中,我们不再使用生成器。我们只是复制它生成的项目并将它们提供给一个复合函数,该函数仅在一个进程中对生成的项目进行并行处理,但我们通过使用 Pool
(这就是所谓的 map /减少方法?):
#!/usr/bin/env python3
from itertools import starmap
from multiprocessing import Pool
from functools import reduce
from operator import add
def my_generator():
for i in range(5):
print(i)
yield i
def double(x):
return 2 * x
def square(x):
return x * x
def double_and_square(args_list):
return (double(*args_list[0]), square(*args_list[1]))
def sum_tuples(tup1, tup2):
return tuple(starmap(add, zip(tup1, tup2)))
with Pool(processes=5) as pool:
results_generator = pool.imap_unordered(double_and_square, (((arg,), (arg,)) for arg in my_generator()))
print(reduce(sum_tuples, results_generator))
这适用于玩具示例。我现在必须弄清楚如何在实际应用案例中类似地组织我的计算。
我尝试使用高阶函数 (make_funcs_applier
) 对此进行概括以生成复合函数 (apply_funcs
),但出现以下错误:
AttributeError: Can't pickle local object 'make_funcs_applier.<locals>.apply_funcs'
根据评论中的建议,我尝试改进上述解决方案以提高可重用性:
#!/usr/bin/env python3
"""This script tries to work around some limitations of multiprocessing."""
from itertools import repeat, starmap
from multiprocessing import Pool
from functools import reduce
from operator import add
# Doesn't work because local functions can't be pickled:
# def make_tuple_func(funcs):
# def tuple_func(args_list):
# return tuple(func(args) for func, args in zip(funcs, args_list))
# return tuple_func
#
# test_tuple_func = make_tuple_func((plus_one, double, square))
class FuncApplier(object):
"""This kind of object can be used to group functions and call them on a
tuple of arguments."""
__slots__ = ("funcs", )
def __init__(self, funcs):
self.funcs = funcs
def __len__(self):
return len(self.funcs)
def __call__(self, args_list):
return tuple(func(args) for func, args in zip(self.funcs, args_list))
def fork_args(self, args_list):
"""Takes an arguments list and repeat them in a n-tuple."""
return tuple(repeat(args_list, len(self)))
def sum_tuples(*tuples):
"""Element-wise sum of tuple items."""
return tuple(starmap(add, zip(*tuples)))
# Can't define these functions in main:
# They wouldn't be pickleable.
def plus_one(x):
return x + 1
def double(x):
return 2 * x
def square(x):
return x * x
def main():
def my_generator():
for i in range(5):
print(i)
yield i
test_tuple_func = FuncApplier((plus_one, double, square))
with Pool(processes=5) as pool:
results_generator = pool.imap_unordered(
test_tuple_func,
(test_tuple_func.fork_args(args_list) for args_list in my_generator()))
print("sum of x+1:\t%s\nsum of 2*x:\t%s\nsum of x*x:\t%s" % reduce(
sum_tuples, results_generator))
exit(0)
if __name__ == "__main__":
exit(main())
测试它:
$ ./test_fork.py
0
1
2
3
4
sum of x+1: 15
sum of 2*x: 20
sum of x*x: 30
对我来说仍然存在一些烦人的限制,因为我倾向于经常在我的代码中定义本地函数。
关于python - tee'd 生成器上的多处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41873531/
我有一个关于 this answer 的问题,如下引用,由 friedo 回答此处的另一个问题。 (我无权对此发表评论,所以我将其作为一个问题提出。) "You can use File::Tee.
我有一个关于 this answer 的问题,在下面引用,由 friedo 在这里回答另一个问题。 (我无权对此发表评论,所以我将此作为问题提出。) "You can use File::Tee. u
您好,我一直在用 C 编写一个 linux shell。我想将我的输出重定向到文件和终端,我发现 tee 是可行的方法。我去了 tee 的 linux 手册页,发现 tee 可以用作函数调用以在 C
有没有办法在发送到文件之前处理来自 tee 的文本? 例如,如果程序输出以下行: stack 11 stack 22 stack 33 serverfault serverfault stack 44
下面是一些关于itertools.tee的测试: li = [x for x in range(10)] ite = iter(li) ========================
我正在将 bash 脚本日志记录移植到 Powershell,它在文件顶部有以下内容: # redirect stderr and stdout backupdir="/backup" logfile
我尝试将 echo 命令保存到日志文件: echo "XXXXX" | tee -a ./directory_with_logs/my_script.log 当文件 my_script.log 存在时
我正在尝试使用 tee 将我的流输出为 1 分钟的片段并同时输出到一个文件中。这是我的命令: ffmpeg -i "rtsp://${cameraIp}:554/axis-media/media.am
我想在 ksh 脚本(使用 exec)中创建一个管道,该管道连接到 tee,并将输出发送到管道。 当前: #Redirect EVERYTHING exec 3>&1 #Save STDOUT as
tee 从标准输入读取并写入标准输出和文件。 some_command |& tee log tee 可以写入压缩文件吗? some_command |& tee -some_option log.b
这个问题已经有答案了: Can you redirect Tee-Object to standard out? (2 个回答) 已关闭去年。 我生成一个 csv 文件: myscript.ps1 |
我有以下代码。 $summary = . { while ($true) { # Generating huge list of psobject } } | Tee-
这个问题已经有答案了: Can you redirect Tee-Object to standard out? (2 个回答) 已关闭去年。 我生成一个 csv 文件: myscript.ps1 |
有人可以帮我解决这个问题吗?我目前正在尝试将查询写入文件,最终将用 notee 关闭它;称呼。我以前使用过发球电话,但由于某种原因,今天我遇到了问题。 这是有问题的语法: tee c:/trash/t
我是 MySQL(或一般 SQL)新手我试图让 MySQL 使用 TEE 命令将时间戳写入带有存储过程的文件中(我不认为我可以使用“select into outfile”,因为我不想删除该文件,我想
我目前正在使用以下内容来捕获进入终端的所有内容并将其放入日志文件中 exec 4&2>&>(tee -a $LOG_FILE) 但是,我不想让颜色转义码/困惑进入日志文件。所以我有这样的东西,有点管用
这个问题在这里已经有了答案: Force line-buffering of stdout in a pipeline (7 个答案) 关闭 9 年前。 我正在运行这样的命令: python myc
我正在尝试在 ubuntu 15.04 上将 tee 命令与 rendercheck 测试一起使用,tee 命令可以很好地处理 6 个 rendercheck 测试,例如: ./renderchec
我想通过使用 while 循环和读取来模拟 shell 脚本中 tee 命令的行为,或者是否可以查看命令的内容。 最佳答案 不确定你在问什么,但为了一个简单的例子,试试这个 - file=$1
我想要这样的东西 $> ps -ax | tee -a processes.txt 在 UNIX C 编程环境中,意味着不通过 shell 脚本。 基本上有一个 API,这样我就可以在 STDIN 和
我是一名优秀的程序员,十分优秀!