gpt4 book ai didi

perl - 使用 Parallel::ForkManager 限制进程

转载 作者:行者123 更新时间:2023-12-01 01:26:13 24 4
gpt4 key购买 nike

我正在尝试使用 Parallel::ForkManager 来控制一些子进程。我想将并发运行的进程数限制为 10 个。我总共需要运行 20 个。

我知道我可以在对象声明的第一行中将进程限制设置为 10,但我还使用 $pm 对象来运行执行不同操作的子进程(当前函数占用更多内存,因此需要被限制)。

我目前的代码不起作用,完成时运行的调用从未进行过,所以剩下的 10 个 child 永远不会 fork 。我不明白为什么会这样——我原以为 child 仍然会在退出时调用完成代码,并减少计数,但“if”语句似乎阻止了这一点。有人可以解释为什么会这样吗?

感谢您的帮助!

# Parallel declarations
my $pm = Parallel::ForkManager->new(30);

$pm->run_on_finish(sub {
my ($pid, $exit_code, $ident, $exit_signal, $core_dump, $data_str_ref) = @_;
--$active_jobs;
})

my $total_jobs = 0;
my $active_jobs = 0;
while( $total_jobs < 20) {
sleep 300 and next if $active_jobs > 10;

my $pid = $pm->start and ++$active_p1_jobs and ++$total_p1_jobs and next;

my $return = module::function(%args);

$pm->finish(0, { index => $total_jobs, return => $return });
}

print STDERR "Submitted all jobs, now waiting for children to exit.\n";
$pm->wait_all_children();

最佳答案

我将限制为 10 个的作业称为“类型 2”。

这就是我用 P::FM 做的:

use strict;
use warnings;

use List::Util qw( shuffle );
use Parallel::ForkManager qw( );
use POSIX qw( WNOHANG );
use Time::HiRes qw( sleep );

use constant MAX_WORKERS => 30;
use constant MAX_TYPE2_WORKERS => 10;

sub is_type2_job { $_[0]{type} == 2 }

my @jobs = shuffle(
( map { { type => 1, data => $_ } } 0..19 ),
( map { { type => 2, data => $_ } } 0..19 ),
);

my $pm = Parallel::ForkManager->new(MAX_WORKERS);

my $type2_count = 0;
$pm->run_on_finish(sub {
my ($pid, $exit_code, $ident, $exit_signal, $core_dump, $job) = @_;
--$type2_count if is_type2_job($job);
print "Finished: $pid, $job->{type}, $job->{data}, $job->{result}\n";
});

my @postponed_jobs;
while (@postponed_jobs || @jobs) {
my $job;
if (@postponed_jobs && $type2_count < MAX_TYPE2_WORKERS) {
$job = shift(@postponed_jobs);
}
elsif (@jobs) {
$job = shift(@jobs);
if ($type2_count >= MAX_TYPE2_WORKERS && is_type2_job($job)) {
push @postponed_jobs, $job;
redo;
}
}
# elsif (@postponed_jobs) {
# # Already max type 2 jobs being processed,
# # but there are idle workers.
# $job = shift(@postponed_jobs);
# }
else {
local $SIG{CHLD} = sub { };
select(undef, undef, undef, 0.300);
$pm->wait_one_child(WNOHANG);
redo;
}

++$type2_count if is_type2_job($job);

my $pid = $pm->start and next;
$job->{result} = $job->{data} + 100; # Or whatever.
$pm->finish(0, $job);
}

$pm->wait_all_children();

但是这个坏了。选择下一个作业的代码应该在 start 的中间完成(即在它等待 child 完成之后,但在它 fork 之前),而不是在 start 之前。这可能会导致作业无序运行。这不是我第一次希望 P::FM 有 fork 前回调。也许你可以向维护者要一个。

关于perl - 使用 Parallel::ForkManager 限制进程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/11506940/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com