gpt4 book ai didi

multithreading - 使用多线程时我必须在哪里取消队列的定义

转载 作者:行者123 更新时间:2023-12-03 13:00:17 25 4
gpt4 key购买 nike

我有一个创建队列的脚本,一些工作人员正在从队列中读取其作业。我的问题是,由于线程处于空闲状态,脚本不会终止并调用printData()。这是因为我尚未将队列设置为undef。

我尝试了许多不同的方法,但是所有方法都会导致各种问题。

  • 尽管队列
  • 中仍然有作业,但任何一个队列都已终止
  • 或者,尽管仍然有一个线程在工作,并试图将新工作插入队列,但目前队列中没有作业。

  • 我使用以下代码
    # -------------------------
    # Main
    # -------------------------
    my @threads = map threads->create(\&doOperation), 1 .. $maxNumberOfParallelJobs;
    pullDataFromDbWithDirectory($directory);
    #$worker->enqueue((undef) x $maxNumberOfParallelJobs);
    $_->join for @threads;

    sub pullDataFromDbWithDirectory {
    my $_dir = $_[0];

    if ($itemCount <= $maxNumberOfItems) {
    my @retval = grep { /^Dir|^File/ } qx($omnidb -filesystem $filesystem '$label' -listdir '$_dir');

    foreach my $item (@retval) {
    $itemCount++;
    (my $filename = $item) =~ s/^File\s+|^Dir\s+|\n//g;
    my $file = "$_dir/$filename";
    push(@data,$file);

    if ($item =~ /^Dir/) {
    $worker->enqueue($file);
    print "Add $file to queue\n" if $debug;
    }
    }
    }
    }

    sub doOperation () {
    my $ithread = threads->tid();
    do {
    my $folder = $worker->dequeue();
    print "Read $folder from queue with thread $ithread\n" if $debug;
    pullDataFromDbWithDirectory($folder);
    } while ($worker->pending());

    push(@IDLE_THREADS,$ithread);

    }

    编辑:

    我找到了一个丑陋的解决方案。也许有更好的?我将 worker 添加到IDLE阵列中并休眠,直到所有 worker 都在那里
    sleep 0.01 while (scalar @IDLE_THREADS < $maxNumberOfParallelJobs);
    $worker->enqueue((undef) x $maxNumberOfParallelJobs);
    $_->join for @threads;

    最佳答案

    如果没有线程过早终止,就不能使用->pending()。使固定:

    my $busy: shared = $num_workers;

    sub pullDataFromDbWithDirectory {
    my $tid = threads->tid();
    while (defined( my $folder = $q->dequeue() )) {
    { lock $busy; ++$busy; }
    print "Worker thread $tid processing folder $folder.\n" if $debug;
    pullDataFromDbWithDirectory($folder);
    { lock $busy; --$busy; }
    }

    print "Worker thread $tid exiting.\n" if $debug;
    }

    sleep 0.01 while $q->pending || $busy;
    $worker->end();
    $_->join for @threads;

    但这引入了竞争条件。
  • 工作线程使队列
  • 中当前的最后一个项目出队
  • 主线程检查未决(假)
  • 主线程检查繁忙线程数(无)
  • 主线程向工作人员发出信号以结束
  • 所有其他辅助线程退出。
  • 使上面的项目出队的工作人员将自己标记为忙
  • worker 开始处理最后一个项目,尝试在队列中添加一堆项目,但失败。

  • 出队和忙碌增量需要是原子的,而待处理的校验加上忙碌的校验需要是原子的。

    不更改Thread::Queue是不可能的。您不能仅仅对这两段代码进行锁定,因为这会阻止主服务器在其中一个线程处于空闲状态时检查所有线程是否都处于空闲状态。

    我们需要将 ->dequeue分成其等待组件和出队组件。我们有后者( ->dequeue_nb),所以我们只需要前者。
    use Thread::Queue 3.01;

    sub T_Q_wait {
    my $self = shift;
    lock(%$self);
    my $queue = $$self{'queue'};

    my $count = @_ ? $self->_validate_count(shift) : 1;

    # Wait for requisite number of items
    cond_wait(%$self) while ((@$queue < $count) && ! $$self{'ENDED'});
    cond_signal(%$self) if (@$queue);

    return !$$self{'ENDED'};
    }

    现在我们可以编写解决方案了:
    my $busy: shared = 0;

    sub pullDataFromDbWithDirectory {
    my $tid = threads->tid();

    WORKER_LOOP:
    while (T_Q_wait($q)) {
    my $folder;

    {
    lock $busy;
    $folder = $q->dequeue_nb();
    next WORKER_LOOP if !defined($folder);
    ++$busy;
    }

    print "Worker thread $tid processing folder $folder.\n" if $debug;
    pullDataFromDbWithDirectory($folder);

    {
    lock $busy;
    --$busy;
    cond_signal($busy) if !$busy;
    }
    }
    }

    {
    lock $busy;
    cond_wait($busy) while $busy;
    $q->end();
    $_->join() for threads->list();
    }

    如果其他线程在 nextwait之间卡住了工作,则可以使用 dequeue_nb

    关于multithreading - 使用多线程时我必须在哪里取消队列的定义,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24014663/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com