gpt4 book ai didi

c# - BlockingCollection.TakeFromAny,用于具有不同泛型类型的集合

转载 作者:太空宇宙 更新时间:2023-11-03 12:36:40 25 4
gpt4 key购买 nike

有一个 BlockingCollection<T>.TakeFromAny .NET 中的方法。它首先尝试快速获取 Take,然后默认为等待底层句柄的“慢速”方法。我想用它来听取上游生产者提供“消息”和下游生产者提供“结果”。

  • 是否可以使用 TakeFromAny(或者是否有其他方式,无需重新实现此类)来监听对异构类型阻塞集合集合的添加?

以下代码不是类型有效的,自然无法编译:

object anyValue;
var collection = new List<BlockingCollection<object>>();
// following fails: cannot convert
// from 'System.Collections.Concurrent.BlockingCollection<Message>'
// to 'System.Collections.Concurrent.BlockingCollection<object>'
collection.Add(new BlockingCollection<Message>());
// fails for same reason
collection.Add(new BlockingCollection<Result>());
BlockingCollection<object>.TakeFromAny(collection.ToArray(), out anyValue);

可能只处理 new BlockingCollection<object>()实例并在 Take 上进行转换以避免编译类型错误,尽管这让我感到不对(呃)——尤其是因为通过方法接口(interface)丢失了类型。使用包装组合类型可以解决后者; fsvo '解决'。


下面的内容与问题没有直接关系,尽管它为感兴趣的人提供了上下文。提供核心基础设施功能的代码不可使用更高级别的构造(例如 Rx 或 TPL 数据流)。

这是一个基本的流程模型。生产者、代理和工作人员在不同的线程上运行(工作人员可以在同一线程上运行,具体取决于任务调度程序的作用)。

[producer]   message -->   [proxy]   message --> [worker 1]
<-- results <-- results
message --> [worker N..]
<-- results

期望代理监听消息(传入)和结果(返回)。代理做一些转换和分组等工作,并将结果作为反馈。

将代理作为一个单独的线程将其与执行各种猴子业务的初始生产源隔离开来。工作任务是为了并行性,而不是异步性,线程(在通过代理中的分组减少/消除争用之后)应该允许良好的扩展。

队列是在代理和工作人员之间建立的(而不是具有单一输入/结果的直接任务),因为当工作人员正在执行时,可能有额外的传入工作消息可以在它结束之前处理。这是为了确保工作人员可以延长/重用它在相关工作流中建立的上下文。

最佳答案

我认为这里最好的选择是将两个阻塞集合的类型更改为 BlockingCollection<object> ,您已经提到过,包括它的缺点。

如果您不能或不想这样做,另一种解决方案是合并 BlockingCollection<object>以及每个源集合的线程,将项目从其集合移动到合并的集合:

var producerCollection = new BlockingCollection<Message>();
var consumerCollection = new BlockingCollection<Results>();

var combinedCollection = new BlockingCollection<object>();

var producerCombiner = Task.Run(() =>
{
foreach (var item in producerCollection.GetConsumingEnumerable())
{
combinedCollection.Add(item);
}
});

var consumerCombiner = Task.Run(() =>
{
foreach (var item in consumerCollection.GetConsumingEnumerable())
{
combinedCollection.Add(item);
}
});

Task.WhenAll(producerCombiner, consumerCombiner)
.ContinueWith(_ => combinedCollection.CompleteAdding());

foreach (var item in combinedCollection.GetConsumingEnumerable())
{
// process item here
}

它不是很有效,因为它只是为了执行此操作而阻塞了两个额外的线程,但这是我能想到的最好的替代方法,无需使用反射来访问 TakeFromAny 使用的句柄。 .

关于c# - BlockingCollection<T>.TakeFromAny,用于具有不同泛型类型的集合,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40688168/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com