- c - 在位数组中找到第一个零
- linux - Unix 显示有关匹配两种模式之一的文件的信息
- 正则表达式替换多个文件
- linux - 隐藏来自 xtrace 的命令
当您创建具有有限容量的 batchblock 并在(并行地)发布新项目时调用 triggerBatch - 发布新项目将在触发器批处理执行期间失败。
调用触发器批处理(每 X 次)是为了确保数据不会在 block 中延迟太久,以防传入数据流暂停或减慢。
以下代码将输出一些“失败后”事件。例如:
public static void Main(string[] args)
{
var batchBlock = new BatchBlock<int>(10, new GroupingDataflowBlockOptions() { BoundedCapacity = 10000000 });
var actionBlock = new ActionBlock<int[]>(x => ProcessBatch(x), new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 1 });
batchBlock.LinkTo(actionBlock);
var producerTask = Task.Factory.StartNew(() =>
{
//Post 10K Items
for (int i = 0; i < 10000; i++)
{
var postResult = batchBlock.Post(i);
if (!postResult)
Console.WriteLine("Failed to Post");
}
});
var triggerBatchTask = Task.Factory.StartNew(() =>
{
//Trigger Batch..
for (int i = 0; i < 1000000; i++)
batchBlock.TriggerBatch();
});
producerTask.Wait();
triggerBatchTask.Wait();
}
public static void ProcessBatch(int[] batch)
{
Console.WriteLine("{0} - {1}", batch.First(), batch.Last());
}
*请注意,只有当 batchBlock 有界时,这种情况才可重现。
我是不是遗漏了什么或者是 batchBlock 的问题?
最佳答案
BatchBlock
并没有真正拒绝该项目,它试图推迟它。除了在 Post()
的情况下,推迟不是一个选项。解决此问题的一个简单方法是使用 await batchBlock.SendAsync(i)
而不是 batchBlock.Post(i)
(这也意味着您需要更改 Task.Factory.StartNew(() =>
到 Task.Run(async () =>
)。
为什么会这样?根据the source code ,如果 BatchBlock
是有界的,则 TriggerBatch()
会被异步处理,并且在处理期间不会接受任何新项目。
在任何情况下,您都不应该期望 Post()
在有界 block 上总是返回 true
,如果 block 已满,Post( )
也将返回 false
。
关于c# - 意外行为 - TPL DataFlow BatchBlock 在 TriggerBatch 执行时拒绝项目,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35626955/
我有一个由几个块组成的数据流管道。 当元素流经我的处理管道时,我想按字段 A 将它们分组。为此,我有一个 BatchBlock 高 BoundedCapacity 。在其中我存储我的元素,直到我决定它
使用 Dataflow CTP(在 TPL 中) 如果当前排队或推迟的项目数量小于 BatchSize,超时后是否可以自动调用 BatchBlock.TriggerBatch? 更好的是:每次 blo
当您创建具有有限容量的 batchblock 并在(并行地)发布新项目时调用 triggerBatch - 发布新项目将在触发器批处理执行期间失败。 调用触发器批处理(每 X 次)是为了确保数据不会在
我是一名优秀的程序员,十分优秀!