gpt4 book ai didi

c# - 如何正确管理 TPL 数据流中的完成

转载 作者:行者123 更新时间:2023-11-30 12:19:49 25 4
gpt4 key购买 nike

我创建了类似于网络爬虫的东西来创建我需要管理的 1000 多个网络服务的报告。因此,我创建了一个 TPL 数据流管道来管理数据的获取和处理。我想象中的 Pipeline 看起来有点像这样(对不起我的绘画技巧 :D): The Pipeline

我已经创建了一个实现并且一切正常,直到我开始整个流水线。我将 500 个对象放入管道作为管道的输入,并预计程序会运行一段时间,但程序在移动到执行 block 后停止执行。在检查了 Programm 的流程后,在我看来 Completion 传播到 Dispose Block 的速度很快。我使用相同的管道创建了一个小示例项目,以检查它是我对输入类的实现还是管道本身。示例代码是这样的:

public class Job
{
public int Ticker { get; set; }

public Type Type { get; }

public Job(Type type)
{
Type = type;
}

public Task Prepare()
{
Console.WriteLine("Preparing");
Ticker = 0;
return Task.CompletedTask;
}

public Task Tick()
{
Console.WriteLine("Ticking");
Ticker++;
return Task.CompletedTask;
}

public bool IsCommitable()
{
Console.WriteLine("Trying to commit");
return IsFinished() || ( Ticker != 0 && Ticker % 100000 == 0);
}

public bool IsFinished()
{
Console.WriteLine("Trying to finish");
return Ticker == 1000000;
}

public void IntermediateCleanUp()
{
Console.WriteLine("intermediate Cleanup");
Ticker = Ticker - 120;
}

public void finalCleanUp()
{
Console.WriteLine("Final Cleanup");
Ticker = -1;
}
}

这是我输入到准备 block 中的输入类。

public class Dataflow
{
private TransformBlock<Job, Job> _preparationsBlock;

private BufferBlock<Job> _balancerBlock;

private readonly ExecutionDataflowBlockOptions _options = new ExecutionDataflowBlockOptions
{
BoundedCapacity = 4
};

private readonly DataflowLinkOptions _linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

private TransformBlock<Job, Job> _typeATickBlock;

private TransformBlock<Job, Job> _typeBTickBlock;

private TransformBlock<Job, Job> _writeBlock;

private TransformBlock<Job, Job> _intermediateCleanupBlock;

private ActionBlock<Job> _finalCleanupBlock;

public async Task Process()
{
CreateBlocks();

ConfigureBlocks();

for (int i = 0; i < 500; i++)
{
await _preparationsBlock.SendAsync(new Job(i % 2 == 0 ? Type.A : Type.B));
}
_preparationsBlock.Complete();

await Task.WhenAll(_preparationsBlock.Completion, _finalCleanupBlock.Completion);
}

private void CreateBlocks()
{
_preparationsBlock = new TransformBlock<Job, Job>(async job =>
{
await job.Prepare();
return job;
}, _options);

_balancerBlock = new BufferBlock<Job>(_options);

_typeATickBlock = new TransformBlock<Job, Job>(async job =>
{
await job.Tick();
return job;
}, _options);

_typeBTickBlock = new TransformBlock<Job, Job>(async job =>
{
await job.Tick();
await job.Tick();
return job;
}, _options);

_writeBlock = new TransformBlock<Job, Job>(job =>
{
Console.WriteLine(job.Ticker);
return job;
}, _options);

_finalCleanupBlock = new ActionBlock<Job>(job => job.finalCleanUp(), _options);

_intermediateCleanupBlock = new TransformBlock<Job, Job>(job =>
{
job.IntermediateCleanUp();
return job;
}, _options);
}

private void ConfigureBlocks()
{
_preparationsBlock.LinkTo(_balancerBlock, _linkOptions);

_balancerBlock.LinkTo(_typeATickBlock, _linkOptions, job => job.Type == Type.A);
_balancerBlock.LinkTo(_typeBTickBlock, _linkOptions, job => job.Type == Type.B);

_typeATickBlock.LinkTo(_typeATickBlock, _linkOptions, job => !job.IsCommitable());
_typeATickBlock.LinkTo(_writeBlock, _linkOptions, job => job.IsCommitable());

_typeBTickBlock.LinkTo(_typeBTickBlock, _linkOptions, job => !job.IsCommitable());

_writeBlock.LinkTo(_intermediateCleanupBlock, _linkOptions, job => !job.IsFinished());
_writeBlock.LinkTo(_finalCleanupBlock, _linkOptions, job => job.IsFinished());

_intermediateCleanupBlock.LinkTo(_typeATickBlock, _linkOptions, job => job.Type == Type.A);
}
}

这是我的数据流管道,代表我上面的“艺术品”:D。所有这些都在我的调度程序中执行,该调度程序在 Programm.cs 中启动:

public class Scheduler
{
private readonly Timer _timer;

private readonly Dataflow _flow;


public Scheduler(int intervall)
{
_timer = new Timer(intervall);
_flow = new Dataflow();
}

public void Start()
{
_timer.AutoReset = false;
_timer.Elapsed += _timer_Elapsed;
_timer.Start();
}

private async void _timer_Elapsed(object sender, ElapsedEventArgs e)
{
try
{
_timer.Stop();
Console.WriteLine("Timer stopped");
await _flow.Process().ConfigureAwait(false);
}
catch (Exception ex)
{
Console.WriteLine(ex.ToString());
}
finally
{
Console.WriteLine("Timer started again.");
_timer.Start();
}
}
}

class Program
{
static void Main(string[] args)
{
var scheduler = new Scheduler(1000);
scheduler.Start();

Console.ReadKey();

}
}

我得到的控制台输出是:计时器停止准备中滴答作响试图 promise 试图完成滴答作响试图 promise 试图完成滴答作响试图 promise 试图完成滴答作响试图 promise 试图完成滴答作响试图 promise 试图完成滴答作响试图 promise 试图完成滴答作响试图 promise 试图完成滴答作响试图 promise 试图完成滴答作响试图 promise 试图完成滴答作响试图 promise 试图完成试图 promise 努力完成

似乎程序在那个时候已经停止工作,因为我没有遇到任何断点或进一步。我认为我所有的 block 都已经收到完成信号,因此停止接收任何新项目。因此我的问题是:如何管理完成信号,以便管道仅在没有更多工作要做时完成?

最佳答案

您的流程的主要问题是您的滴答 block 的反馈循环。这导致两个问题。

  • 背压
  • 完成流程

第一:背压

_typeATickBlock 链接回自身时,一旦达到其容量,它将停止接受所有消息。在您的案例 4 中,这意味着一旦它在输出缓冲区中有 3 条消息并且正在处理一条消息,它将停止接受和传递消息。您可以通过将以下行添加到 block 中来查看:

Console.WriteLine($"Tick Block {_typeATickBlock.InputCount}/{_typeATickBlock.OutputCount}");

并将输出:

Tick Block 0/3

要解决此问题,您可以添加任何缓冲 block 、Buffer 或 Transform。关键是缓冲区的有限容量。在您的情况下,每条消息都需要重新路由回 tick block 。有了它,您就知道您的容量需要在任何给定时间与消息量相匹配。在本例中为 500。

_printingBuffer = new TransformBlock<Job, Job>(job =>
{
Console.WriteLine($"{_printingBuffer.InputCount}/{_printingBuffer.OutputCount}");
return job;
}, new ExecutionDataflowBlockOptions() { BoundedCapacity = 500 });

在您的实际代码中,您可能不知道该值,Unbounded 可能是避免锁定管道的最佳选择,但您可以根据传入量调整该值。

二:完成流程

在你的管道中有一个反馈循环,完成传播变得比简单地设置链接选项更困难。一旦完成到达滴答 block ,它就会停止接受所有消息,即使是那些仍然需要处理的消息。为避免这种情况,您需要暂停传播,直到所有消息都通过循环。首先,您在滴答 block 之前停止传播,然后检查参与循环的每个 block 上的缓冲区。然后,一旦所有缓冲区都为空,就会将完成和故障传播到 block 。

_balancerBlock.Completion.ContinueWith(tsk =>
{
while (!_typeATickBlock.Completion.IsCompleted)
{
if (_printingBuffer.InputCount == 0 && _printingBuffer.OutputCount == 0
&& _typeATickBlock.InputCount == 0 && _typeATickBlock.OutputCount == 0)
{
_typeATickBlock.Complete();
}
}
});

最后

带有完成设置和插入缓冲区的完整 ConfigureBlocks 应如下所示。请注意,我在这里只传递完整而不是错误,并且我删除了 B 型分支。

private void ConfigureBlocks()
{
_preparationsBlock.LinkTo(_balancerBlock, _linkOptions);

_balancerBlock.LinkTo(_typeATickBlock, job => job.Type == Type.A);

_balancerBlock.Completion.ContinueWith(tsk =>
{
while (!_typeATickBlock.Completion.IsCompleted)
{
if (_printingBuffer.InputCount == 0 && _printingBuffer.OutputCount == 0
&& _typeATickBlock.InputCount == 0 && _typeATickBlock.OutputCount == 0)
{
_typeATickBlock.Complete();
}
}
});

_typeATickBlock.LinkTo(_printingBuffer, job => !job.IsCommitable());
_printingBuffer.LinkTo(_typeATickBlock);
_typeATickBlock.LinkTo(_writeBlock, _linkOptions, job => job.IsCommitable());

_writeBlock.LinkTo(_intermediateCleanupBlock, _linkOptions, job => !job.IsFinished());
_writeBlock.LinkTo(_finalCleanupBlock, _linkOptions, job => job.IsFinished());

_intermediateCleanupBlock.LinkTo(_typeATickBlock, _linkOptions, job => job.Type == Type.A);
}

我前阵子写了一篇博文,博客不再活跃,关于用反馈循环处理完成。它可能会提供更多帮助。从 WayBackMachine 中检索。

Finding Completion in a Complex Flow: Feedback Loops

关于c# - 如何正确管理 TPL 数据流中的完成,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55324214/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com