gpt4 book ai didi

c# - 让 Parallel.ForEach 等待工作直到插槽打开

转载 作者:行者123 更新时间:2023-11-30 22:09:56 25 4
gpt4 key购买 nike

我正在使用 Parallel.ForEach 处理一堆项目。问题是,我想根据打开的工作人员(插槽)的数量来优先考虑哪些项目可以工作。例如。如果我同时处理 8 个任务并且在任务 1-4 之间打开了一个槽,我想将简单的工作分配给这些槽。插槽的下半部分将得到艰苦的工作。这样,我就不会将所有 8 个插槽都占用来进行艰苦/长时间运行的工作,简单/快速的项目将首先运行。我按如下方式实现了这一点:

代码

const int workers = 8;
List<Thing> thingsToDo = ...; //Get the things that need to be done.
Thing[] currentlyWorkingThings = new Thing[workers]; //One slot for each worker.

void Run() {
Parallel.ForEach(PrioritizeThings(thingsToDo), o => {
int index = 0;

//"PrioritizeTasks" added this thing to the list of currentlyWorkingThings.
//Find my position in this list.
lock (currentlyWorkingThings)
index = currentlyWorkingThings.IndexOf(o);

//Do work on this thing...

//Then remove it from the list of currently working things, thereby
// opening a new slot when this worker returns/finishes.
lock (currentlyWorkingThings)
currentlyWorkingThings[index] = null;
});
}

IEnumerable<Thing> PrioritizeThings(List<Thing> thingsToDo) {
int slots = workers;
int halfSlots = (int)Math.Ceiling(slots / 2f);

//Sort thingsToDo by their difficulty, easiest first.

//Loop until we've worked every Thing.
while (thingsToDo.Count > 0) {
int slotToFill = ...; //Find the first open slot.
Thing nextThing = null;

lock (currentlyWorkingThings) {
//If the slot is in the "top half", get the next easy thing - otherwise
// get the next hard thing.
if (slotToFill < halfSlots)
nextThing = thingsToDo.First();
else
nextThing = thingsToDo.Last();

//Add the nextThing to the list of currentlyWorkingThings and remove it from
// the list of thingsToDo.
currentlyWorkingThings[slotToFill] = nextThing;
thingsToDo.Remove(nextThing);
}

//Return the nextThing to work.
yield return nextThing;
}
}

问题

所以我在这里看到的问题是 Parallel 正在请求下一个要处理的事情 PrioritizeThings 在一个插槽打开之前(在一个现有的东西被打开之前)完全的)。我假设 Parallel 是展望 future 并让事情提前做好准备。我希望它不要这样做,并且只在完全完成后才填充一个 worker /插槽。我想到的解决此问题的唯一方法是在 PrioritizeThings 中添加一个 sleep /等待循环,直到它看到一个合法的开放插槽,它才会返回工作。但我不喜欢那样,我希望有某种方法可以让 Parallel 在开始工作之前等待更长时间。有什么建议吗?

最佳答案

有一种内置(有点)的方法可以准确支持您所描述的情况。

当您创建 ForEach 时,您需要传入 ParallelOptions与非标准 TaskScheduler .困难的部分是创建一个 TaskScheduler 来为您执行该优先级系统,幸运的是,Microsoft 发布了一组示例,其中包含一个名为“ParallelExtensionsExtras”的调度程序及其调度程序 QueuedTaskScheduler

private static void Main(string[] args)
{
int totalMaxConcurrancy = Environment.ProcessorCount;
int highPriorityMaxConcurrancy = totalMaxConcurrancy / 2;

if (highPriorityMaxConcurrancy == 0)
highPriorityMaxConcurrancy = 1;

QueuedTaskScheduler qts = new QueuedTaskScheduler(TaskScheduler.Default, totalMaxConcurrancy);
var highPriortiyScheduler = qts.ActivateNewQueue(0);
var lowPriorityScheduler = qts.ActivateNewQueue(1);

BlockingCollection<Foo> highPriorityWork = new BlockingCollection<Foo>();
BlockingCollection<Foo> lowPriorityWork = new BlockingCollection<Foo>();

List<Task> processors = new List<Task>(2);

processors.Add(Task.Factory.StartNew(() =>
{
Parallel.ForEach(highPriorityWork.GetConsumingPartitioner(), //.GetConsumingPartitioner() is also from ParallelExtensionExtras, it gives better performance than .GetConsumingEnumerable() with Parallel.ForEeach(
new ParallelOptions() { TaskScheduler = highPriortiyScheduler, MaxDegreeOfParallelism = highPriorityMaxConcurrancy },
ProcessWork);
}, TaskCreationOptions.LongRunning));

processors.Add(Task.Factory.StartNew(() =>
{
Parallel.ForEach(lowPriorityWork.GetConsumingPartitioner(),
new ParallelOptions() { TaskScheduler = lowPriorityScheduler},
ProcessWork);
}, TaskCreationOptions.LongRunning));


//Add some work to do here to the highPriorityWork or lowPriorityWork collections


//Lets the blocking collections know we are no-longer going to be adding new items so it will break out of the `ForEach` once it has finished the pending work.
highPriorityWork.CompleteAdding();
lowPriorityWork.CompleteAdding();

//Waits for the two collections to compleatly empty before continueing
Task.WaitAll(processors.ToArray());
}

private static void ProcessWork(Foo work)
{
//...
}

即使您有两个 Parallel.ForEach 实例运行它们的总和,也不会使用超过您为 MaxConcurrency 传入的值QueuedTaskScheduler 构造函数,如果两者都有工作要做,它将优先清空 highPriorityWork 集合(最多为所有可用插槽的 1/2这样您就不会阻塞低优先级队列,您可以根据您的性能需求轻松地将其调整为更高或更低的比率。

如果您不希望高优先级总是获胜,而是希望有一个在两个列表之间交替的“循环”式调度程序(因此您不希望快速项目总是获胜,但只需要它们与慢项目一起洗牌)你可以为两个或更多队列设置相同的优先级(或者只使用 RoundRobinTaskSchedulerQueue 做同样的事情)

关于c# - 让 Parallel.ForEach 等待工作直到插槽打开,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/21195406/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com