gpt4 book ai didi

multithreading - Perl Queue 和 Threads 异常退出

转载 作者:行者123 更新时间:2023-12-03 12:52:35 26 4
gpt4 key购买 nike

我对 Perl 很陌生,尤其是 Perl 线程。
我想完成:

  • 有 5 个线程将数据(随机数)排入队列
    线程::队列
  • 有 3 个线程将从
    线程::队列。

  • 我为实现上述任务而编写的完整代码:
    #!/usr/bin/perl -w
    use strict;
    use threads;
    use Thread::Queue;


    my $queue = new Thread::Queue();
    our @Enquing_threads;
    our @Dequeuing_threads;

    sub buildQueue
    {
    my $TotalEntry=1000;
    while($TotalEntry-- >0)
    {
    my $query = rand(10000);
    $queue->enqueue($query);
    print "Enque thread with TID " .threads->tid . " got $query,";
    print "Queue Size: " . $queue->pending . "\n";
    }
    }
    sub process_Queue
    {
    my $query;
    while ($query = $queue->dequeue)
    {
    print "Dequeu thread with TID " .threads->tid . " got $query\n";
    }
    }
    push @Enquing_threads,threads->create(\&buildQueue) for 1..5;
    push @Dequeuing_threads,threads->create(\&process_Queue) for 1..3;

    我面临的问题 :
  • 线程未按预期同时运行。
  • 整个程序异常退出,控制台输出如下:

  • Perl exited with active threads: 8 running and unjoined

        0 finished and unjoined
    0 running and detached

    Enque thread with TID 5 got 6646.13585023883,Queue Size: 595
    Enque thread with TID 1 got 3573.84104215917,Queue Size: 595


    感谢您对代码优化的任何帮助。

    最佳答案

    这种行为是意料之中的:当主线程退出时,所有其他线程也会退出。不介意的话可以$thread->detach他们。否则,您必须手动 $thread->join他们,我们会做的。
    $thread->join等待线程完成,并获取返回值(线程可以像子例程一样返回值,尽管上下文(list/void/scalar)必须在生成时固定)。

    我们将detach排队数据的线程:

    threads->create(\&buildQueue)->detach for 1..5;

    现在对于出列线程,我们将它们放入一个词法变量中(为什么要使用全局变量?),以便我们稍后可以将它们出列:
    my @dequeue_threads = map threads->create(\&process_queue), 1 .. 3;

    然后等待他们完成:
    $_->join for @dequeue_threads;

    我们知道分离的线程将在程序退出之前完成执行,因为出队线程退出的唯一方法是耗尽队列。

    除了一个半的​​错误。你看,空队列和完成队列之间是有区别的。如果队列为空,则出队线程将阻塞 $queue->dequeue直到他们得到一些输入。传统的解决方案是 dequeue而他们得到的值(value)是被定义的。我们可以通过提供尽可能多的 undef 来打破循环。队列中的值,因为有线程从队列中读取。 Thread::Queue 的更现代版本有一个 end方法,使 dequeue返回 undef对于所有后续调用。

    问题是何时结束队列。在所有排队线程都退出之后,我们应该这样做。这意味着,我们应该手动等待它们。叹。
    my @enqueueing = map threads->create(\&enqueue), 1..5;
    my @dequeueing = map threads->create(\&dequeue), 1..3;
    $_->join for @enqueueing;
    $queue->enqueue(undef) for 1..3;
    $_->join for @dequeueing;

    sub dequeuing : while(defined( my $item = $queue->dequeue )) { ... } .

    使用 defined测试修复了另一个错误: rand可以返回零,尽管这不太可能并且会通过大多数测试。 rand的契约(Contract)是它返回一个包含零和排除某个上限之间的伪随机浮点数:来自区间 [0, x) 的数字.边界默认为 1 .

    如果您不想手动加入排队线程,则可以使用信号量来表示完成。信号量是一种多线程原语,可以递增和递减,但不能低于零。如果减量操作会使下降计数低于零,则调用会阻塞,直到另一个线程提高计数。如果开始计数是 1 ,这可以用作阻止资源的标志。

    我们也可以从负值 1 - $NUM_THREADS 开始,并让每个线程递增该值,这样只有当所有线程都退出时,它才能再次递减。
    use threads;  # make a habit of importing `threads` as the first thing

    use strict; use warnings;
    use feature 'say';

    use Thread::Queue;
    use Thread::Semaphore;

    use constant {
    NUM_ENQUEUE_THREADS => 5, # it's good to fix the thread counts early
    NUM_DEQUEUE_THREADS => 3,
    };

    sub enqueue {
    my ($out_queue, $finished_semaphore) = @_;
    my $tid = threads->tid;

    # iterate over ranges instead of using the while($maxval --> 0) idiom
    for (1 .. 1000) {
    $out_queue->enqueue(my $val = rand 10_000);
    say "Thread $tid enqueued $val";
    }

    $finished_semaphore->up;
    # try a non-blocking decrement. Returns true only for the last thread exiting.
    if ($finished_semaphore->down_nb) {
    $out_queue->end; # for sufficiently modern versions of Thread::Queue
    # $out_queue->enqueue(undef) for 1 .. NUM_DEQUEUE_THREADS;
    }
    }

    sub dequeue {
    my ($in_queue) = @_;
    my $tid = threads->tid;
    while(defined( my $item = $in_queue->dequeue )) {
    say "thread $tid dequeued $item";
    }
    }

    # create the queue and the semaphore
    my $queue = Thread::Queue->new;
    my $enqueuers_ended_semaphore = Thread::Semaphore->new(1 - NUM_ENQUEUE_THREADS);

    # kick off the enqueueing threads -- they handle themself
    threads->create(\&enqueue, $queue, $enqueuers_ended_semaphore)->detach for 1..NUM_ENQUEUE_THREADS;

    # start and join the dequeuing threads
    my @dequeuers = map threads->create(\&dequeue, $queue), 1 .. NUM_DEQUEUE_THREADS;
    $_->join for @dequeuers;

    如果线程似乎不是并行运行,而是顺序运行,请不要感到惊讶:此任务(将随机数排队)非常快,并且不适合多线程(排队比创建随机数更昂贵)。

    这是一个示例运行,其中每个 enqueuer 只创建两个值:
    Thread 1 enqueued 6.39390993005694
    Thread 1 enqueued 0.337993319585337
    Thread 2 enqueued 4.34504733960242
    Thread 2 enqueued 2.89158054485114
    Thread 3 enqueued 9.4947585773571
    Thread 3 enqueued 3.17079715055542
    Thread 4 enqueued 8.86408863197179
    Thread 5 enqueued 5.13654995317669
    Thread 5 enqueued 4.2210886147538
    Thread 4 enqueued 6.94064174636395
    thread 6 dequeued 6.39390993005694
    thread 6 dequeued 0.337993319585337
    thread 6 dequeued 4.34504733960242
    thread 6 dequeued 2.89158054485114
    thread 6 dequeued 9.4947585773571
    thread 6 dequeued 3.17079715055542
    thread 6 dequeued 8.86408863197179
    thread 6 dequeued 5.13654995317669
    thread 6 dequeued 4.2210886147538
    thread 6 dequeued 6.94064174636395

    你可以看到 5设法在 4 之前将一些东西排入队列.线程 78不要让任何东西出队, 6太快了。此外,在产生出队之前,所有入队都已完成(对于如此少量的输入)。

    关于multithreading - Perl Queue 和 Threads 异常退出,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/18352106/

    26 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com