- c - 在位数组中找到第一个零
- linux - Unix 显示有关匹配两种模式之一的文件的信息
- 正则表达式替换多个文件
- linux - 隐藏来自 xtrace 的命令
我不确定这是否可行,但如果可行,我可能做得不对。假设我有一个链接到许多消费者(ActionBlocks)的共享缓冲区。每个消费者都应该消费满足用于将其链接到缓冲区的谓词的数据。例如,ActionBlock1 应该消耗满足 x => x % 5 == 0
的数字, ActionBlock2 应该只消耗 x => x % 5 == 1
等等
这是我得到的:
private static ITargetBlock<int> BuildPipeline(int NumProductionLines)
{
var productionQueue = new BufferBlock<int>();
for (int i = 0; i < NumProductionLines; i++)
{
ActionBlock<int> productionLine = new ActionBlock<int>(num => Console.WriteLine("Processed by line {0}: {1}", i + 1, num));
productionQueue.LinkTo(productionLine, x => x % NumProductionLines == i);
}
return productionQueue;
}
然后我调用:
Random rnd = new Random();
ITargetBlock<int> temp = BuildPipeline(5);
while (true)
{
temp.Post(rnd.Next(255));
}
但是这不起作用。控制台中不显示任何输出。如果我修改 BuildPipeline
方法为:
private static ITargetBlock<int> BuildPipeline(int NumProductionLines)
{
var productionQueue = new BufferBlock<int>();
ActionBlock<int> productionLine1 = new ActionBlock<int>(num => Console.WriteLine("Processed by line {0}: {1}", 1, num));
ActionBlock<int> productionLine2 = new ActionBlock<int>(num => Console.WriteLine("Processed by line {0}: {1}", 2, num));
ActionBlock<int> productionLine3 = new ActionBlock<int>(num => Console.WriteLine("Processed by line {0}: {1}", 3, num));
ActionBlock<int> productionLine4 = new ActionBlock<int>(num => Console.WriteLine("Processed by line {0}: {1}", 4, num));
ActionBlock<int> productionLine5 = new ActionBlock<int>(num => Console.WriteLine("Processed by line {0}: {1}", 5, num));
productionQueue.LinkTo(productionLine1, x => x % 5 == 0);
productionQueue.LinkTo(productionLine2, x => x % 5 == 1);
productionQueue.LinkTo(productionLine3, x => x % 5 == 2);
productionQueue.LinkTo(productionLine4, x => x % 5 == 3);
productionQueue.LinkTo(productionLine5, x => x % 5 == 4);
return productionQueue;
}
代码执行预期的操作。
有人可以阐明为什么动态创建和链接操作 block 不起作用吗?
附言如果我在 ITargetBlock<int> temp = BuildPipeline(5);
之后立即进入代码temp 确实显示有 5 个目标链接到缓冲区。而且每个target的Id都不一样。
提前致谢
编辑:添加了 svick 建议的更改,但仍然不好:
private static ITargetBlock<int> BuildPipeline(int NumProductionLines)
{
var productionQueue = new BufferBlock<int>();
var opt = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 };
for (int i = 0; i < NumProductionLines; i++)
{
ActionBlock<int> productionLine = new ActionBlock<int>(num => Console.WriteLine("Processed by line {0}: {1}", i + 1, num));
int j = i;
productionQueue.LinkTo(productionLine, x => x % NumProductionLines == j);
}
ActionBlock<int> discardedLine = new ActionBlock<int>(num => Console.WriteLine("Discarded: {0}", num));
productionQueue.LinkTo(discardedLine);
return productionQueue;
}
现在只有第二条生产线处理数据(满足 x % 5 == 1 谓词的一条)。并且数据不满足谓词,这意味着我得到以 9 和 7 结尾的数字。
编辑:工作代码如下所示:
private static ITargetBlock<int> BuildPipeline(int NumProductionLines)
{
var productionQueue = new BufferBlock<int>();
var opt = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 };
for (int i = 0; i < NumProductionLines; i++)
{
int j = i;
ActionBlock<int> productionLine = new ActionBlock<int>(num => Console.WriteLine("Processed by line {0}: {1}", j + 1, num));
productionQueue.LinkTo(productionLine, x => x % NumProductionLines == j);
}
productionQueue.LinkTo(DataflowBlock.NullTarget<int>());
return productionQueue;
}
最佳答案
问题是在您的第一个版本中,您对每个目标 block 使用相同的谓词。换句话说,谓词不依赖于 i
。
但即使这样做了,您的代码也不会工作,因为 i
变量在谓词之间共享,因此它们都将使用最后一个值。解决方法是将 i
复制到局部变量中并在谓词中使用它。
代码可能是这样的:
private static ITargetBlock<int> BuildPipeline(int NumProductionLines)
{
var productionQueue = new BufferBlock<int>();
for (int i = 0; i < NumProductionLines; i++)
{
int iCopy = i;
ActionBlock<int> productionLine = new ActionBlock<int>(
num => Console.WriteLine("Processed by line {0}: {1}", iCopy + 1, num));
productionQueue.LinkTo(
productionLine, x => x % NumProductionLines == iCopy);
}
return productionQueue;
}
如果您问为什么您的代码不至少处理 x % 5 == 1
个数字,那是因为随机生成器可能会生成一个与该谓词不匹配的数字, 所以没有一个 ActionBlock
会接受它。因此,该号码将留在源 block 的输出队列中,其他号码将无法通过。
如果在您的实际代码中可能会发生类似情况,并且您想丢弃所有不符合任何谓词的数字,您可以将源代码块链接到 a block that does nothing并在将其链接到所有有用的 block 之后接受任何内容:
productionQueue.LinkTo(DataflowBlock.NullTarget<int>());
关于c# - 将动态创建的 ActionBlock 链接到 BufferBlock,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/12647185/
我目前正在使用 ActionBlock 来处理串行启动的异步作业。它可以很好地处理每个发布到它的项目,但无法从每个作业中收集结果列表。 我可以使用什么以线程安全的方式收集我的作业结果? 我的代码目前是
我目前正在使用 ActionBlock 来处理串行启动的异步作业。它可以很好地处理每个发布到它的项目,但无法从每个作业中收集结果列表。 我可以使用什么以线程安全的方式收集我的作业结果? 我的代码目前是
我有一个 ActionBlock,它只处理来自无限循环的连续不断的消息。在 ActionBlock 中,我发布了一个 http 帖子。当出现任何与网络相关的错误时,该方法会抛出异常并且该 block
我认为在测试方法中,“results”集合变量的类型必须是 BlockingCollection而不是 List .如果我错了,请证明给我看。我从 https://blog.stephencleary
我们在发送到 ActionBlock 时看到了意外行为,其中似乎正在发生并行处理,即使 MaxDegreeOfParallelism 为 1。情况如下。 发布到 ActionBlock 的类如下所示:
我需要并行处理某些项目,所以我使用 TPL Dataflow。 .要注意的是,共享相同键(类似于字典)的项目应按 FIFO 顺序处理,而不是彼此平行(它们可以与具有不同值的其他项目平行)。 正在完成的
我想实现优先级 ActionBlock .这样我就可以有条件地优先考虑一些TInput使用 Predicate 的项目. 我读了 Parallel Extensions Extras Samples和
我不确定这是否可行,但如果可行,我可能做得不对。假设我有一个链接到许多消费者(ActionBlocks)的共享缓冲区。每个消费者都应该消费满足用于将其链接到缓冲区的谓词的数据。例如,ActionBlo
我有一个带有 ActionBlock 的 Receiver 类: public class Receiver : IReceiver { private ActionBlock _receiver
我一直在尝试通过创建示例应用程序来理解 TPL 数据流。我一直在尝试做的一件事是从 ActionBlock 更新 TextBox 控件。使用 TPL Dataflow 的原因是在保持顺序的同时执行并行
我正在 .Net TPL 中构建一个管道,该管道经过一系列数据转换步骤,最终需要将输出写入文件。我正在考虑使用 ActionBlock 写入文件。但是,我不确定这是否会遇到任何问题......例如多个
我对 Framework 4.0 的 ActionBlock 实现很感兴趣,因为 Framework 4.0 似乎不支持 TPL.Dataflow。更具体地说,我对接收 Func 委托(delegat
我想知道并行执行多个异步方法的推荐方法是什么? 在 System.Threading.Tasks.Dataflow 中,我们可以指定最大并行度,但无界可能也是 Task.WhenAll 的默认设置?
我正在使用 TPL DataFlow 和 ActionBlock 来创建并行性。使用 TPL DataFlow 的原因是因为它支持异步性,除了我无法让它工作。 var ab = new ActionB
刚从 System.Threading.Tasks.Dataflow 开始,不确定我是否理解 ActionBlock 中未处理异常的正确错误处理技术。 我现在所拥有的导致挂起:- ActionBloc
我有这个代码: var data = new BufferBlock(new DataflowBlockOptions { BoundedCapacity = 1 }); var action = n
我是一名优秀的程序员,十分优秀!