gpt4 book ai didi

c# - 如何使用 C# 4 中的 TPL 创建常量 Processing "Flow"

转载 作者:太空狗 更新时间:2023-10-29 22:53:44 25 4
gpt4 key购买 nike

我不确定以下是否可行,但我想以节流的方式在并行中调用一些操作,但要保持处理流程连续,而不要恢复使用计时器或循环/ sleep 周期。

到目前为止,我已经让它工作了,它从某个来源加载了大量输入……然后以受控方式并行处理它们并像下面这样循环。

static void Main(string[] args)
{
while(true) //Simulate a Timer Elapsing...
{
IEnumerable<int> inputs = new List<int>() {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
//Simulate querying database queue tables for next batch of entries

RunAllActions(inputs, 3); //Max 3 at a time.
}
}

static void RunAllActions(IEnumerable<int> inputs, int maxConcurrency)
{
var options = new ParallelOptions() {MaxDegreeOfParallelism = maxConcurrency};

Parallel.ForEach<int>(inputs, options, DoWork);
//Blocks here until all inputs are processed.
Console.WriteLine("Batch of Work Done!!!");
}

static void DoWork(int input)
{
Console.WriteLine("Starting Task {0}", input);
System.Threading.Thread.Sleep(3000);
Console.WriteLine("Finishing Task {0}", input);
}

我想知道的是,TPL 中是否有一个结构,我可以使用它来保持它始终运行...以便我可以用收到的 MessageQueue 替换“Timer Elapsing”和“Database Polling”事件。

以下是我想要实现的目标的粗略版本...我可以通过其他方式实现它,但我想知道 TPL 是否内置了这种模式。

internal class Engine
{
private MessageQueue mq;
private Queue<int> myInternalApplicationQueue;

public Engine()
{
//Message Queue to get new task inputs from
mq = new MessageQueue();
mq.ReceiveCompleted += new ReceiveCompletedEventHandler(mq_ReceiveCompleted);

// internal Queue to put them in.
myInternalApplicationQueue = new Queue<int>();
}

void mq_ReceiveCompleted(object sender, ReceiveCompletedEventArgs e)
{
//On MQ Receive, pop the input in a queue in my app
int input = (int) e.Message.Body;

myInternalApplicationQueue.Enqueue(input);
}

public void StartWorking()
{
//Once this gets called, it doesn't stop... it just keeps processing/watching that queue
//processing the tasks as fast as it's allowed while the app is running.
var options = new ParallelOptions() { MaxDegreeOfParallelism = 3 };
Parallel.KeepWorkingOnQueue<int>(myInternalApplicationQueue, options, DoWork);
// ^^^^^^^^^^^^^^^^^^ <----- THIS GUY
}

}

最佳答案

您可以使用 BlockingCollection<T> 处理这种类型的操作,这实际上是一种生产者/消费者场景。

基本上,您会设置一个 BlockingCollection<T>并将其用作您的“制作人”。然后,您将拥有三个(或任意数量)消费者 任务(通常设置为长时间运行的任务)来处理元素(通过在标准 foreach 循环中调用 blockingCollection.GetConsumingEnumerable())。

然后您可以根据需要将项目添加到集合中,它们将不断得到处理。当你完全完成后,你会调用 BlockingCollection<T>.CompleteAdding ,这将导致 foreach 循环完成,整个过程停止。

作为旁注 - 您通常不想使用 Parallel.ForEachGetConsumingEnumerable() 上来自BlockingCollection<T> - 至少不会,除非你自己处理分区。通常最好使用多个任务并让每个任务按顺序迭代。原因是 Parallel.ForEach 中的默认分区方案会导致问题(它会等到数据“ block ”可用,而不是立即处理项目,并且“ block ”会随着时间的推移变得越来越大)。

关于c# - 如何使用 C# 4 中的 TPL 创建常量 Processing "Flow",我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/9232778/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com