
c# - Should I choose a plain Dictionary or a ConcurrentDictionary when working with the Task Parallel Library

Reposted. Author: 行者123. Updated: 2023-12-02 02:46:01

Here is a simplified scenario: a user wants to download and process some data:

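// Initialized up front so that TryAdd below does not hit a null reference.
private readonly ConcurrentDictionary<int, (string path, string name)> _testDictionary =
    new ConcurrentDictionary<int, (string path, string name)>();

public async Task StartDownload(List<(int id, string path, string name)> properties)
{
    // Register every incoming item before the pipeline starts.
    foreach (var (id, path, name) in properties)
    {
        _testDictionary.TryAdd(id, (path, name));
    }
    await CreatePipeline(properties);
    //after returning I would like to check if _testDictionary contains any elements,
    //and what is their status
}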

All incoming items are registered in the ConcurrentDictionary, and then a TPL Dataflow pipeline is invoked to download and process them:

public async Task CreatePipeline(List<(int id, string path, string name)> properties)
{
    // Placeholder for the download step: it only forwards the item's id.
    var downloadBlock = new TransformBlock<(int id, string path, string name), int>(
        (data) => { return data.id; },
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });
    var resultsBlock = new ActionBlock<int>((data) =>
        {
            _testDictionary.TryRemove(data, out _);
            //or
            //_testDictionary.AddOrUpdate(...);
        },
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });
    downloadBlock.LinkTo(resultsBlock,
        new DataflowLinkOptions { PropagateCompletion = true });

    foreach (var item in properties)
    {
        await downloadBlock.SendAsync(item);
    }
    // Complete the head of the pipeline; PropagateCompletion forwards it to resultsBlock.
    // Completing resultsBlock directly could make it decline items still in downloadBlock.
    downloadBlock.Complete();
    await resultsBlock.Completion;
}

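Finally, the items that reach the results block are removed from _testDictionary (or updated in it) depending on how they fared. My silly question is: if I set MaxDegreeOfParallelism = 1 for all the blocks that make up the pipeline, and make sure that there is never more than one pipeline running at a time, do I really need a ConcurrentDictionary for this, or would a plain Dictionary be sufficient? I am concerned that the pipeline could execute on different threads, and that accessing a plain Dictionary from there could cause problems.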
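
One way to probe the concern above is to measure how a single block behaves. The following is a minimal sketch, not part of the original question: ParallelismProbe and RunAsync are hypothetical names, and the Thread.Sleep only stands in for real work. It records how many delegate invocations of one ActionBlock with MaxDegreeOfParallelism = 1 overlap, and how many distinct threads ran them.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper, for illustration only.
public static class ParallelismProbe
{
    // Returns the highest number of overlapping delegate invocations
    // and the number of distinct threads that executed them.
    public static async Task<(int maxConcurrent, int threadCount)> RunAsync(int itemCount)
    {
        int running = 0;
        int maxConcurrent = 0;
        // A plain HashSet: it is only touched from inside the block's delegate.
        var threadIds = new HashSet<int>();

        var block = new ActionBlock<int>(_ =>
        {
            int now = Interlocked.Increment(ref running);
            maxConcurrent = Math.Max(maxConcurrent, now);
            threadIds.Add(Environment.CurrentManagedThreadId);
            Thread.Sleep(1); // stand-in for real work
            Interlocked.Decrement(ref running);
        },
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });

        for (int i = 0; i < itemCount; i++)
        {
            block.Post(i); // unbounded block, so every Post is accepted
        }

        block.Complete();
        await block.Completion;

        return (maxConcurrent, threadIds.Count);
    }
}
```

Calling await ParallelismProbe.RunAsync(50) should report maxConcurrent as 1, while threadCount depends on the thread pool and may be larger than 1. This only covers one block in isolation: any other block, or the calling method, that touches the same dictionary while the pipeline is running is still concurrent with it.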

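Best answer

Dataflow

As far as I can see, from the point of view of _testDictionary your StartDownload acts as the producer and your CreatePipeline acts as the consumer. The Add and Remove calls are split across two different functions, which is why you had to make that variable class-level.

What if CreatePipeline contained both calls and returned all the unprocessed elements?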

public async Task<Dictionary<int, (string path, string name)>> CreatePipeline(List<(int id, string path, string name)> properties)
{
    var unprocessed = new ConcurrentDictionary<int, (string path, string name)>(
        properties.ToDictionary(
            prop => prop.id,
            prop => (prop.path, prop.name)));

    // var downloadBlock = ...;

    var resultsBlock = new ActionBlock<int>(
        (data) => unprocessed.TryRemove(data, out _), options);

    //...

    downloadBlock.Complete();
    await resultsBlock.Completion;

    return unprocessed.ToDictionary(
        dict => dict.Key,
        dict => dict.Value);
}

Ordering

If ordering does not matter, you could consider rewriting the logic that populates the TransformBlock like this:

await Task.WhenAll(properties.Select(downloadBlock.SendAsync));
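
SendAsync returns a Task<bool> that reports whether the block accepted the item, so the array produced by Task.WhenAll can be inspected. A minimal sketch of that idea follows; SendAllSketch and RunAsync are hypothetical names, and the BoundedCapacity of 2 is an assumption added only to force some sends to wait.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper, for illustration only.
public static class SendAllSketch
{
    // Sends every id without awaiting them one by one and reports
    // how many sends were accepted and how many items were processed.
    public static async Task<(int accepted, int processed)> RunAsync(int[] ids)
    {
        int processed = 0;

        var block = new ActionBlock<int>(
            _ => { processed++; }, // safe without locking: one invocation at a time
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 1,
                BoundedCapacity = 2 // assumption: small on purpose so some sends are postponed
            });

        // Start all sends, then wait until the block has accepted (or declined) each one.
        bool[] results = await Task.WhenAll(ids.Select(id => block.SendAsync(id)));

        block.Complete();
        await block.Completion;

        return (results.Count(ok => ok), processed);
    }
}
```

For example, await SendAllSketch.RunAsync(new[] { 1, 2, 3, 4, 5 }) should report five accepted sends and five processed items. The order in which the postponed sends are accepted is not something this sketch relies on.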

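ImmutableDictionary

If you want to make sure that the returned unprocessed items cannot be modified by other threads, you can take advantage of ImmutableDictionary.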
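
To illustrate what that buys you, here is a minimal sketch (SnapshotSketch and the sample entries are hypothetical). It takes an immutable snapshot of a ConcurrentDictionary, then changes the live dictionary and "adds" to the snapshot, which returns a new instance instead of mutating the existing one.

```csharp
using System.Collections.Concurrent;
using System.Collections.Immutable;

// Hypothetical helper, for illustration only.
public static class SnapshotSketch
{
    public static (int snapshotCount, int liveCount, int extendedCount) Run()
    {
        var live = new ConcurrentDictionary<int, (string path, string name)>();
        live.TryAdd(1, ("a", "first"));
        live.TryAdd(2, ("b", "second"));

        // Copy the current contents into an immutable snapshot.
        ImmutableDictionary<int, (string path, string name)> snapshot =
            live.ToImmutableDictionary(pair => pair.Key, pair => pair.Value);

        // Later changes to the live dictionary do not affect the snapshot.
        live.TryRemove(1, out _);

        // Add does not modify the snapshot; it returns a new dictionary.
        var extended = snapshot.Add(3, ("c", "third"));

        return (snapshot.Count, live.Count, extended.Count);
    }
}
```

SnapshotSketch.Run() returns (2, 1, 3): the snapshot keeps both original entries, the live dictionary has one left, and only the new instance contains the third entry.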

So, if we put everything together, it could look like this:

public async Task StartDownload(List<(int id, string path, string name)> properties)
{
    var unprocessedProperties = await CreatePipeline(properties);
    foreach (var property in unprocessedProperties)
    {
        //TODO
    }
}

public async Task<ImmutableDictionary<int, (string path, string name)>> CreatePipeline(List<(int id, string path, string name)> properties)
{
    var options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 };

    var unprocessed = new ConcurrentDictionary<int, (string path, string name)>(
        properties.ToDictionary(
            prop => prop.id,
            prop => (prop.path, prop.name)));

    var downloadBlock = new TransformBlock<(int id, string path, string name), int>(
        (data) => data.id, options);

    var resultsBlock = new ActionBlock<int>(
        (data) => unprocessed.TryRemove(data, out _), options);

    downloadBlock.LinkTo(resultsBlock, new DataflowLinkOptions { PropagateCompletion = true });
    await Task.WhenAll(properties.Select(downloadBlock.SendAsync));

    downloadBlock.Complete();
    await resultsBlock.Completion;

    return unprocessed.ToImmutableDictionary(
        dict => dict.Key,
        dict => dict.Value);
}

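Edit: reflecting the new requirements

As the OP pointed out, the main reason behind the dictionary is to make it possible to extend the queue of pending items while processing is still in progress.

In other words, processing and collecting the pending elements is not a one-off operation but an ongoing activity.

The good news is that you can get rid of _testDictionary and resultsBlock entirely. All you need to do is keep calling Post or SendAsync to push new data into the TransformBlock. The processing is awaited in a separate method (StopDownload).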

private readonly ITargetBlock<(int id, string path, string name)> downloadBlock;

public MyAwesomeClass()
{
    var transform = new TransformBlock<(int id, string path, string name), int>(
        (data) => data.id,
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });

    // A TransformBlock only completes once its output has been consumed,
    // so discard the results here; otherwise StopDownload would never return.
    transform.LinkTo(DataflowBlock.NullTarget<int>());
    downloadBlock = transform;
}

public void StartDownload(List<(int id, string path, string name)> properties)
{
    //Starts to send props, but does not await them
    _ = properties.Select(downloadBlock.SendAsync).ToList();

    //You can await the send operation if you wish
}

public async Task StopDownload()
{
    // No more items will be accepted; wait for the pending ones to finish.
    downloadBlock.Complete();
    await downloadBlock.Completion;
}
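
As a rough, self-contained illustration of how such a class might be driven, the sketch below sends two batches while the block stays open and then stops it. DownloadQueue, DownloadQueueDemo, the Processed counter and the sample tuples are all hypothetical. The output of the TransformBlock is drained with DataflowBlock.NullTarget so that its Completion can finish, and StartDownload here returns the send task so the caller can choose whether to await it.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical class, for illustration only.
public sealed class DownloadQueue
{
    private readonly TransformBlock<(int id, string path, string name), int> _downloadBlock;
    private int _processed;

    public DownloadQueue()
    {
        _downloadBlock = new TransformBlock<(int id, string path, string name), int>(
            data =>
            {
                Interlocked.Increment(ref _processed); // stand-in for the real download
                return data.id;
            },
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });

        // Discard the output so that the block's Completion task can finish.
        _downloadBlock.LinkTo(DataflowBlock.NullTarget<int>());
    }

    public int Processed => Volatile.Read(ref _processed);

    // Can be called repeatedly while earlier items are still being processed.
    public Task StartDownload(IEnumerable<(int id, string path, string name)> properties)
        => Task.WhenAll(properties.Select(p => _downloadBlock.SendAsync(p)));

    public async Task StopDownload()
    {
        _downloadBlock.Complete();
        await _downloadBlock.Completion;
    }
}

public static class DownloadQueueDemo
{
    public static async Task<int> RunAsync()
    {
        var queue = new DownloadQueue();

        await queue.StartDownload(new[] { (1, "p1", "n1"), (2, "p2", "n2") });
        // A later batch extends the pending queue.
        await queue.StartDownload(new[] { (3, "p3", "n3"), (4, "p4", "n4"), (5, "p5", "n5") });

        await queue.StopDownload();
        return queue.Processed;
    }
}
```

await DownloadQueueDemo.RunAsync() should return 5, since both batches are processed before StopDownload finishes.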

This structure can easily be modified to inject a BufferBlock that smooths out the load:

private readonly ITargetBlock<(int id, string path, string name)> downloadBlock;
// Completion of the last block; the buffer alone finishes before processing does.
private readonly Task processingCompletion;

public MyAwesomeBufferedClass()
{
    var transform = new TransformBlock<(int id, string path, string name), int>(
        (data) => data.id,
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });

    var buffer = new BufferBlock<(int id, string path, string name)>(
        new DataflowBlockOptions { BoundedCapacity = 100 });

    buffer.LinkTo(transform, new DataflowLinkOptions { PropagateCompletion = true });
    // Discard the transformed output so that the TransformBlock can complete.
    transform.LinkTo(DataflowBlock.NullTarget<int>());

    downloadBlock = buffer;
    processingCompletion = transform.Completion;
}

public void StartDownload(List<(int id, string path, string name)> properties)
{
    _ = properties.Select(downloadBlock.SendAsync).ToList();
}

public async Task StopDownload()
{
    downloadBlock.Complete();
    await processingCompletion;
}
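
The BoundedCapacity setting is what makes the difference between Post and SendAsync visible. The sketch below is a minimal illustration with hypothetical names (BackPressureSketch, RunAsync) and a capacity of 1 chosen only to keep the example short: Post is declined immediately when the buffer is full, whereas SendAsync waits until space becomes available.

```csharp
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper, for illustration only.
public static class BackPressureSketch
{
    public static async Task<(bool firstPost, bool secondPost, bool sendWasPending, bool sendAccepted)> RunAsync()
    {
        // Capacity of 1 (an assumption for the demo) and no consumer linked yet.
        var buffer = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 1 });

        bool firstPost = buffer.Post(1);   // accepted: the buffer was empty
        bool secondPost = buffer.Post(2);  // declined: the buffer is full

        // SendAsync does not give up; its task stays pending until there is room.
        Task<bool> pendingSend = buffer.SendAsync(3);
        bool sendWasPending = !pendingSend.IsCompleted;

        buffer.Receive();                  // take item 1 out, freeing the slot
        bool sendAccepted = await pendingSend;

        return (firstPost, secondPost, sendWasPending, sendAccepted);
    }
}
```

await BackPressureSketch.RunAsync() returns (true, false, true, true). This is why the StartDownload methods above use SendAsync: with a bounded buffer, Post could silently drop items once the capacity is reached.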

Regarding "c# - Should I choose a plain Dictionary or a ConcurrentDictionary when working with the Task Parallel Library", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/62794259/
