gpt4 book ai didi

c# - 与多个消费者一起使用 BlockingCollection

转载 作者:太空狗 更新时间:2023-10-30 00:37:46 25 4
gpt4 key购买 nike

我有一个程序如下

 class Program
{
public static int TaskCount { get; set; }
public static BlockingCollection<string> queue = new BlockingCollection<string>(new ConcurrentQueue<string>());
static void Main(string[] args)
{
TaskCount = 3;
Task.Factory.StartNew(() => Producer());

for (int i = 0; i < TaskCount; i++)
Task.Factory.StartNew(() => Consumer());
Console.ReadKey();
}

private static void Producer()
{
using (StreamWriter sw = File.AppendText(@"C:\pcadder.txt"))
{
for (int i = 0; i < 15; i++)
{
queue.Add("Item: " + (i+1).ToString());
var message = string.Format("{2}.Item added: Item {0} at {1}", (i+1).ToString(), DateTime.Now.ToString("yyyy/MM/dd hh:mm:ss.ffffff"),i+1);
Console.WriteLine(message);
sw.WriteLine(message);

}
queue.CompleteAdding();
}
}
private static void Consumer()
{
int count = 1;
foreach (var item in queue.GetConsumingEnumerable())
{
var message = string.Format("{3}.Item taken: {0} at {1} by thread {2}.", item, DateTime.Now.ToString("yyyy/MM/dd hh:mm:ss.ffffff"),
Thread.CurrentThread.ManagedThreadId,count);
Console.WriteLine(message);

using (StreamWriter sw = File.AppendText(@"C:\pctaker.txt"))
sw.WriteLine(message);
count += 1;
}
}
}

输出

1.Item added: Item 1 at 2017.07.06 09:58:49.784734
2.Item added: Item 2 at 2017.07.06 09:58:49.784734
3.Item added: Item 3 at 2017.07.06 09:58:49.784734
4.Item added: Item 4 at 2017.07.06 09:58:49.784734
5.Item added: Item 5 at 2017.07.06 09:58:49.784734
6.Item added: Item 6 at 2017.07.06 09:58:49.784734
7.Item added: Item 7 at 2017.07.06 09:58:49.784734
8.Item added: Item 8 at 2017.07.06 09:58:49.784734
9.Item added: Item 9 at 2017.07.06 09:58:49.784734
10.Item added: Item 10 at 2017.07.06 09:58:49.784734
11.Item added: Item 11 at 2017.07.06 09:58:49.784734
12.Item added: Item 12 at 2017.07.06 09:58:49.784734
13.Item added: Item 13 at 2017.07.06 09:58:49.784734
14.Item added: Item 14 at 2017.07.06 09:58:49.784734
15.Item added: Item 15 at 2017.07.06 09:58:49.784734

1.Item taken: Item: 3 at 2017.07.06 09:58:49.784734 by thread 7.
1.Item taken: Item: 2 at 2017.07.06 09:58:49.784734 by thread 4.
1.Item taken: Item: 1 at 2017.07.06 09:58:49.784734 by thread 5.
2.Item taken: Item: 5 at 2017.07.06 09:58:49.784734 by thread 4.
2.Item taken: Item: 4 at 2017.07.06 09:58:49.784734 by thread 7.
2.Item taken: Item: 6 at 2017.07.06 09:58:49.784734 by thread 5.
3.Item taken: Item: 7 at 2017.07.06 09:58:49.784734 by thread 4.
3.Item taken: Item: 8 at 2017.07.06 09:58:49.784734 by thread 7.
3.Item taken: Item: 9 at 2017.07.06 09:58:49.784734 by thread 5.
4.Item taken: Item: 11 at 2017.07.06 09:58:49.784734 by thread 7.
4.Item taken: Item: 12 at 2017.07.06 09:58:49.784734 by thread 5.
5.Item taken: Item: 13 at 2017.07.06 09:58:49.784734 by thread 7.
5.Item taken: Item: 14 at 2017.07.06 09:58:49.784734 by thread 5.
6.Item taken: Item: 15 at 2017.07.06 09:58:49.784734 by thread 7.

几乎每次运行该程序后,我都会在消费者日志中丢失一项。(此处,Item 10 丢失)。我不明白为什么会这样。

  1. 如何处理此项目?
  2. 作为消费者使用多个任务时,按顺序处理项目 (FIFO) 是否被破坏了?如果我想在消费者方法中保持/强制以 FIFO 顺序处理,我应该避免使用多个任务吗? (处理可能包括 I/O、网络操作)

最佳答案

这里

using (StreamWriter sw = File.AppendText(@"C:\pctaker.txt"))
sw.WriteLine(message);

您从多个线程快速写入同一个文件。这不是一个好主意,这段代码实际上会抛出一个异常。它在您的代码中不会被注意到,因为您不处理任何异常并且它发生在后台线程中,因此不会使您的应用程序崩溃。这回答了为什么您的日志中缺少项目。例如,您可以写入同一个文件:

// create it outside `Consumer` and make synchronized
using (var taker = TextWriter.Synchronized(File.AppendText(@"pctaker.txt"))) {
TaskCount = 3;
Task.Factory.StartNew(() => Producer());
//Producer();
for (int i = 0; i < TaskCount; i++)
// pass to consumer
Task.Factory.StartNew(() => Consumer(taker));
Console.ReadKey();
}

private static void Consumer(TextWriter writer)
{
int count = 1;
foreach (var item in queue.GetConsumingEnumerable())
{
var message = string.Format("{3}.Item taken: {0} at {1} by thread {2}.", item, DateTime.Now.ToString("yyyy/MM/dd hh:mm:ss.ffffff"),
Thread.CurrentThread.ManagedThreadId, count);
Console.WriteLine(message);
writer.WriteLine(message);
writer.Flush();
count += 1;
}
}

或者只是在写入文件时放置一个

至于第二个问题——消费者仍然以 FIFO 顺序提取项目,但由于你有多个消费者——处理顺序当然不能保证,因为所有消费者并行处理项目。消费者A拉取元素1,消费者B同时拉取元素2。消费者 A 需要 100 毫秒来处理项目 1,消费者 B 需要 10 毫秒来处理项目 2。结果 - 在项目 1 之前处理项目 2(即 - 写入您的日志)。

关于c# - 与多个消费者一起使用 BlockingCollection,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44942403/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com