gpt4 book ai didi

c# - 使用 GetConsumingEnumerable() 在 C# BlockingCollection 的某处丢失项目

转载 作者:行者123 更新时间:2023-11-30 17:25:30 28 4
gpt4 key购买 nike

我正在尝试通过 WAN 对多个目标执行并行 SqlBulkCopy,其中许多目标可能连接速度较慢和/或连接中断;它们的连接速度从 2 到 50 兆位下载不等,我正在通过 1000 兆位上传的连接发送;许多目标需要多次重试才能正确完成。

我目前正在使用 Parallel.ForEachGetConsumingEnumerable() 上BlockingCollection ( queue );但是我要么偶然发现了一些错误,要么我在完全理解它的目的时遇到了问题,或者只是出了点问题..代码从不调用 CompleteAdding()阻塞收集的方法,似乎在 parallel-foreach-loop 的某个地方丢失了一些目标。即使有不同的方法来解决这个问题,并且忽略它在循环中所做的工作类型,blockingcollection 不应该像它在这个例子中那样表现,不是吗?

在 foreach 循环中,我完成工作,并将目标添加到 results -如果成功完成则收集,或者在出现错误时将目标重新添加到 BlockingCollection,直到目标达到最大重试阈值;那时我将它添加到 results -收藏。

在另一个任务中,我循环直到 results 的计数-collection 等于目标的初始计数;然后我做 CompleteAdding()关于阻塞集合。

我已经尝试在 results 上使用锁定对象进行操作-collection(使用 List<int> 代替)和队列,没有运气,但无论如何都没有必要。我还尝试将重试添加到单独的集合中,然后将它们重新添加到不同任务中的 BlockingCollection 而不是 parallel.foreach 中。只是为了好玩,我还尝试使用 .NET 从 4.5 到 4.8 以及不同的 C# 语言版本进行编译。

这是一个简化的例子:

List<int> targets = new List<int>();
for (int i = 0; i < 200; i++)
{
targets.Add(0);
}

BlockingCollection<int> queue = new BlockingCollection<int>(new ConcurrentQueue<int>());
ConcurrentBag<int> results = new ConcurrentBag<int>();
targets.ForEach(f => queue.Add(f));

// Bulkcopy in die Filialen:
Task.Run(() =>
{
while (results.Count < targets.Count)
{
Thread.Sleep(2000);
Console.WriteLine($"Completed: {results.Count} / {targets.Count} | queue: {queue.Count}");
}
queue.CompleteAdding();
});

int MAX_RETRIES = 10;
ParallelOptions options = new ParallelOptions { MaxDegreeOfParallelism = 50 };

Parallel.ForEach(queue.GetConsumingEnumerable(), options, target =>
{
try
{
// simulate a problem with the bulkcopy:
throw new Exception();
results.Add(target);
}
catch (Exception)
{
if (target < MAX_RETRIES)
{
target++;
if (!queue.TryAdd(target))
Console.WriteLine($"{target.ToString("D3")}: Error, can't add to queue!");
}
else
{
results.Add(target);
Console.WriteLine($"Aborted after {target + 1} tries | {results.Count} / {targets.Count} items finished.");
}

}
});

我预计 results 的计数-collection 是 targets 的精确计数-list 最后,但它似乎永远不会达到那个数字,这导致 BlockingCollection 永远不会被标记为已完成,因此代码永远不会完成。

我真的不明白为什么不是所有目标都添加到 results -最终收集!添加的计数总是变化的,并且大多只是略低于预期的最终计数。

编辑:我删除了重试部分,并用一个简单的 int 计数器替换了 ConcurrentBag,但大部分时间它仍然不起作用:

List<int> targets = new List<int>();
for (int i = 0; i < 500; i++)
targets.Add(0);

BlockingCollection<int> queue = new BlockingCollection<int>(new ConcurrentQueue<int>());
//ConcurrentBag<int> results = new ConcurrentBag<int>();
int completed = 0;
targets.ForEach(f => queue.Add(f));

var thread = new Thread(() =>
{
while (completed < targets.Count)
{
Thread.Sleep(2000);
Console.WriteLine($"Completed: {completed} / {targets.Count} | queue: {queue.Count}");
}
queue.CompleteAdding();
});
thread.Start();

ParallelOptions options = new ParallelOptions { MaxDegreeOfParallelism = 4 };
Parallel.ForEach(queue.GetConsumingEnumerable(), options, target =>
{
Interlocked.Increment(ref completed);
});

最佳答案

抱歉,找到了答案:blockingcollection 和 parallel foreach 使用的默认分区器是分块和缓冲,这导致 foreach 循环永远等待下一个 block 的足够项目。对我来说,它在那里坐了整整晚上,不处理最后几项!

所以,而不是:

ParallelOptions options = new ParallelOptions { MaxDegreeOfParallelism = 4 };
Parallel.ForEach(queue.GetConsumingEnumerable(), options, target =>
{
Interlocked.Increment(ref completed);
});

你必须使用:

var partitioner = Partitioner.Create(queue.GetConsumingEnumerable(), EnumerablePartitionerOptions.NoBuffering);
ParallelOptions options = new ParallelOptions { MaxDegreeOfParallelism = 4 };
Parallel.ForEach(partitioner, options, target =>
{
Interlocked.Increment(ref completed);
});

关于c# - 使用 GetConsumingEnumerable() 在 C# BlockingCollection 的某处丢失项目,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58485027/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com