- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
这是一个简化的场景 - 用户想要下载并处理一些数据:
private ConcurrentDictionary<int, (string path, string name)> _testDictionary;
public async Task StartDownload(List<(int id, string path, string name)> properties)
{
foreach (var (id, path, name) in properties)
{
_testDictionary.TryAdd(id, (path, name));
}
await CreatePipeline(properties);
//after returning I would like to check if _testDictionary contains any elements,
//and what is their status
}
所有传入的项目都在ConcurrentDictionary
中注册,然后调用TPL Dataflow管道进行下载和处理:
public async Task CreatePipeline(List<(int id, string path, string name)> properties)
{
var downloadBlock = new TransformBlock<(int id, string path, string name), int>(
(data) => { return data.id; },
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });
var resultsBlock = new ActionBlock<int>((data) =>
{
_testDictionary.TryRemove(data, out _);
//or
//_testDictionary.AddOrUpdate(...);
},
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });
downloadBlock.LinkTo(resultsBlock,
new DataflowLinkOptions { PropagateCompletion = true });
foreach (var item in properties)
{
await downloadBlock.SendAsync(item);
}
resultsBlock.Complete();
await resultsBlock.Completion;
}
最后,结果 block 中的项目将根据其进行情况从 _testDictionary
中删除(或更新)。我愚蠢的问题是 - 如果我为创建管道的所有 block 设置 MaxDegreeOfParallelism = 1 并确保不会有多个管道同时运行,我真的需要 ConcurrentDictionary
对于这个或简单的 Dictionary
就足够了吗?我担心管道可以在不同的线程上执行,并且从那里访问简单的 Dictionary
可能会导致问题。
最佳答案
正如我所见,您的 StartDownload
尝试充当生产者,而您的 CreatePipeline
尝试充当消费者 _testDictionary
的观点。 Add
和 Remove
调用分为两个不同的函数,这就是您需要将该变量设置为类级别的原因。
如果 CreatePipeline
包含两个调用并且返回所有未处理的元素怎么办?
public async Task<Dictionary<int, (string path, string name)>> CreatePipeline(List<(int id, string path, string name)> properties)
{
var unprocessed = new ConcurrentDictionary<int, (string path, string name)>(
properties.ToDictionary(
prop => prop.id,
prop => (prop.path, prop.name)));
// var downloadBlock = ...;
var resultsBlock = new ActionBlock<int>(
(data) => unprocessed.TryRemove(data, out _), options);
//...
downloadBlock.Complete();
await resultsBlock.Completion;
return unprocessed.ToDictionary(
dict => dict.Key,
dict => dict.Value);
}
如果顺序并不重要,那么您可以考虑重写 TransformBlock
填充逻辑,如下所示:
await Task.WhenAll(properties.Select(downloadBlock.SendAsync));
如果您想确保返回的未处理项目不能被其他线程修改,那么您可以利用ImmutableDictionary .
所以,如果我们把所有东西放在一起,它可能看起来像这样:
public async Task StartDownload(List<(int id, string path, string name)> properties)
{
var unprocessedProperties = await CreatePipeline(properties);
foreach (var property in unprocessedProperties)
{
//TODO
}
}
public async Task<ImmutableDictionary<int, (string path, string name)>> CreatePipeline(List<(int id, string path, string name)> properties)
{
var options = new ExecutionDataflowBlockOptions {MaxDegreeOfParallelism = 1};
var unprocessed = new ConcurrentDictionary<int, (string path, string name)>(
properties.ToDictionary(
prop => prop.id,
prop => (prop.path, prop.name)));
var downloadBlock = new TransformBlock<(int id, string path, string name), int>(
(data) => data.id, options);
var resultsBlock = new ActionBlock<int>(
(data) => unprocessed.TryRemove(data, out _), options);
downloadBlock.LinkTo(resultsBlock, new DataflowLinkOptions { PropagateCompletion = true });
await Task.WhenAll(properties.Select(downloadBlock.SendAsync));
downloadBlock.Complete();
await resultsBlock.Completion;
return unprocessed.ToImmutableDictionary(
dict => dict.Key,
dict => dict.Value);
}
编辑:反射(reflect)新的新要求
正如OP指出的,字典背后的主要原因是提供在处理仍在进行时扩展待处理队列的能力。
换句话说,待处理元素的处理和收集不是一次性的事情,而是一项持续的事件。
好处是您可以完全摆脱 _testDictionary
和 resultsBlock
。您所需要做的就是不断Post
或Send
新数据到TransformBlock
。在单独的方法 (StopDownload
) 中等待处理。
private readonly ITargetBlock<(int id, string path, string name)> downloadBlock;
public MyAwesomeClass()
{
downloadBlock = new TransformBlock<(int id, string path, string name), int>(
(data) => data.id,
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });
}
public void StartDownload(List<(int id, string path, string name)> properties)
{
//Starts to send props, but does not await them
_ = properties.Select(downloadBlock.SendAsync).ToList();
//You can await the send operation if you wish
}
public async Task StopDownload()
{
downloadBlock.Complete();
await downloadBlock.Completion;
}
可以轻松修改此结构以注入(inject) BufferBlock
来平滑负载:
private readonly ITargetBlock<(int id, string path, string name)> downloadBlock;
public MyAwesomeBufferedClass()
{
var transform = new TransformBlock<(int id, string path, string name), int>(
(data) => data.id,
new ExecutionDataflowBlockOptions {MaxDegreeOfParallelism = 1});
var buffer = new BufferBlock<(int id, string path, string name)>(
new DataflowBlockOptions() { BoundedCapacity = 100});
buffer.LinkTo(transform, new DataflowLinkOptions {PropagateCompletion = true});
downloadBlock = buffer;
}
public void StartDownload(List<(int id, string path, string name)> properties)
{
_ = properties.Select(downloadBlock.SendAsync).ToList();
}
public async Task StopDownload()
{
downloadBlock.Complete();
await downloadBlock.Completion;
}
关于c# - 我应该选择简单的 Dictionary 还是 ConcurrentDictionary 来处理任务并行库,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62794259/
我在我的网络应用程序中创建了一个 ConcurrrentDictionary 作为应用程序对象。它在 session 之间共享。 (基本上用作存储库。) 有时,任何可用的 session 都会将新项目
ConcurrentDictionary.TryUpdate 方法需要与具有指定键的元素的值进行比较的 comparisonValue。但是如果我想做这样的事情: if (!_store.TryGet
如果我有一个 ConcurrentDictionary例如,我是否使用 Count 有关系吗?属性(property)或LINQ的Any() ?我宁愿写 dict.Any()而不是 dict.Coun
我正在使用并发字典来存储大约200万条记录,并且想知道如何初始化字典的并发级别。 MSDN页面的示例代码中具有以下注释: concurrencyLevel越高,可以在ConcurrentDiction
我正在使用 ConcurrentDictionary 来存储日志行,当我需要向用户显示它们时,我调用 ToList()生成一个列表。但奇怪的是,一些用户在列表中最先收到最近的行,而从逻辑上讲,他们应该
我发现这个 C# 扩展将 GetOrAdd 转换为 Lazy,我想对 AddOrUpdate 做同样的事情。 有人可以帮我将其转换为 AddOrUpdate 吗? public static cla
我对并发字典如何锁定他们的资源感到困惑。 例如,如果我运行一个方法来迭代字典中的每个项目并在一个线程中编辑它的值,然后我尝试从另一个线程读取一个键的值: 第二个线程会访问字典的快照吗? 如果没有,如果
我找不到有关 ConcurrentDictionary 类型的足够信息,因此我想在这里询问一下。 目前,我使用字典来保存由多个线程(来自线程池,因此没有确切数量的线程)不断访问的所有用户,并且它具有同
我在 Azure 应用服务上使用 MVC 5、ASP.NET 4.7 我正在使用 ConcurrentDictionary 对象来保存数据,以保存对数据源的多次调用。 关于其行为的几个问题: 1) 填
我想使用 ConcurrentDictionary 来检查之前是否添加了这个数据键,但看起来我仍然可以添加之前添加的键。 代码: public class pKeys {
我正在按照教程构建聊天客户端和服务器,现在我遇到了以下错误: Inconsistent accessibility: field type 'System.Collections.Concurrent
我正在考虑将类(单例)与使用 ConcurrentDictionary 实现的集合一起使用。此类将用作缓存实现 (asp.net/wcf)。 您如何看待从此类中显式公开这些集合与仅公开例如3 种方法(
有一个并发字典,它从不同的来源收集信息,每分钟刷新一次并将收集的数据传递给另一个处理程序。 var currentDictionarySnapshot = _currentDictionary; _c
我在 static 类中有一个静态 ConcurrentDictionary。在该类的静态构造函数中,我通过 Task.Run 调用一个私有(private)方法来无限循环遍历字典并删除已过期的项目,
我在 StackOverflow 上阅读了以下文章:ConcurrentBag - Add Multiple Items?和 Concurrent Dictionary Correct Usage但答
我存储这个类 public class Customer { public string Firstname { get; set; } public string Lastname
关闭。这个问题不符合Stack Overflow guidelines .它目前不接受答案。 关于您编写的代码问题的问题必须在问题本身中描述具体问题 — 并且包括有效代码 以重现它。参见 SSC
我正在编写一些使用反射的代码。所以我试图将昂贵的反射处理缓存到 ConcurrentDictionary 中。但是,我想对并发字典应用限制,以防止存储旧的和未使用的缓存值。 我做了一些研究以了解如何限
好吧,所以我遇到了一个奇怪的小问题,坦率地说,我没有想法。我想把它扔出去看看我是否遗漏了我做错的东西,或者 ConcurrentDictionary 是否工作不正常。这是代码: (缓存是一个包含静态
我正在尝试使用 ConcurrentDictionary 来帮助完成过滤任务。 如果一个数字出现在列表中,那么我想将一个条目从一个字典复制到另一个。 但这部分的 AddOrUpdate 是不对的 -
我是一名优秀的程序员,十分优秀!