gpt4 book ai didi

c# - 如何在TPL中实现连续运行的数据流 block ?

转载 作者:太空狗 更新时间:2023-10-29 21:44:54 25 4
gpt4 key购买 nike

我使用 BufferBlock 和 ActionBlock 设置了生产者/消费者数据流 block ,它在控制台应用程序中运行良好;

将所有项目添加到 BurfferBlock 并将 BufferBlock 与其他 Action Item 链接后;它运作良好。

现在我想使用该内部服务,其中此数据流 block 管道将始终处于运行状态,并且当消息通过外部事件可用时,它将进入缓冲区 block 并开始处理。我怎样才能做到这一点?

到目前为止,我已经完成了以下工作:

public void SetupPipeline()
{
FirstBlock = new ActionBlock<WorkItem>(new Action<WorkItem>(ProcessIncomingMessage),
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
});

BufferBlock = new BufferBlock<WorkItem>();

GroupingDataflowBlockOptions GroupingDataflowBlockOptions = new GroupingDataflowBlockOptions();
GroupingDataflowBlockOptions.Greedy = true;
GroupingDataflowBlockOptions.BoundedCapacity = GroupingDataflowBlockOptions.Unbounded;
CancellationTokenSource = new CancellationTokenSource();
CancellationToken = CancellationTokenSource.Token;
GroupingDataflowBlockOptions.CancellationToken = CancellationToken;
BatchBlock = new BatchBlock<WorkItem>(BoundingCapacity, GroupingDataflowBlockOptions);

ProcessItems = new ActionBlock<WorkItem[]>(WorkItems =>
ProcessWorkItems(WorkItems.ToList<WorkItem>()),
new ExecutionDataflowBlockOptions
{
CancellationToken = CancellationToken
});

Timer = new Timer(_ =>
BatchBlock.TriggerBatch()
);

TimingBlock = new TransformBlock<WorkItem, WorkItem>(WorkItem =>
{
Timer.Change(TimerInterval, Timeout.Infinite);
logger.Debug("Inside TimingBlock : " + WorkItem.ToString());
return WorkItem;
}, new ExecutionDataflowBlockOptions
{
CancellationToken = CancellationToken
});

BatchBlock.LinkTo(ProcessItems);
TimingBlock.LinkTo(BatchBlock);
BufferBlock.LinkTo(TimingBlock);
}

最佳答案

您的批量大小由 batchblock 构造函数中的变量“BoundingCapacity”定义。批处理将在以下时间发布:

  • 已收到与批量大小相等的帖子数(在构造函数中指定)
  • 批处理 block 被标记为完成
  • triggerbatch 方法被调用

您似乎希望在满足 bath 大小或发生超时时发布一个批处理。如果是这种情况,并且批处理大小不重要,我实际上只是向您拥有的计时器添加一个循环间隔,并使批处理 block 下游的对象忽略空帖子。

您可能真正想要的,也是最符合数据流编程哲学的是,当您开始发布一系列项目时创建一个新的批处理 block ,然后在完成或发生超时时完成它。如果新帖子不存在,将创建一个新的批处理 block 。

尝试在仅基于第一个触发器触发的 batchblock 周围实现超时计时器的问题是,您将需要计算和验证发送到 bufferblock 的帖子,或者您将需要查看来自 bufferblock 的帖子。这两种情况都会造成很多丑陋和/或违反 block 封装。

关于c# - 如何在TPL中实现连续运行的数据流 block ?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/20335807/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com