gpt4 book ai didi

c# - 为什么迭代 GetConsumingEnumerable() 没有完全清空底层阻塞集合

转载 作者:可可西里 更新时间:2023-11-01 02:58:57 32 4
gpt4 key购买 nike

我有一个使用任务并行库的可量化和可重复的问题,BlockingCollection<T> , ConcurrentQueue<T> & GetConsumingEnumerable在尝试创建一个简单的管道时。

简而言之,将条目添加到默认 BlockingCollection<T> (在引擎盖下依赖于 ConcurrentQueue<T> )来自一个线程,不保证它们会从 BlockingCollection<T> 中弹出。来自另一个调用 GetConsumingEnumerable() 的线程方法。

我创建了一个非常简单的 Winforms 应用程序来重现/模拟它,它只将整数打印到屏幕上。

  • Timer1负责排队工作项......它使用一个名为_tracker的并发字典以便它知道它已经添加到阻塞集合中的内容。
  • Timer2只是记录 BlockingCollection 的计数状态& 的 _tracker
  • START 按钮启动一个 Paralell.ForEach它简单地遍历阻塞集合 GetConsumingEnumerable()并开始将它们打印到第二个列表框。
  • 停止按钮停止 Timer1防止将更多条目添加到阻塞集合中。
public partial class Form1 : Form
{
private int Counter = 0;
private BlockingCollection<int> _entries;
private ConcurrentDictionary<int, int> _tracker;
private CancellationTokenSource _tokenSource;
private TaskFactory _factory;

public Form1()
{
_entries = new BlockingCollection<int>();
_tracker = new ConcurrentDictionary<int, int>();
_tokenSource = new CancellationTokenSource();
_factory = new TaskFactory();
InitializeComponent();
}

private void timer1_Tick(object sender, EventArgs e)
{ //ADDING TIMER -> LISTBOX 1
for(var i = 0; i < 3; i++,Counter++)
{
if (_tracker.TryAdd(Counter, Counter))
_entries.Add(Counter);
listBox1.Items.Add(string.Format("Adding {0}", Counter));
}
}

private void timer2_Tick_1(object sender, EventArgs e)
{ //LOGGING TIMER -> LIST BOX 3
listBox3.Items.Add(string.Format("Tracker Count : {0} / Entries Count : {1}", _tracker.Count, _entries.Count));
}

private void button1_Click(object sender, EventArgs e)
{ //START BUTTON -> LOGS TO LIST BOX 2

var options = new ParallelOptions {
CancellationToken = _tokenSource.Token,
MaxDegreeOfParallelism = 1
};

_factory.StartNew(() => { Parallel.ForEach(_entries.GetConsumingEnumerable(), options, DoWork); });

timer1.Enabled = timer2.Enabled = true;
timer1.Start();
timer2.Start();
}

private void DoWork(int entry)
{
Thread.Sleep(1000); //Sleep for 1 second to simulate work being done.
Invoke((MethodInvoker)(() => listBox2.Items.Add(string.Format("Processed {0}", entry))));
int oldEntry;
_tracker.TryRemove(entry, out oldEntry);
}

private void button2_Click(object sender, EventArgs e)
{ //STOP BUTTON
timer1.Stop();
timer1.Enabled = false;
}

这是事件的顺序:

  • 按开始键
  • Timer1 计时 & ListBox1 立即更新 3 条消息(添加 0、1、2)
  • ListBox2 随后更新为 3 条消息,间隔 1 秒
    • 正在处理 0
    • 处理 1
    • 处理 2
  • Timer1 计时 & ListBox1 立即更新 3 条消息(Adding 3, 4, 5)
  • ListBox2 随后更新为 2 条消息,间隔 1 秒
    • 处理 3
    • 处理 4
    • Processing 5 未打印...似乎已“丢失”
  • 按“停止”以防止计时器 1 添加更多消息
  • 等等……“Processing 5”还是没有出现

Missing Entry

您可以看到并发字典仍在跟踪 1 个项目尚未处理并随后从 _tracker 中删除

如果我再次按下 Start,timer1 将开始添加另外 3 个条目,Parallel 循环将重新开始打印 5、6、7 和 8。

Entry returned after subsequent items shoved in behind it

我完全不知道为什么会这样。再次调用 start 显然调用了一个新任务,它调用了一个 Paralell foreach,并重新执行 GetConsumingEnumerable() 神奇地找到了丢失的条目......我

为什么是BlockingCollection.GetConsumingEnumerable()不保证迭代添加到集合中的每个项目。

为什么添加更多条目会导致它“脱离”并继续处理?

最佳答案

您不能在 Parallel.ForEach() 中使用 GetConsumingEnumerable()

使用 TPL extras 中的 GetConsumingPartitioner

在博客文章中,您还将获得为什么不能使用 GetConsumingEnumerable()

的解释

The partitioning algorithm employed by default by both Parallel.ForEach and PLINQ use chunking in order to minimize synchronization costs: rather than taking the lock once per element, it'll take the lock, grab a group of elements (a chunk), and then release the lock.

即Parallel.ForEach 在继续之前等待直到它收到一组工作项。正是您的实验所显示的。

关于c# - 为什么迭代 GetConsumingEnumerable() 没有完全清空底层阻塞集合,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/10208330/

32 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com