- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在使用 Parallel.ForEach
处理一堆项目。问题是,我想根据打开的工作人员(插槽)的数量来优先考虑哪些项目可以工作。例如。如果我同时处理 8 个任务并且在任务 1-4 之间打开了一个槽,我想将简单的工作分配给这些槽。插槽的下半部分将得到艰苦的工作。这样,我就不会将所有 8 个插槽都占用来进行艰苦/长时间运行的工作,简单/快速的项目将首先运行。我按如下方式实现了这一点:
代码
const int workers = 8;
List<Thing> thingsToDo = ...; //Get the things that need to be done.
Thing[] currentlyWorkingThings = new Thing[workers]; //One slot for each worker.
void Run() {
Parallel.ForEach(PrioritizeThings(thingsToDo), o => {
int index = 0;
//"PrioritizeTasks" added this thing to the list of currentlyWorkingThings.
//Find my position in this list.
lock (currentlyWorkingThings)
index = currentlyWorkingThings.IndexOf(o);
//Do work on this thing...
//Then remove it from the list of currently working things, thereby
// opening a new slot when this worker returns/finishes.
lock (currentlyWorkingThings)
currentlyWorkingThings[index] = null;
});
}
IEnumerable<Thing> PrioritizeThings(List<Thing> thingsToDo) {
int slots = workers;
int halfSlots = (int)Math.Ceiling(slots / 2f);
//Sort thingsToDo by their difficulty, easiest first.
//Loop until we've worked every Thing.
while (thingsToDo.Count > 0) {
int slotToFill = ...; //Find the first open slot.
Thing nextThing = null;
lock (currentlyWorkingThings) {
//If the slot is in the "top half", get the next easy thing - otherwise
// get the next hard thing.
if (slotToFill < halfSlots)
nextThing = thingsToDo.First();
else
nextThing = thingsToDo.Last();
//Add the nextThing to the list of currentlyWorkingThings and remove it from
// the list of thingsToDo.
currentlyWorkingThings[slotToFill] = nextThing;
thingsToDo.Remove(nextThing);
}
//Return the nextThing to work.
yield return nextThing;
}
}
问题
所以我在这里看到的问题是 Parallel
正在请求下一个要处理的事情 PrioritizeThings
在一个插槽打开之前(在一个现有的东西被打开之前)完全的)。我假设 Parallel
是展望 future 并让事情提前做好准备。我希望它不要这样做,并且只在完全完成后才填充一个 worker /插槽。我想到的解决此问题的唯一方法是在 PrioritizeThings
中添加一个 sleep /等待循环,直到它看到一个合法的开放插槽,它才会返回工作。但我不喜欢那样,我希望有某种方法可以让 Parallel
在开始工作之前等待更长时间。有什么建议吗?
最佳答案
有一种内置(有点)的方法可以准确支持您所描述的情况。
当您创建 ForEach
时,您需要传入 ParallelOptions
与非标准 TaskScheduler
.困难的部分是创建一个 TaskScheduler
来为您执行该优先级系统,幸运的是,Microsoft 发布了一组示例,其中包含一个名为“ParallelExtensionsExtras”的调度程序及其调度程序 QueuedTaskScheduler
。
private static void Main(string[] args)
{
int totalMaxConcurrancy = Environment.ProcessorCount;
int highPriorityMaxConcurrancy = totalMaxConcurrancy / 2;
if (highPriorityMaxConcurrancy == 0)
highPriorityMaxConcurrancy = 1;
QueuedTaskScheduler qts = new QueuedTaskScheduler(TaskScheduler.Default, totalMaxConcurrancy);
var highPriortiyScheduler = qts.ActivateNewQueue(0);
var lowPriorityScheduler = qts.ActivateNewQueue(1);
BlockingCollection<Foo> highPriorityWork = new BlockingCollection<Foo>();
BlockingCollection<Foo> lowPriorityWork = new BlockingCollection<Foo>();
List<Task> processors = new List<Task>(2);
processors.Add(Task.Factory.StartNew(() =>
{
Parallel.ForEach(highPriorityWork.GetConsumingPartitioner(), //.GetConsumingPartitioner() is also from ParallelExtensionExtras, it gives better performance than .GetConsumingEnumerable() with Parallel.ForEeach(
new ParallelOptions() { TaskScheduler = highPriortiyScheduler, MaxDegreeOfParallelism = highPriorityMaxConcurrancy },
ProcessWork);
}, TaskCreationOptions.LongRunning));
processors.Add(Task.Factory.StartNew(() =>
{
Parallel.ForEach(lowPriorityWork.GetConsumingPartitioner(),
new ParallelOptions() { TaskScheduler = lowPriorityScheduler},
ProcessWork);
}, TaskCreationOptions.LongRunning));
//Add some work to do here to the highPriorityWork or lowPriorityWork collections
//Lets the blocking collections know we are no-longer going to be adding new items so it will break out of the `ForEach` once it has finished the pending work.
highPriorityWork.CompleteAdding();
lowPriorityWork.CompleteAdding();
//Waits for the two collections to compleatly empty before continueing
Task.WaitAll(processors.ToArray());
}
private static void ProcessWork(Foo work)
{
//...
}
即使您有两个 Parallel.ForEach
实例运行它们的总和,也不会使用超过您为 MaxConcurrency
传入的值QueuedTaskScheduler
构造函数,如果两者都有工作要做,它将优先清空 highPriorityWork
集合(最多为所有可用插槽的 1/2这样您就不会阻塞低优先级队列,您可以根据您的性能需求轻松地将其调整为更高或更低的比率。
如果您不希望高优先级总是获胜,而是希望有一个在两个列表之间交替的“循环”式调度程序(因此您不希望快速项目总是获胜,但只需要它们与慢项目一起洗牌)你可以为两个或更多队列设置相同的优先级(或者只使用 RoundRobinTaskSchedulerQueue
做同样的事情)
关于c# - 让 Parallel.ForEach 等待工作直到插槽打开,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/21195406/
在 Oracle 中,PARALLEL 被广泛使用。提示 PARALLEL、PARALLEL(8) 和 PARALLEL(a,8) 有什么区别。如何选择最佳的查询提示? SELECT /*+ PARA
好的,我希望以前没有问过这个问题,因为在搜索中很难找到。 我查看了 F95 手册,但仍然觉得这很模糊: For the simple case of: DO i=0,99 END DO 我正
我有一个 C-shell 脚本,其中有一个名为 $hosts_string 的变量,格式为: host1,host2,...,hostN 我还有一个名为 $chrs_string 的变量,其形式为:
是否可以从由gnu parallel产生的脚本的多次运行中调用gnu parallel? 我有一个python脚本,可以运行100个顺序顺序迭代,并且在每次迭代中的某处,并行计算4个值(使用gnu p
我想在几个输入上运行几个长时间运行的进程。例如。: solver_a problem_1 solver_b problem_1 ... solver_b problem_18 solver_c pro
TParallel.&For 和 TParallel.For 之间有区别吗? 两者都可以在 Delphi 10 Seattle 中编译。那么我应该坚持哪一个呢? 最佳答案 TParallel.&For
我第一次使用 julia 进行并行计算.我有点头疼。所以假设我开始 julia如下:julia -p 4 .然后我为所有处理器声明 a 函数,然后将它与 pmap 一起使用还有@parallel fo
关闭。这个问题是off-topic .它目前不接受答案。 想改善这个问题吗? Update the question所以它是 on-topic对于堆栈溢出。 10年前关闭。 Improve this
我有一堆相互排斥的方法,因此可以并行运行。有这样做的好方法吗?到目前为止,我有以下两种实现方式,但我不确定是否应该选择其中一种。 使用 Parallel.For : Parallel.For(0, 2
我对并行运行脚本很感兴趣,并且我已经开始查看 GNU 并行工具,但是我遇到了一些麻烦。我的脚本 doSomething 有 3 个参数,我想在参数的不同值上并行运行脚本。我该怎么做? 我试过:para
我需要在多核(和多线程)机器上运行多个作业。我正在使用 GNU Parallel utility跨核心分配作业以加速任务。要执行的命令在名为“命令”的文件中可用。我使用以下命令运行 GNU Paral
我正在尝试使用如下两个输入运行 Python 脚本。我得到了大约 300 个这两个输入,所以我想知道是否有人可以建议如何并行运行它们。 单次运行看起来像: python stable.py KOG_1
每天我都必须更新一堆存储库,并在其中一些中执行另一个命令(来自 CARTON,Perl 模块依赖管理器)。我总是使用循环来执行此操作,但我想与 并行执行GNU 并行 如果可能,但我不太了解它的tuto
正如标题所说:@parallel 之间究竟有什么区别?和 pmap ?我的意思不是明显的一个是循环的宏,另一个适用于函数,我的意思是它们的实现究竟有什么不同,我应该如何使用这些知识在它们之间进行选择?
我有一些矩阵乘法运算。我想通过多个处理器并行执行这些操作。这可以使用 MPI(消息传递接口(interface))在高性能计算集群上完成。 同样,我可以使用多个辅助角色在云中进行一些并行化吗?有什么办
joblib模块提供了一个简单的帮助程序类,以使用多处理并行编写循环的循环。 这段代码使用列表推导来完成这项工作: import time from math import sqrt from job
我的问题是这样的one .但我想做一些不同的事情... 例如,在我的并行区域内,我想在 4 个线程上运行我的代码。当每个线程进入 for 循环时,我想在 8 个线程上运行我的代码。像 #pramga
我正在尝试使用 ipython 并行库中的并行计算。但是我对此知之甚少,而且我发现很难从对并行计算一无所知的人那里阅读该文档。 有趣的是,我发现的所有教程都只是重复使用文档中的示例,并使用相同的解释,
我的项目结构看起来像 Root + subproj1 + subproj2 在每个子项目中定义了自己的任务 run(){}。 我想要做的是从 Root 项目的运行任务并行运行 :subpro
我有一个 Foo ID 的列表。我需要为每个 ID 调用一个存储过程。 例如 Guid[] siteIds = ...; // typically contains 100 to 300 elemen
我是一名优秀的程序员,十分优秀!