gpt4 book ai didi

.net - 其他任务完成时通知任务

转载 作者:行者123 更新时间:2023-12-01 06:05:23 27 4
gpt4 key购买 nike

.Net TPL 专家,

备注 : 无法使用 DataFlow 库;不允许附加组件。

我有四个任务,如下图所示:

enter image description here

  • task_1 (data_producer) -> 从大文件(>500000 条记录)中读取记录并将记录添加到 BlockingCollection
  • task_2, task_3 (data_consumers) -> 这些任务中的每一个都从 BlockingCollection 中获取记录。每个任务对取自 BlockingCollection(网络相关)的记录执行一些工作,完成后,每个任务都可以将记录添加到结果队列中。处理顺序并不重要。
  • task_4(结果处理器)-> 从 results_queue 中获取记录并写入输出文件。

  • 然后我等待任务完成,即:
    Task.WhenAll( t1, t2, t3, t4 )

    所以我有一个生产者任务、多个消费者任务和一个保存结果的任务。

    我的问题是:

    如何在任务 2 和 3 完成时通知任务 4,以便任务 4 也知道何时结束?

    我发现了许多以线性“管道”方式将数据从一个任务“移动”到另一个任务的示例,但没有找到任何说明上述情况的示例;即如何在任务 2 和 3 完成时通知任务 4,以便它也知道何时完成。

    我最初的想法是在任务 4 中“注册”任务 2 和 3,并简单地监控每个注册任务的状态——当任务 2 和 3 不再运行时,任务 4 可以停止(如果结果队列也是空的) .

    提前致谢。

    最佳答案

    这是对 Thomas 的扩展。已经说过了。

    通过使用 BlockingCollection您可以调用GetConsumingEnumerable()在它上面,只是把它当作一个正常的 foreach 循环。这将使您的任务“自然”结束。您唯一需要做的就是添加一个额外的任务来监视任务 2 和 3 以查看它们何时结束并调用它们的完整添加。

    private BlockingCollection<Stage1> _stageOneBlockingCollection = new BlockingCollection<Stage1>();
    private BlockingCollection<Stage2> _stageTwoBlockingCollection = new BlockingCollection<Stage2>();

    Task RunProcess()
    {
    Task1Start();
    var t2 = Stage2Start();
    var t3 = Stage2Start();
    Stage2MonitorStart(t2,t3);
    retrun Task4Start();
    }

    public void Task1Start()
    {
    Task.Run(()=>
    {
    foreach(var item in GetFileSource())
    {
    var processedItem = Process(item);
    _stageOneBlockingCollection.Add(processedItem);
    }
    _stageOneBlockingCollection.CompleteAdding();
    }
    }

    public Task Stage2Start()
    {
    return Task.Run(()=>
    {
    foreach(var item in _stageOneBlockingCollection.GetConsumingEnumerable())
    {
    var processedItem = ProcessStage2(item);
    _stageTwoBlockingCollection.Add(processedItem);
    }
    }
    }

    void Stage2MonitorStart(params Task[] tasks)
    {
    //Once all tasks complete mark the collection complete adding.
    Task.WhenAll(tasks).ContinueWith(t=>_stageTwoBlockingCollection.CompleteAdding());
    }

    public Task Stage4Start()
    {
    return Task.Run(()=>
    {
    foreach(var item in _stageTwoBlockingCollection.GetConsumingEnumerable())
    {
    var processedItem = ProcessStage4(item);
    WriteToOutputFile(processedItem);
    }
    }
    }

    关于.net - 其他任务完成时通知任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40749802/

    27 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com