gpt4 book ai didi

multithreading - 使用IPC::open2传输大文件

转载 作者:行者123 更新时间:2023-12-04 06:42:30 25 4
gpt4 key购买 nike

我制作了一个Perl脚本,该脚本包装了另一个工具(overlapFeatures),以便可以即时正确地转换文件格式。我正在处理的文件都是制表符分隔的表,通常为200万行左右。就其本身而言,overlapFeatures可以轻松应对这些问题。

但是我认为我正在通过一次管道输送这么多管线来导致管道锁定。我知道我需要以某种方式对此进行线程化,以便可以同时读取和写入子进程。但是我真的不明白如何在perl(或其他任何程序)中正确使用线程。据我了解,我可以使用threads甚至IPC::run来解决我的问题。

我最终死锁的原始脚本如下所示:

use strict;
use warnings;
use IPC::Open2;

my $infile = shift;
my $featurefile = shift;

my $command = 'overlapFeatures';
my @args = (qw (-a stdin -b), $featurefile);

my ($input, $output);
my $pid = open2($output, $input, $command, @args)
or die "Failed with error $!\n";

open (my $infh, '<', $infile) or die "Can't open $infile\n";
while (<$infh>){
# Do some format conversion...
chomp
my @cols = split /\t/;
# print a modified line to the tool
print $input join ("\t", @cols[0,2,3,1,5,4]),"\n";
}
close ($input);

while (<$output>){
# format conversion for ouput
chomp;
my @cols = split /\t/;
print join (",",@cols[0,1,2,5,3,8]),"\n";
}
close ($output);

我尝试重写脚本以按照 How to filter a lot of data with IPC::Open2?的方式使用线程,如下所示:
use strict;
use warnings;
use IPC::Open2;
use threads;

my $infile = shift;
my $featurefile = shift;

my $command = 'overlapFeatures';
my @args = (qw (-a stdin -b), $featurefile);

my ($input, $output);
my $pid = open2($output, $input, $command, @args)
or die "Failed with error $!\n";

my $thread = async {
print join(",", qw(seqid start end strand read feature name)),"\n";
for(;;) {
my $line = <$output>; # should block here and wait for output?
last if !defined $line; # end of stream reached?
print STDERR "Got line $line\n";
# Do some format conversion...
chomp $line;
my @cols = split /\t/, $line;
# print a modified line to the tool
print join(",",@cols[0,1,2,5,3,8]),"\n";
}
close($output)
};

{
open (my $infh, '<', $infile) or die "Can't open $infile\n";
while (<$infh>){
# format conversion for ouput
chomp;
my @cols = split /\t/;
print $input join ("\t", @cols[0,2,3,1,5,4]),"\n";
}
close ($input);
}

$thread->join();
waitpid ($pid, 0);

但是,脚本仍然以相同的方式卡住,我也被卡住了。在这种情况下,我也无法解决如何使用 IPC::run的问题。

我究竟做错了什么?我是否误解了线程?

编辑:花费更多时间调试脚本(并从amon帮助),我发现我能够从 $output检索行。但是,该脚本永远不会结束,并且在收到所有输出后似乎会挂起。我认为这是我现在唯一的问题。

最佳答案

这更像是长篇评论。
我在简化版本中尝试了您的代码。我删除了转换代码,使用Unix yes命令作为无限数据源,并将输出打印到/dev/null,因为我们当前对输出不感兴趣,但对程序的工作不感兴趣。作为您的overlapFeatures的替代品,我使用了cat将数据原封不动地传递。

use strict; use warnings; use IPC::Open2; use threads;

my $command = "cat";
my @args = ();

my ($input, $output);
my $pid = open2($output, $input, $command, @args)
or die "Failed with error $!\n";

my $thread = async {
print $_ while defined($_ = <$output>);
close($output)
};

{
my $c=0;
open (my $infh, "-|", "yes") or die;
open my $null, ">/dev/null" or die;
while (<$infh>){
$c++;
print $null $_;
if ($c >= 1_000_000) {
print "\n==another million==\n\n";
$c=0
}
}
close ($input);
}

$thread->join();
waitpid ($pid, 0);
一旦达到一百万行(按字面意思),我就会打印一条状态消息以断言IO仍在工作。
结果
给定脚本在Perl 12.4的Ubuntu Linux上进行了测试,可以完美地工作。因此,可以合理地假设问题不在于IPC代码,而在于数据格式转换,正在包装的程序或数据量( yes输出字符串 "1\n",这导致许多行的数据很少) 。(每组约2MB,每行2个字节)
结论
可能是您正在运行其他配置。如果您正在运行* nix,请断言我使用的脚本也适用于您。如果不是,请明确说明此配置,然后尝试运行等效的脚本。
也有可能将包装器分成两个脚本,至少用于测试,因此您将运行类似
$ convert-to | overlapFeatures | convert-from
这会将所有IPC委派给Shell,并将断言转换正在工作并且该体系结构是可实现的。
集思广益的其他不太可能的想法:
(1)何时执行 close操作?难道是由于某种奇怪的原因,循环的一端过早退出了吗?在 print STDERR "Closing down xx\n"之前的 close可能很有趣。
(2) open2async是否成功产生了它们的进程/线程并返回了控制流?偏执的我会在他们之后再放一个 print STDERR ...
(3)您是否从脚本中获取了任何数据,或者流过了一段时间后流变干了?
编辑
在所有写入结束均为 EOF d之前,管道不会产生 close。因此,所有线程都应关闭未使用的所有内容:
my $thread = async {
close $input;
print $_ while defined($_ = <$output>);
close($output)
};
{
close $output;
my $c=0;
open (my $infh, "-|", "yes") or die;
open my $null, ">/dev/null" or die;
while (<$infh>){
$c++;
print $null $_;
...

关于multithreading - 使用IPC::open2传输大文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/12532402/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com