gpt4 book ai didi

multithreading - 尝试将 AddMessage 批量添加到 Azure 队列

转载 作者:行者123 更新时间:2023-12-04 21:57:09 24 4
gpt4 key购买 nike

我有大约 50K 条消息想要添加到 Azure 队列中。

我不确定我的代码是否安全。感觉/闻起来很糟糕。

基本上,给出 POCO 的集合,将 POCO 序列化为某个 json,然后将该 json 文本添加到队列中。

public void AddMessage(T content)
{
content.ShouldNotBe(null);

var json = JsonConvert.SerializeObject(content);
var message = new CloudQueueMessage(json);
Queue.AddMessage(message);
}

public void AddMessages(ICollection<T> contents)
{
contents.ShouldNotBe(null);
Parallel.ForEach(contents, AddMessage);
}

有人可以告诉我应该做什么来解决这个问题 - 最重要的是,为什么?

我觉得在这种情况下队列可能不是线程安全的。

最佳答案

我观察到的关于 Parallel.ForEach 和处理 Azure 存储的一些事情(我的经验是并行上传 blob/ block ):

  • Azure 存储操作是基于网络 (IO) 的操作,而不是处理器密集型操作。如果我没记错的话,Parallel.ForEach 更适合处理器密集型应用程序。
  • 使用 Parallel.ForEach 上传大量 blob(或 block )时我们注意到的另一件事是,我们开始出现大量 Timeout 异常,并且速度实际上减慢了整个操作下来。我相信这样做的原因是,当您使用这种方法迭代包含大量项目的集合时,您实际上是在处理对底层框架的控制,该框架决定如何处理该集合。在这种情况下,将发生大量上下文切换,这会减慢操作速度。考虑到有效负载较小,不确定这在您的场景中如何工作。

我的建议是让应用程序控制它可以生成的并行线程的数量。一个好的标准是逻辑处理器的数量。另一个好的标准是 IE 可以打开的端口数量。所以你会产生那么多数量的并行线程。然后,您可以等待所有线程完成以生成下一组并行线程,或者在一个任务完成后立即启动一个新线程。

伪代码:

    ICollection<string> messageContents;
private void AddMessages()
{
int maxParallelThreads = Math.Min(Environment.ProcessorCount, messageContents.Count);
if (maxParallelThreads > 0)
{
var itemsToAdd = messageContents.Take(maxParallelThreads);
List<Task> tasks = new List<Task>();
for (var i = 0; i < maxParallelThreads; i++)
{
tasks.Add(Task.Factory.StartNew(() =>
{
AddMessage(itemsToAdd[i]);
RemoveItemFromCollection();
}));
}
Task.WaitAll(tasks.ToArray());
AddMessages();
}
}

关于multithreading - 尝试将 AddMessage 批量添加到 Azure 队列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/22903312/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com