gpt4 book ai didi

multithreading - 在Perl中同时使用线程和Parallel::ForkManager的阻塞问题

转载 作者:行者123 更新时间:2023-12-03 13:19:11 25 4
gpt4 key购买 nike

我有一个脚本可以完成以下两项工作:
答:连接到数据库并从中检索数据。
B.处理检索到的数据。

因此,我为它们每个创建了两个线程。
我还利用Parallel::ForkManager生成子进程并执行作业A,这样我就可以一次连接到多个数据库。

我一直试图解决的问题是在ForkManager-> wait_all_children中添加阻止调用时。作业A无法在完成时运行其回调。

如果我从@threads数组中删除第二个作业(B),则回调函数将运行。
所以我认为我在使用线程方面有误解,但我认为
这两个线程不应该互相阻塞。如果不是,那么可能是什么原因导致我的子进程无法完成其工作?

在设计说明上,我无法每次完成工作A都执行工作B,因为工作B非常昂贵,并且可能会运行很长时间。因此,这将阻止我完成所有工作A。因此,我宁愿有一个单独的线程来周期性地对所有检索到的数据进行“批量”处理。

这是我的示例代码,重现了我的问题:

use Data::Dumper;
use Parallel::ForkManager;
use threads;
use threads::shared;

my $isRunningJobThread :shared;

my @jobs = ('a', 'b', 'c', 'd', 'e', 'f', 'g');
my @threads = (
threads->create(\&jobThread),
threads->create(\&compileCompletedJobThread)
);

$_->join for @threads;

print "All done\n";

sub jobThread{
my $pm = Parallel::ForkManager->new(5);
$pm->run_on_finish(sub {
print "Job done\n";
});

$isRunningJobThread = 1;
foreach my $job (@jobs) {
$pm->start and next;
print "Do job for : $job\n";
$pm->finish;
}

$pm->wait_all_children;
}

sub compileCompletedJobThread{
while($isRunningJobThread) {
sleep 10;

print "Compiling completed jobs\n";
}
}

最佳答案

好,哇慢下来您正在用同一代码执行threadsfork。这是一个非常可怕的想法。它们在某种程度上依赖于实现-您可以肯定它们会起作用,但是同时它们都在给世界带来痛苦。 (并发问题,竞争条件等)。

在这种特殊情况下,您需要了解fork()所做的工作是在完全相同的状态下获取流程的完整副本,并且只有一个区别-fork()的返回码。这意味着线程等也将被fork()克隆。 Parallel::ForkManager通过限制并行性范围向您隐藏了其中的一部分,但这就是幕后情况。

我敦促您退后一步,重写一下-您似乎正在做的事情会更适合使用某些工作线程和Thread::Queue:

#!/usr/bin/env perl
use strict;
use warnings;

use threads;
use Thread::Queue;

#parallelism limit
my $num_threads = 5;

#input and output queues
my $work_q = Thread::Queue -> new();
my $result_q = Thread::Queue -> new;

#jobs as before
my @jobs = ('a', 'b', 'c', 'd', 'e', 'f', 'g');

#worker - reads from queue one item at a time.
#exits if the queue is 'undef' which happens if it has been `end`ed.
sub worker {
while ( my $item = $work_q -> dequeue ) {
print threads -> self -> tid.": processing work item $item\n";
#pretend we did some work, queue the result.
$result_q -> enqueue ( threads -> self -> tid . ": finished $item" );
}
}

#spawn threads
threads -> create (\&worker) for 1..$num_threads;
#queue jobs
$work_q -> enqueue ( @jobs );
#close queue, so threads will exit when they hit the end of the queue.
#dequeue will return 'undef' rather than blocking.
$work_q -> end;

#join all the threads.
$_->join for threads -> list;
#all threads are finished, so we close the result queue.
#again - so dequeue is undef when empty, rather than just blocking.
$result_q -> end;

while ( my $result = $result_q -> dequeue ) {
print "Got result of $result\n";
}

print "All done\n";

由于您指出您也希望并行运行'result_q',因此您可以使用'result handler'作为另一个线程来做相同的事情,结果几乎相同。

这有点令人讨厌,因为您确实需要根据打开/关闭的队列了解“退出”条件。但是这样的事情:
#!/usr/bin/env perl
use strict;
use warnings;

use threads;
use Thread::Queue;

#parallelism limit
my $num_threads = 5;

#input and output queues
my $work_q = Thread::Queue->new;
my $result_q = Thread::Queue->new;

#jobs as before
my @jobs = ( 'a', 'b', 'c', 'd', 'e', 'f', 'g' );

#worker - reads from queue one item at a time.
#exits if the queue is 'undef' which happens if it has been `end`ed.
sub worker {
while ( my $item = $work_q->dequeue ) {
print threads->self->tid . ": processing work item $item\n";

#pretend we did some work, queue the result.
$result_q->enqueue( threads->self->tid . ": finished $item" );
}
}

#a thread to process the results in parallel
sub collator {
while ( my $result = $result_q->dequeue ) {
print "Got result of $result\n";
}
}

#spawn threads
my @workers = map { threads->create( \&worker ) } 1 .. $num_threads;
my $collator = threads -> create ( \&collator );

#queue jobs
$work_q->enqueue(@jobs);

#close queue, so threads will exit when they hit the end of the queue.
#dequeue will return 'undef' rather than blocking.
$work_q->end;

#join all the threads.
$_->join for @workers;

#all threads are finished, so we close the result queue.
#again - so dequeue is undef when empty, rather than just blocking.
$result_q->end;

#reap 'collator' once it's finished.
$collator->join;


print "All done\n";

与上面的代码几乎相同,但是产生了一个'workers'列表-因为这样您就可以 end $work_q,等待“workers”退出(和 join)-然后您知道将没有更多结果输入 $result_q,然后可以对其进行 end。 (并等待 collator退出)。

关于multithreading - 在Perl中同时使用线程和Parallel::ForkManager的阻塞问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42757242/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com