作者热门文章
- c - 在位数组中找到第一个零
- linux - Unix 显示有关匹配两种模式之一的文件的信息
- 正则表达式替换多个文件
- linux - 隐藏来自 xtrace 的命令
我使用 BufferBlock 和 ActionBlock 设置了生产者/消费者数据流 block ,它在控制台应用程序中运行良好;
将所有项目添加到 BurfferBlock 并将 BufferBlock 与其他 Action Item 链接后;它运作良好。
现在我想使用该内部服务,其中此数据流 block 管道将始终处于运行状态,并且当消息通过外部事件可用时,它将进入缓冲区 block 并开始处理。我怎样才能做到这一点?
到目前为止,我已经完成了以下工作:
public void SetupPipeline()
{
FirstBlock = new ActionBlock<WorkItem>(new Action<WorkItem>(ProcessIncomingMessage),
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
});
BufferBlock = new BufferBlock<WorkItem>();
GroupingDataflowBlockOptions GroupingDataflowBlockOptions = new GroupingDataflowBlockOptions();
GroupingDataflowBlockOptions.Greedy = true;
GroupingDataflowBlockOptions.BoundedCapacity = GroupingDataflowBlockOptions.Unbounded;
CancellationTokenSource = new CancellationTokenSource();
CancellationToken = CancellationTokenSource.Token;
GroupingDataflowBlockOptions.CancellationToken = CancellationToken;
BatchBlock = new BatchBlock<WorkItem>(BoundingCapacity, GroupingDataflowBlockOptions);
ProcessItems = new ActionBlock<WorkItem[]>(WorkItems =>
ProcessWorkItems(WorkItems.ToList<WorkItem>()),
new ExecutionDataflowBlockOptions
{
CancellationToken = CancellationToken
});
Timer = new Timer(_ =>
BatchBlock.TriggerBatch()
);
TimingBlock = new TransformBlock<WorkItem, WorkItem>(WorkItem =>
{
Timer.Change(TimerInterval, Timeout.Infinite);
logger.Debug("Inside TimingBlock : " + WorkItem.ToString());
return WorkItem;
}, new ExecutionDataflowBlockOptions
{
CancellationToken = CancellationToken
});
BatchBlock.LinkTo(ProcessItems);
TimingBlock.LinkTo(BatchBlock);
BufferBlock.LinkTo(TimingBlock);
}
最佳答案
您的批量大小由 batchblock 构造函数中的变量“BoundingCapacity”定义。批处理将在以下时间发布:
您似乎希望在满足 bath 大小或发生超时时发布一个批处理。如果是这种情况,并且批处理大小不重要,我实际上只是向您拥有的计时器添加一个循环间隔,并使批处理 block 下游的对象忽略空帖子。
您可能真正想要的,也是最符合数据流编程哲学的是,当您开始发布一系列项目时创建一个新的批处理 block ,然后在完成或发生超时时完成它。如果新帖子不存在,将创建一个新的批处理 block 。
尝试在仅基于第一个触发器触发的 batchblock 周围实现超时计时器的问题是,您将需要计算和验证发送到 bufferblock 的帖子,或者您将需要查看来自 bufferblock 的帖子。这两种情况都会造成很多丑陋和/或违反 block 封装。
关于c# - 如何在TPL中实现连续运行的数据流 block ?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/20335807/
我是一名优秀的程序员,十分优秀!