gpt4 book ai didi

c# - 具有多线程或任务的进程队列

转载 作者:可可西里 更新时间:2023-11-01 08:39:38 26 4
gpt4 key购买 nike

我有一个电话消息应用程序,其中有很多消息需要处理。由于电话端口有限,所以消息将被先进先出处理。每条消息都有一个标志“Acknowledge”,指示已处理的消息。它当然被初始化为false。

我想将所有消息放入一个队列,然后用多个线程或任务处理它们。

    public class MessageQueue
{
public Queue MessageWorkItem { get; set; }
public Messages Message { get; set; }
public MessageQueue()
{
MessageWorkItem = new Queue();
Message = new Messages();
}
public void GetMessageMetaData()
{
try
{
// It is just a test, add only one item into the queue
Message.MessageID = Guid.NewGuid();
Message.NumberToCall = "1111111111";
Message.FacilityID = "3333";
Message.NumberToDial = "2222222222";
Message.CountryCode = "1";
Message.Acknowledge = false;
}
catch (Exception ex)
{
}
}

public void AddingItemToQueue()
{
GetMessageMetaData();
if (!Message.Acknowledge)
{
lock (MessageWorkItem)
{
MessageWorkItem.Enqueue(Message);
}
}
}
}

public class Messages
{
public Guid MessageID { get; set; }
public string NumberToCall { get; set; }
public string FacilityID { get; set; }
public string NumberToDial { get; set; }
public string CountryCode { get; set; }
public bool Acknowledge { get; set; }
}

现在我的问题是如何使用多线程从队列中取出项目。对于队列中的每个项目,我想运行一个脚本。

        public void RunScript(Message item)
{
try
{
PlayMessage(item);
return;
}
catch (HangupException hex)
{
Log.WriteWithId("Caller Hungup!", hex.Message);
}
catch (Exception ex)
{
Log.WriteException(ex, "Unexpected exception: {0}");
}
}

本来想的是看看有没有

if(MessageWorkItem.Count >= 1) Then doing something but I do need code help.

最佳答案

如果您可以使用 .Net 4.5,我建议您查看 Dataflow from the the Task Parallel Library (TPL) .

该页面引出了很多示例演练,例如 How to: Implement a Producer-Consumer Dataflow PatternWalkthrough: Using Dataflow in a Windows Forms Application .

查看该文档,看看它是否对您有帮助。要接受的内容很多,但我认为这可能是您最好的方法。

或者,您可以考虑使用 BlockingCollection连同其 GetConsumingEnumerable() 方法来访问队列中的项目。

您所做的是将工作拆分为您想要以某种方式处理的对象,并使用 BlockingCollection 来管理队列。

一些使用 int 而不是对象作为工作项的示例代码将有助于证明这一点:

当工作线程完成其当前项目时,它将从工作队列中删除一个新项目,处理该项目,然后将其添加到输出队列。

一个单独的消费者线程从输出队列中移除完成的项目并对它们做一些事情。

最后,我们必须等待所有 worker 完成 (Task.WaitAll(workers)),然后才能将输出队列标记为已完成 (outputQueue.CompleteAdding())。

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace Demo
{
class Program
{
static void Main(string[] args)
{
new Program().run();
}

void run()
{
int threadCount = 4;
Task[] workers = new Task[threadCount];

Task.Factory.StartNew(consumer);

for (int i = 0; i < threadCount; ++i)
{
int workerId = i;
Task task = new Task(() => worker(workerId));
workers[i] = task;
task.Start();
}

for (int i = 0; i < 100; ++i)
{
Console.WriteLine("Queueing work item {0}", i);
inputQueue.Add(i);
Thread.Sleep(50);
}

Console.WriteLine("Stopping adding.");
inputQueue.CompleteAdding();
Task.WaitAll(workers);
outputQueue.CompleteAdding();
Console.WriteLine("Done.");

Console.ReadLine();
}

void worker(int workerId)
{
Console.WriteLine("Worker {0} is starting.", workerId);

foreach (var workItem in inputQueue.GetConsumingEnumerable())
{
Console.WriteLine("Worker {0} is processing item {1}", workerId, workItem);
Thread.Sleep(100); // Simulate work.
outputQueue.Add(workItem); // Output completed item.
}

Console.WriteLine("Worker {0} is stopping.", workerId);
}

void consumer()
{
Console.WriteLine("Consumer is starting.");

foreach (var workItem in outputQueue.GetConsumingEnumerable())
{
Console.WriteLine("Consumer is using item {0}", workItem);
Thread.Sleep(25);
}

Console.WriteLine("Consumer is finished.");
}

BlockingCollection<int> inputQueue = new BlockingCollection<int>();
BlockingCollection<int> outputQueue = new BlockingCollection<int>();
}
}

关于c# - 具有多线程或任务的进程队列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/22688679/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com