gpt4 book ai didi

c# - 如何以优雅的方式关闭发生致命异常的 TPL 数据流?

转载 作者:行者123 更新时间:2023-12-04 10:30:10 28 4
gpt4 key购买 nike

我在 TPL 数据流上使用顺序管道构建,它由 3 个块组成:

  • B1 - 准备消息
  • B2 - 将消息发布到远程服务
  • B3 - 保存结果

  • 问题是如何在发生服务关闭等错误时关闭管道。管道必须以受控方式关闭,因此 B2 的结果不会丢失。

    最佳答案

    解决方案很简单,但在我得到它之前花了我几轮,因为在 Microsoft 站点上基本库信息背后没有太多信息。

    希望它可以帮助某人。该解决方案可以轻松地重新配置以满足其他要求。

    提出的方法依赖于:

  • CancellationTokenSource表示关机。
    在发生致命异常的情况下,每个块都应该通过共享 CancellationTokenSource 发出关闭信号。目的。
  • 应该在信号之后立即停止工作的块应该通过共享 CancellationTokenSource 进行初始化。对象
  • 程序必须等待最后一个块结束所有消息处理。

  • 这里是管道类中的解决方案和证明它有效的测试。

    这是一个工作示例:
    using Microsoft.VisualStudio.TestTools.UnitTesting;
    using System;
    using System.Threading;
    using System.Threading.Tasks.Dataflow;
    using System.Threading.Tasks;
    using System.Diagnostics;

    namespace Tests.Sets.Research
    {
    [TestClass]
    public class TPLTest
    {
    public class PipeLine
    {
    CancellationTokenSource cancellationTokenSource;
    TransformBlock<int, int> b1, b2;
    ActionBlock<int> bFinal;

    static int SimulateWork(String blockName, int message, CancellationTokenSource cancellationTokenSource)
    {
    try
    {
    Thread.Sleep(100);
    Trace.WriteLine($"{blockName} processed: {message}");
    }
    catch (Exception ex)
    {
    Trace.WriteLine($"Fatal error {ex.Message} at {blockName}");
    cancellationTokenSource.Cancel();
    }
    return message;
    }


    public PipeLine(CancellationTokenSource cancellationTokenSource)
    {
    this.cancellationTokenSource = cancellationTokenSource;

    // Create three TransformBlock<int, int> objects.
    // Each blocks <int, int> object calls the SimulateWork method.
    Func<string, int, CancellationTokenSource, int> doWork = (name, message, ct) => SimulateWork(name, message, ct);

    b1 = new TransformBlock<int, int>((m1) => doWork("b1", m1, cancellationTokenSource),
    new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 1 , CancellationToken = cancellationTokenSource.Token}); //discard messages on this block if cancel is signaled
    b2 = new TransformBlock<int, int>((m1) => doWork("b2", m1, cancellationTokenSource), new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 1 });
    bFinal = new ActionBlock<int>((m1) => doWork("bFinal", m1, cancellationTokenSource), new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 1 });

    b1.LinkTo(b2, new DataflowLinkOptions { PropagateCompletion = true });
    b2.LinkTo(bFinal, new DataflowLinkOptions { PropagateCompletion = true });
    }

    internal void Complete()
    {
    b1.Complete();
    }

    public void waifForCompletetion()
    {
    Trace.WriteLine($"Waiting for pipeline to end gracefully");
    bFinal.Completion.Wait();
    Trace.WriteLine($"Pipeline terminated");
    }

    public void submitToPipe(int message)
    {
    if (cancellationTokenSource.IsCancellationRequested)
    {
    Trace.WriteLine($"Message {message} was rejected. Pipe is shutting down.Throttling meanwhile");
    return;
    }
    b1.SendAsync(message);
    }
    }

    [TestMethod]
    public void TestShutdown()
    {
    var cancellationTokenSource = new CancellationTokenSource();
    var pipeLine = new PipeLine(cancellationTokenSource);

    //post failure in 2 seconds.
    //It would be the same if was signal from inside block 2
    Task.Factory.StartNew(async () =>
    {
    await Task.Delay(2000);
    Console.WriteLine("Time to shutdown the pipeline!");
    cancellationTokenSource.Cancel();
    });

    //send requests to pipe in background for 5 seconds
    Task.Run(async () =>
    {
    for (int i = 1; i < 100; i++)
    {
    if (cancellationTokenSource.IsCancellationRequested)
    break;

    Thread.Sleep(50); //to see pipe closing input
    pipeLine.submitToPipe(i);
    }
    pipeLine.Complete();
    });

    pipeLine.waifForCompletetion();
    }
    }
    }

    结果如下:
    b2 processed: 13
    b1 processed: 22
    Message 45 was rejected. Pipe is shutting down.Throttling meanwhile
    b2 processed: 14
    bFinal processed: 8
    b2 processed: 15
    bFinal processed: 9
    bFinal processed: 10
    bFinal processed: 11
    bFinal processed: 12
    bFinal processed: 13
    bFinal processed: 14
    bFinal processed: 15
    Pipeline terminated

    从消息 45 被拒绝时起,B1 上不再处理消息。

    已经在 B2 队列中的所有消息都到达了管道的末端。

    关于c# - 如何以优雅的方式关闭发生致命异常的 TPL 数据流?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60450973/

    28 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com