gpt4 book ai didi

c# - 如何使用任务工厂处理队列

转载 作者:行者123 更新时间:2023-12-03 19:08:44 27 4
gpt4 key购买 nike

有一个队列。有一个函数可以处理来自该队列的消息。该函数从队列中取出消息,启动新任务处理下一条消息,等待其他来源的数据,然后进行计算。

这是例子

using System;
using System.Diagnostics;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace TestTaskFactory
{
class Program
{
static int Data = 50;
static int ActiveTasksNumber = 0;
static int MaxActiveTasksNumber = 0;

static Stopwatch clock = new Stopwatch();

static object locker = new object();
static object locker2 = new object();

static void Main(string[] args)
{
clock.Start();
Task.Factory.StartNew(() => DoWork());

while (true)
{
Thread.Sleep(10000);
}
}

public static void DoWork()
{
//imitation of geting message from some queue
int message = GetMessageFromQueue();

lock (locker2)
{
ActiveTasksNumber++;
MaxActiveTasksNumber = Math.Max(MaxActiveTasksNumber,
ActiveTasksNumber);
Console.Write("\r" + message + " ");
}

//Run new task to work with next message
Task.Factory.StartNew(() => DoWork());

//imitation wait some other data
Thread.Sleep(3000);

//imitation of calculations with message
int tmp = 0;

for (int i = 0; i < 30000000; i++)
{
tmp = Math.Max(message, i);
}

lock (locker2)
{
ActiveTasksNumber--;
}
}

public static int GetMessageFromQueue()
{
lock (locker)
{
if (Data == 0)
{
//Queue is empty. All tasks completed except one
//that is waiting for new data
clock.Stop();

Console.WriteLine("\rMax active tasks number = "
+ MaxActiveTasksNumber
+ "\tTime = " + clock.ElapsedMilliseconds + "ms");
Console.Write("Press key to run next iteration");

clock.Reset();

Console.ReadKey();

Console.Write(" ");

//In queue received new data. Processing repeat
clock.Start();

ActiveTasksNumber = 0;
MaxActiveTasksNumber = 0;
Data = 50;
}

Data--;
return Data;
}
}
}
}

我的猜测是,当队列为空时,除了一个等待新数据的任务外,所有任务都已完成。当数据到达队列时,重复计算。

但是如果你看结果,每次同时运行的任务数都会增加。

为什么会这样?

测试结果

enter image description here

最佳答案

你的做法是错误的。

首先,你的队列在哪里?
对于要在并发环境中排队的任何作业,请使用 ConcurrentQueue。

并发队列,就是这样使用的,不需要随时加锁。

// To create your Queue
ConcurrentQueue<string> queue = new ConcurrentQueue<string>();

// To add objects to your Queue
queue.Enqueue("foo");

// To deque items from your Queue
String bar;
queue.TryDequeue(out bar);

// To loop a process until your Queue is empty
while(!queue.IsEmpty)
{
String bar;
queue.TryDequeue(out bar);
}

接下来是如何递增和递减计数器,有一种更好的线程安全方法。同样,数据不需要锁定。

// Change your data type from int to long
static long ActiveTasksNumber = 0;
static long MaxActiveTasksNumber = 0;

// To increment the values in a Thread safe fashion:
Interlocked.Increment(ref ActiveTasksNumber);

// To decrement:
Interlocked.Decrement(ref MaxActiveTasksNumber);

实现我向您展示的内容,它应该会让您的问题消失

编辑:命名空间

using System.Collections.Concurrent;
using System.Threading;

关于c# - 如何使用任务工厂处理队列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/20429791/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com