- android - RelativeLayout 背景可绘制重叠内容
- android - 如何链接 cpufeatures lib 以获取 native android 库?
- java - OnItemClickListener 不起作用,但 OnLongItemClickListener 在自定义 ListView 中起作用
- java - Android 文件转字符串
我有一个使用任务并行库的可量化和可重复的问题,BlockingCollection<T>
, ConcurrentQueue<T>
& GetConsumingEnumerable
在尝试创建一个简单的管道时。
简而言之,将条目添加到默认 BlockingCollection<T>
(在引擎盖下依赖于 ConcurrentQueue<T>
)来自一个线程,不保证它们会从 BlockingCollection<T>
中弹出。来自另一个调用 GetConsumingEnumerable()
的线程方法。
我创建了一个非常简单的 Winforms 应用程序来重现/模拟它,它只将整数打印到屏幕上。
Timer1
负责排队工作项......它使用一个名为_tracker
的并发字典以便它知道它已经添加到阻塞集合中的内容。Timer2
只是记录 BlockingCollection
的计数状态& 的 _tracker
Paralell.ForEach
它简单地遍历阻塞集合 GetConsumingEnumerable()
并开始将它们打印到第二个列表框。Timer1
防止将更多条目添加到阻塞集合中。public partial class Form1 : Form
{
private int Counter = 0;
private BlockingCollection<int> _entries;
private ConcurrentDictionary<int, int> _tracker;
private CancellationTokenSource _tokenSource;
private TaskFactory _factory;
public Form1()
{
_entries = new BlockingCollection<int>();
_tracker = new ConcurrentDictionary<int, int>();
_tokenSource = new CancellationTokenSource();
_factory = new TaskFactory();
InitializeComponent();
}
private void timer1_Tick(object sender, EventArgs e)
{ //ADDING TIMER -> LISTBOX 1
for(var i = 0; i < 3; i++,Counter++)
{
if (_tracker.TryAdd(Counter, Counter))
_entries.Add(Counter);
listBox1.Items.Add(string.Format("Adding {0}", Counter));
}
}
private void timer2_Tick_1(object sender, EventArgs e)
{ //LOGGING TIMER -> LIST BOX 3
listBox3.Items.Add(string.Format("Tracker Count : {0} / Entries Count : {1}", _tracker.Count, _entries.Count));
}
private void button1_Click(object sender, EventArgs e)
{ //START BUTTON -> LOGS TO LIST BOX 2
var options = new ParallelOptions {
CancellationToken = _tokenSource.Token,
MaxDegreeOfParallelism = 1
};
_factory.StartNew(() => { Parallel.ForEach(_entries.GetConsumingEnumerable(), options, DoWork); });
timer1.Enabled = timer2.Enabled = true;
timer1.Start();
timer2.Start();
}
private void DoWork(int entry)
{
Thread.Sleep(1000); //Sleep for 1 second to simulate work being done.
Invoke((MethodInvoker)(() => listBox2.Items.Add(string.Format("Processed {0}", entry))));
int oldEntry;
_tracker.TryRemove(entry, out oldEntry);
}
private void button2_Click(object sender, EventArgs e)
{ //STOP BUTTON
timer1.Stop();
timer1.Enabled = false;
}
这是事件的顺序:
您可以看到并发字典仍在跟踪 1 个项目尚未处理并随后从 _tracker
中删除
如果我再次按下 Start,timer1 将开始添加另外 3 个条目,Parallel 循环将重新开始打印 5、6、7 和 8。
我完全不知道为什么会这样。再次调用 start 显然调用了一个新任务,它调用了一个 Paralell foreach,并重新执行 GetConsumingEnumerable() 神奇地找到了丢失的条目......我
为什么是BlockingCollection.GetConsumingEnumerable()
不保证迭代添加到集合中的每个项目。
为什么添加更多条目会导致它“脱离”并继续处理?
最佳答案
您不能在 Parallel.ForEach()
中使用 GetConsumingEnumerable()
。
使用 TPL extras 中的 GetConsumingPartitioner
在博客文章中,您还将获得为什么不能使用 GetConsumingEnumerable()
The partitioning algorithm employed by default by both Parallel.ForEach and PLINQ use chunking in order to minimize synchronization costs: rather than taking the lock once per element, it'll take the lock, grab a group of elements (a chunk), and then release the lock.
即Parallel.ForEach 在继续之前等待直到它收到一组工作项。正是您的实验所显示的。
关于c# - 为什么迭代 GetConsumingEnumerable() 没有完全清空底层阻塞集合,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/10208330/
解决方案 1 和 2 之间的区别是什么,_taskQ 是 BlockingCollection,我正在尝试实现生产者-消费者场景。 BlockingCollection 使用默认的 Concurren
我有一个使用任务并行库的可量化和可重复的问题,BlockingCollection , ConcurrentQueue & GetConsumingEnumerable在尝试创建一个简单的管道时。 简
有人可以确认下面的代码确实从 BlockingCollection 中删除了项目 foreach (var item in myCollection.GetConsumingEnumerable())
MSDN 评论 http://msdn.microsoft.com/en-us/library/dd267312.aspx声明... “BlockingCollection 的默认集合类型是 Conc
我有一个进程生成工作,第二个进程带有 BlockingCollection<>那消耗了那份工作。当我关闭我的程序时,我需要我的消费者停止消费工作,但我仍然需要快速记录未决但尚未消费的工作。 现在,我的
在下面的代码中,我使用 CancellationToken 在生产者不生产时唤醒 GetConsumingEnumerable() 并且我想脱离 foreach 并退出任务。但我没有看到 IsCanc
我有多个生产者和多个消费者的情况。生产者将作业放入队列中。我选择了 BlockingCollection,它工作得很好,因为我需要消费者等待找到工作。但是,如果我使用 GetConsumingEnum
我正在使用 BlockingCollection 来实现任务调度程序,基本上: public class DedicatedThreadScheduler : TaskScheduler, IDisp
我正在尝试通过 WAN 对多个目标执行并行 SqlBulkCopy,其中许多目标可能连接速度较慢和/或连接中断;它们的连接速度从 2 到 50 兆位下载不等,我正在通过 1000 兆位上传的连接发送;
我已经对这两个循环进行了试验,并开始注意到,即使任务的 Action 委托(delegate)中的常规 foreach 循环应该并行执行,它也不会并行处理元素。但是,如果我用 Parallel.For
我是一名优秀的程序员,十分优秀!