- c - 在位数组中找到第一个零
- linux - Unix 显示有关匹配两种模式之一的文件的信息
- 正则表达式替换多个文件
- linux - 隐藏来自 xtrace 的命令
我有一个程序如下
class Program
{
public static int TaskCount { get; set; }
public static BlockingCollection<string> queue = new BlockingCollection<string>(new ConcurrentQueue<string>());
static void Main(string[] args)
{
TaskCount = 3;
Task.Factory.StartNew(() => Producer());
for (int i = 0; i < TaskCount; i++)
Task.Factory.StartNew(() => Consumer());
Console.ReadKey();
}
private static void Producer()
{
using (StreamWriter sw = File.AppendText(@"C:\pcadder.txt"))
{
for (int i = 0; i < 15; i++)
{
queue.Add("Item: " + (i+1).ToString());
var message = string.Format("{2}.Item added: Item {0} at {1}", (i+1).ToString(), DateTime.Now.ToString("yyyy/MM/dd hh:mm:ss.ffffff"),i+1);
Console.WriteLine(message);
sw.WriteLine(message);
}
queue.CompleteAdding();
}
}
private static void Consumer()
{
int count = 1;
foreach (var item in queue.GetConsumingEnumerable())
{
var message = string.Format("{3}.Item taken: {0} at {1} by thread {2}.", item, DateTime.Now.ToString("yyyy/MM/dd hh:mm:ss.ffffff"),
Thread.CurrentThread.ManagedThreadId,count);
Console.WriteLine(message);
using (StreamWriter sw = File.AppendText(@"C:\pctaker.txt"))
sw.WriteLine(message);
count += 1;
}
}
}
1.Item added: Item 1 at 2017.07.06 09:58:49.784734
2.Item added: Item 2 at 2017.07.06 09:58:49.784734
3.Item added: Item 3 at 2017.07.06 09:58:49.784734
4.Item added: Item 4 at 2017.07.06 09:58:49.784734
5.Item added: Item 5 at 2017.07.06 09:58:49.784734
6.Item added: Item 6 at 2017.07.06 09:58:49.784734
7.Item added: Item 7 at 2017.07.06 09:58:49.784734
8.Item added: Item 8 at 2017.07.06 09:58:49.784734
9.Item added: Item 9 at 2017.07.06 09:58:49.784734
10.Item added: Item 10 at 2017.07.06 09:58:49.784734
11.Item added: Item 11 at 2017.07.06 09:58:49.784734
12.Item added: Item 12 at 2017.07.06 09:58:49.784734
13.Item added: Item 13 at 2017.07.06 09:58:49.784734
14.Item added: Item 14 at 2017.07.06 09:58:49.784734
15.Item added: Item 15 at 2017.07.06 09:58:49.784734
1.Item taken: Item: 3 at 2017.07.06 09:58:49.784734 by thread 7.
1.Item taken: Item: 2 at 2017.07.06 09:58:49.784734 by thread 4.
1.Item taken: Item: 1 at 2017.07.06 09:58:49.784734 by thread 5.
2.Item taken: Item: 5 at 2017.07.06 09:58:49.784734 by thread 4.
2.Item taken: Item: 4 at 2017.07.06 09:58:49.784734 by thread 7.
2.Item taken: Item: 6 at 2017.07.06 09:58:49.784734 by thread 5.
3.Item taken: Item: 7 at 2017.07.06 09:58:49.784734 by thread 4.
3.Item taken: Item: 8 at 2017.07.06 09:58:49.784734 by thread 7.
3.Item taken: Item: 9 at 2017.07.06 09:58:49.784734 by thread 5.
4.Item taken: Item: 11 at 2017.07.06 09:58:49.784734 by thread 7.
4.Item taken: Item: 12 at 2017.07.06 09:58:49.784734 by thread 5.
5.Item taken: Item: 13 at 2017.07.06 09:58:49.784734 by thread 7.
5.Item taken: Item: 14 at 2017.07.06 09:58:49.784734 by thread 5.
6.Item taken: Item: 15 at 2017.07.06 09:58:49.784734 by thread 7.
几乎每次运行该程序后,我都会在消费者日志中丢失一项。(此处,Item 10
丢失)。我不明白为什么会这样。
最佳答案
这里
using (StreamWriter sw = File.AppendText(@"C:\pctaker.txt"))
sw.WriteLine(message);
您从多个线程快速写入同一个文件。这不是一个好主意,这段代码实际上会抛出一个异常。它在您的代码中不会被注意到,因为您不处理任何异常并且它发生在后台线程中,因此不会使您的应用程序崩溃。这回答了为什么您的日志中缺少项目。例如,您可以写入同一个文件:
// create it outside `Consumer` and make synchronized
using (var taker = TextWriter.Synchronized(File.AppendText(@"pctaker.txt"))) {
TaskCount = 3;
Task.Factory.StartNew(() => Producer());
//Producer();
for (int i = 0; i < TaskCount; i++)
// pass to consumer
Task.Factory.StartNew(() => Consumer(taker));
Console.ReadKey();
}
private static void Consumer(TextWriter writer)
{
int count = 1;
foreach (var item in queue.GetConsumingEnumerable())
{
var message = string.Format("{3}.Item taken: {0} at {1} by thread {2}.", item, DateTime.Now.ToString("yyyy/MM/dd hh:mm:ss.ffffff"),
Thread.CurrentThread.ManagedThreadId, count);
Console.WriteLine(message);
writer.WriteLine(message);
writer.Flush();
count += 1;
}
}
或者只是在写入文件时放置一个锁
。
至于第二个问题——消费者仍然以 FIFO 顺序提取项目,但由于你有多个消费者——处理顺序当然不能保证,因为所有消费者并行处理项目。消费者A拉取元素1,消费者B同时拉取元素2。消费者 A 需要 100 毫秒来处理项目 1,消费者 B 需要 10 毫秒来处理项目 2。结果 - 在项目 1 之前处理项目 2(即 - 写入您的日志)。
关于c# - 与多个消费者一起使用 BlockingCollection,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44942403/
我有两个阻塞集合 - 一个优先级高于另一个。如果我使用 TryTakeFromAny 并首先指定更高优先级的 BlockingCollection,是否可以保证首先查看更高优先级的队列? 最佳答案 这
我有一个使用 BlockingCollection<> 实现的下载队列.现在我想偶尔优先考虑一些下载。我认为将某些元素“向上”移动到集合中可能会很棒,例如在列表中,但没有像 Remove()/AddF
BlockingCollection.Dispose实际上做什么? 最佳答案 这样可以正确处理内部等待 handle 。 BlockingCollection在内部使用一对事件等待句柄,它们又具有关联
BlockingCollection blockingCollection = new BlockingCollection(); // create and start a producer
将来自多个线程的消息放到一个队列中并让一个单独的线程一次处理一个队列中的项目的最佳方法是什么? 在尝试断开多个线程的事件时,我经常使用这种模式。 我正在为此使用 BlockingCollection,
我正在尝试正确建模多线程单生产者/多消费者场景,其中消费者可以要求生产者获取元素,但生产者需要执行耗时的操作来生产它(想想执行查询或打印文档)。 我的目标是确保没有消费者可以同时要求生产者生产一件商品
我知道使用 ConcurrentQueue 的 BlockingCollection 的 boundedcapacity 为 100。 但是我不确定那是什么意思。 我正在尝试实现一个并发缓存,如果队列
BlockingCollection 仅包含添加单个项目的方法。如果我想添加一个集合怎么办?我应该只使用 foreach 循环吗? 为什么 BlockingCollection 不包含添加集合的方法?
我正在编写一个 WCF 服务,它从多个模块(数据库、其他服务..)接收通知并将它们添加到阻塞集合中,以便在将相关数据发布到客户端的使用者线程上进行处理。 客户端可以请求存储在服务器上的完整数据,在此操
我有这样一种情况,我需要有大量(数百个)队列,其中的项目应该按顺序处理(需要单线程消费者)。我的第一个实现,based on the samples,我为每个 BlockingCollection 使
Stephen Toub 的书第 88 页 http://www.microsoft.com/download/en/details.aspx?id=19222 有代码 private Blockin
我即将使用如下所示的 BlockingCollection,只是想检查它是否适合线程安全等。想知道我是否需要 CancellationTokenSource。 谢谢 public class MyAp
你好,我正在尝试创建一个使用流数据的应用程序......(所以没有结束......完成......等) 由于它的性质,因为流数据得到了很多操作,我使用 BlockingCollection,它工作得很
我有一个 BlockingCollection .生产者任务向其中添加项目,消费者任务移除项目。 现在我想限制集合中的项目数量,如果添加了更多项目,则自动丢弃旧数据。该集合不应同时包含超过 N 个最近
我有一个线程将项目添加到 BlockingCollection 。 在我正在使用的另一个线程上foreach(myCollection.GetConsumingEnumerable() 中的 var
我有一个程序如下 class Program { public static int TaskCount { get; set; } public stati
我有以下代码,其中包含一个生产者线程和多个消费者线程。你知道多个消费者是否是线程安全的。例如,线程 1 是否有可能正在消费,而线程 2 是否并行消费并更改线程 1 中使用的项目的值? namespac
我需要建立一个阻塞优先级队列,我的预感是 TakeFromAny 可能是 secret 成分,但是关于该方法的文档很少。它的目的/适当用途是什么? 我的要求是多个线程将添加到高优先级或低优先级队列中。
有没有办法删除 具体 BlockingCollection 中的项目,如下所示: IMyItem mySpecificItem = controller.getTopRequestedItem();
我试图在.NET 4上新的Parallel Stacks的背景下理解BlockingCollection的目的。 MSDN文档说: BlockingCollection用作IProducerConsu
我是一名优秀的程序员,十分优秀!