I'm using the Parallel.ForEach loop to do some work, and I initialize it with localInit like this:
localInit: () => new
{
foo = new Foo(),
bars = CreateBars(),
}
According to the documentation:
localInit, or the function that initializes the thread-local variable. This function is called once for each partition in which the Parallel.ForEach<TSource> operation executes. Our example initializes the thread-local variable to zero.
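A minimal sketch of the pattern the documentation describes (my own illustration, not the documentation's sample): each partition starts a subtotal at zero in localInit, accumulates in body without any locking, and merges into the shared total once in localFinally. It is written as a .NET 6+ console program with top-level statements; the workload (summing 1..10,000) is arbitrary.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

int[] numbers = Enumerable.Range(1, 10_000).ToArray();
long total = 0;

Parallel.ForEach(
    numbers,
    () => 0L,                                           // localInit: subtotal starts at zero
    (n, _, subtotal) => subtotal + n,                   // body: no lock needed, state is thread-local
    subtotal => Interlocked.Add(ref total, subtotal));  // localFinally: merge once per local

Console.WriteLine(total); // 50005000
```

Only localFinally touches shared state, which is why it is the one place that needs Interlocked.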
So I tried to use it like that, but I observed that the loop constantly kills and creates new tasks, which results in frequent calls to localInit. In my opinion this is counterproductive and doesn't work as desired.
I thought that when Parallel.ForEach creates, for example, four partitions, it would keep them alive until it has iterated over all items, but it doesn't. It calls localFinally and localInit several hundred times for a collection with a few thousand items. How so?
Can this behavior somehow be prevented? I was really hoping to save some resources but it doesn't really let me.
Here's what the loop looks like:
var parallelLoopResult = Parallel.ForEach
(
source: items,
parallelOptions: parallelOptions,
localInit: () => new
{
foo = new Foo(),
bars = CreateBars(),
},
body: (item, loopState, i, local) =>
{
parallelOptions.CancellationToken.ThrowIfCancellationRequested();
var results = local.bars.Select(x => ...).ToList();
// ...
return local;
},
localFinally: local =>
{
local.foo.Dispose();
lock (aggregateLock)
{
// ... process transformed bars
}
}
);
ParallelOptions:
var parallelOptions = new ParallelOptions
{
CancellationToken = cancellationTokenSource.Token,
#if DEBUG
MaxDegreeOfParallelism = 1
//MaxDegreeOfParallelism = Environment.ProcessorCount
#else
MaxDegreeOfParallelism = Environment.ProcessorCount
#endif
};
What is the source (IEnumerable<T> or Partitioner<T>)? What are your ParallelOptions?
@svick items are just strings (like keys in a database). ParallelOptions just specify the MaxDegreeOfParallelism (Environment.ProcessorCount) and a CancellationToken.
If I understand the code correctly, Parallel.ForEach() restarts each Task every few hundred milliseconds. This means that if each iteration is substantial (as it generally should be), you will get lots of Tasks and thus lots of calls to localInit and localFinally. The reason for this is fairness with regard to other code in the same process that also uses the same ThreadPool.
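One way to observe this is to count the localInit and localFinally calls. This is a sketch: the item count and the Thread.SpinWait amount are arbitrary stand-ins for "substantial" work, and the exact counts depend on the machine and runtime version, so no particular number is promised. If tasks are being restarted, localInit will be called noticeably more often than Environment.ProcessorCount.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

int initCalls = 0, finallyCalls = 0, processed = 0;
var options = new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount };

Parallel.ForEach(
    Enumerable.Range(0, 2_000),
    options,
    () => { Interlocked.Increment(ref initCalls); return 0; },       // one call per worker task run
    (item, state, count) => { Thread.SpinWait(20_000); return count + 1; }, // simulated work
    count =>
    {
        Interlocked.Increment(ref finallyCalls);
        Interlocked.Add(ref processed, count);                        // items handled by that local
    });

Console.WriteLine(
    $"items={processed}, localInit={initCalls}, localFinally={finallyCalls}, cores={Environment.ProcessorCount}");
```

Every localInit is paired with a localFinally, and the per-local counts always add up to the item count; only the number of locals varies.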
I don't think there is a way to change this behavior of Parallel.ForEach(). I think a good way to solve this is to write your own simple version of Parallel.ForEach(). Considering that you can take advantage of Partitioner<T>, and depending on what features of Parallel.ForEach() you need, it could be relatively simple. For example, something like:
using System.Collections.Concurrent; // Partitioner
using System.Threading.Tasks;        // Task

public static void MyParallelForEach<TSource, TLocal>(
IEnumerable<TSource> source, int degreeOfParallelism,
Func<TLocal> localInit, Func<TSource, TLocal, TLocal> body, Action<TLocal> localFinally)
{
var partitionerSource = Partitioner.Create(source).GetDynamicPartitions();
Action taskAction = () =>
{
var localState = localInit();
foreach (var item in partitionerSource)
{
localState = body(item, localState);
}
localFinally(localState);
};
var tasks = new Task[degreeOfParallelism - 1];
for (int i = 0; i < degreeOfParallelism - 1; i++)
{
tasks[i] = Task.Run(taskAction);
}
taskAction();
Task.WaitAll(tasks);
}
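A self-contained usage sketch of MyParallelForEach, adapted as a local function in a .NET 6+ top-level program. The degreeOfParallelism < 1 guard and the summing workload are my additions; the rest follows the answer. Because every worker calls localInit exactly once, the counter ends up equal to the degree of parallelism.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

int initCalls = 0;
long total = 0;

MyParallelForEach(
    Enumerable.Range(1, 10_000),
    degreeOfParallelism: 4,
    localInit: () => { Interlocked.Increment(ref initCalls); return 0L; },
    body: (item, subtotal) => subtotal + item,
    localFinally: subtotal => Interlocked.Add(ref total, subtotal));

Console.WriteLine($"total={total}, localInit calls={initCalls}"); // total=50005000, localInit calls=4

static void MyParallelForEach<TSource, TLocal>(
    IEnumerable<TSource> source, int degreeOfParallelism,
    Func<TLocal> localInit, Func<TSource, TLocal, TLocal> body, Action<TLocal> localFinally)
{
    if (degreeOfParallelism < 1) throw new ArgumentOutOfRangeException(nameof(degreeOfParallelism));

    // One shared, thread-safe stream of items; each worker pulls from it until it is drained.
    IEnumerable<TSource> partitionerSource = Partitioner.Create(source).GetDynamicPartitions();

    void TaskAction()
    {
        TLocal localState = localInit();        // exactly once per worker
        foreach (TSource item in partitionerSource)
            localState = body(item, localState);
        localFinally(localState);               // exactly once per worker
    }

    var tasks = new Task[degreeOfParallelism - 1];
    for (int i = 0; i < tasks.Length; i++)
        tasks[i] = Task.Run(TaskAction);
    TaskAction();                               // the calling thread acts as the last worker
    Task.WaitAll(tasks);
}
```

The trade-off is that the workers never yield back to the ThreadPool, which is exactly the fairness mechanism the built-in loop uses.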
This overload isn't the only one, so you can try this:
var bars = CreateBars();
Parallel.ForEach(bars, b => { /* your action here */ });
But if you really want to create a copy of bars for each thread, you can copy it inside localInit, for example with the List<T> copy constructor (assuming your bars is an IEnumerable<IBar> variable):
var bars = CreateBars();
localInit: () => new
{
foo = new Foo(),
bars = new List<IBar>(bars),
}
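One caveat worth checking: new List<T>(bars) copies the list, not the elements. The sketch below uses StringBuilder as a stand-in for a stateful IBar (an assumption, since the question doesn't show IBar) to show that the copied list still points at the same objects.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

// StringBuilder stands in for a mutable IBar (illustration only).
var original = new List<StringBuilder> { new StringBuilder("a"), new StringBuilder("b") };
var copy = new List<StringBuilder>(original); // new list, same element objects

copy.Add(new StringBuilder("c")); // changes to the list itself stay in the copy...
copy[0].Append('!');              // ...but element state is shared with the original

Console.WriteLine(original.Count);                               // 2
Console.WriteLine(object.ReferenceEquals(original[0], copy[0])); // True
Console.WriteLine(original[0]);                                  // a!
```

So if the bars carry mutable state, each thread needs fresh instances (for example from CreateBars()), not just a fresh list.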
There is no Parallel.ForEach overload that offers this functionality. If you want to set an upper limit on the invocations of the localInit delegate, you have to code it manually. Svick's MyParallelForEach implementation is simple and gets the job done, but it doesn't have all the features of the native Parallel.ForEach. This answer is an attempt at a fully featured implementation.
The Parallel_ForEach shown below has the same signature as the native Parallel.ForEach, and almost identical features and behavior. The main difference is a limit on how many TLocal instances may be created: the localInit delegate is guaranteed to be invoked at most MaxDegreeOfParallelism times.
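using System.Collections.Concurrent; // Partitioner, OrderablePartitioner
using System.Threading.Tasks;        // Parallel, ParallelOptions, ParallelLoopResult

public static ParallelLoopResult Parallel_ForEach<TSource, TLocal>(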
IEnumerable<TSource> source,
ParallelOptions parallelOptions,
Func<TLocal> localInit,
Func<TSource, ParallelLoopState, TLocal, TLocal> body,
Action<TLocal> localFinally)
{
ArgumentNullException.ThrowIfNull(source);
ArgumentNullException.ThrowIfNull(parallelOptions);
ArgumentNullException.ThrowIfNull(localInit);
ArgumentNullException.ThrowIfNull(body);
ArgumentNullException.ThrowIfNull(localFinally);
// Create a copy of the parallelOptions
parallelOptions = new()
{
MaxDegreeOfParallelism = parallelOptions.MaxDegreeOfParallelism,
CancellationToken = parallelOptions.CancellationToken,
TaskScheduler = parallelOptions.TaskScheduler,
};
if (parallelOptions.MaxDegreeOfParallelism == -1)
parallelOptions.MaxDegreeOfParallelism = Environment.ProcessorCount;
OrderablePartitioner<TSource> partitioner = Partitioner.Create(source);
IList<IEnumerator<TSource>> enumerators = partitioner.GetPartitions(
parallelOptions.MaxDegreeOfParallelism);
return Parallel.ForEach(enumerators, parallelOptions, (e, state) =>
{
(TLocal Value, bool HasValue) localData = default;
try
{
using (e)
{
while (!state.ShouldExitCurrentIteration && e.MoveNext())
{
TSource item = e.Current;
if (!localData.HasValue) localData = (localInit(), true);
localData.Value = body(item, state, localData.Value);
}
}
}
finally
{
if (localData.HasValue) localFinally(localData.Value);
}
});
}
Two features are missing:
- The MaxDegreeOfParallelism == -1 configuration (unlimited parallelism) is not supported. If you don't specify the MaxDegreeOfParallelism, or if you give it the value -1, the maximum degree of parallelism will be equal to the number of CPU cores (Environment.ProcessorCount).
- The ParallelLoopState.Break functionality is broken. You may use ParallelLoopState.Stop to stop the parallel loop, but if you use Break it won't work correctly, and eventually the ParallelLoopResult.LowestBreakIteration will be wrong too.
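A usage sketch for Parallel_ForEach, with the method copied unchanged (apart from comments) into a .NET 6+ top-level program as a local function; the summing workload and the counter are illustrative. localInit should run at most MaxDegreeOfParallelism times, and possibly fewer, because it is only invoked once a partition yields its first item.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

int initCalls = 0;
long total = 0;
var options = new ParallelOptions { MaxDegreeOfParallelism = 4 };

ParallelLoopResult result = Parallel_ForEach(
    Enumerable.Range(1, 10_000),
    options,
    () => { Interlocked.Increment(ref initCalls); return 0L; },
    (item, state, subtotal) => subtotal + item,
    subtotal => Interlocked.Add(ref total, subtotal));

Console.WriteLine($"completed={result.IsCompleted}, total={total}, localInit calls={initCalls}");

static ParallelLoopResult Parallel_ForEach<TSource, TLocal>(
    IEnumerable<TSource> source,
    ParallelOptions parallelOptions,
    Func<TLocal> localInit,
    Func<TSource, ParallelLoopState, TLocal, TLocal> body,
    Action<TLocal> localFinally)
{
    ArgumentNullException.ThrowIfNull(source);
    ArgumentNullException.ThrowIfNull(parallelOptions);
    ArgumentNullException.ThrowIfNull(localInit);
    ArgumentNullException.ThrowIfNull(body);
    ArgumentNullException.ThrowIfNull(localFinally);
    // Copy the options so the caller's instance is not modified.
    parallelOptions = new()
    {
        MaxDegreeOfParallelism = parallelOptions.MaxDegreeOfParallelism,
        CancellationToken = parallelOptions.CancellationToken,
        TaskScheduler = parallelOptions.TaskScheduler,
    };
    if (parallelOptions.MaxDegreeOfParallelism == -1)
        parallelOptions.MaxDegreeOfParallelism = Environment.ProcessorCount;
    // A fixed number of partitions, all drawing items from the same source.
    OrderablePartitioner<TSource> partitioner = Partitioner.Create(source);
    IList<IEnumerator<TSource>> enumerators = partitioner.GetPartitions(
        parallelOptions.MaxDegreeOfParallelism);
    // One body call per partition, so at most one TLocal per partition.
    return Parallel.ForEach(enumerators, parallelOptions, (e, state) =>
    {
        (TLocal Value, bool HasValue) localData = default;
        try
        {
            using (e)
            {
                while (!state.ShouldExitCurrentIteration && e.MoveNext())
                {
                    TSource item = e.Current;
                    if (!localData.HasValue) localData = (localInit(), true);
                    localData.Value = body(item, state, localData.Value);
                }
            }
        }
        finally
        {
            if (localData.HasValue) localFinally(localData.Value);
        }
    });
}
```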
Bars are created only once per thread execution. But do you know how many parallel executions are done? It is at the discretion of the parallel execution engine to start as many parallel executions as it likes.
If you want to limit parallel execution, use the MaxDegreeOfParallelism property. This will put an upper limit on how many bars are created at one time. It will still not control the total number of bars created, and that total may also be lower than what you expect now.
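A sketch of that distinction: it tracks how many TLocal instances are alive at the same time versus how many are created in total, with MaxDegreeOfParallelism = 2 and an arbitrary SpinWait as the workload. The first number should not exceed 2; the second is up to the scheduler.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

int live = 0, maxLive = 0, totalInits = 0;
var options = new ParallelOptions { MaxDegreeOfParallelism = 2 };

Parallel.ForEach(
    Enumerable.Range(0, 500),
    options,
    () =>
    {
        Interlocked.Increment(ref totalInits);
        int now = Interlocked.Increment(ref live);
        // Lock-free "max": retry until maxLive >= now or our CompareExchange wins.
        int seen;
        while (now > (seen = Volatile.Read(ref maxLive)) &&
               Interlocked.CompareExchange(ref maxLive, now, seen) != seen) { }
        return 0;
    },
    (item, state, local) => { Thread.SpinWait(5_000); return local + 1; },
    local => Interlocked.Decrement(ref live));   // this local is no longer alive

Console.WriteLine($"max simultaneous locals={maxLive}, total localInit calls={totalInits}");
```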
If you want to have explicit control, create tasks manually.
Very nice. Thank you. I'll try to implement it in my project. It looks promising. I'll need a try/catch here and there but I think I get the idea. Give me some time before I hit accept ;-)
Okay, it wasn't that hard after all ;-) The performance of the loop doubled, and nearly tripled, with this solution :-o amazing. It's a pity that the standard ForEach has such a drawback.
I've just added a try/finally around the body and localFinally so it doesn't get lost when something bad happens. I understand the fairness argument, but there should definitely be an option like "max performance no matter what", so that we can rely on localInit being called only once per partition.
Unfortunately this won't work. Bars are a kind of transformation that I need to apply to each item, so I tried to create those Bars for each thread so that I don't have to use locks. Each thread should have its own instances, but somehow I see in my logs, and also when I debug the code, that it creates them over and over again, and the thread ids are also always the same. I've added the entire loop to the question.
Oh, I like this bars = new List<IBar>(bars), I'll try it... Why didn't I come up with this idea? It seems so obvious ;-)
I think I know why this happens. The loop is constantly killing and creating new tasks/threads, because localFinally is also being called very often before the loop has been completely iterated. I probably have to investigate the TaskScheduler; perhaps it can prevent this from happening.
I've tested it with 1, 2, 4 and 8 MaxDegreeOfParallelism. The more I use the more often it recreates the tasks. I've edited my question, maybe it's more clear now.
@t3chb0t: Yes... you see with your code the number of parallel executions that will be started is at the discretion of Parallel Execution Engine. You are wrongly expecting that you can know beforehand how many bars will be created. The number of bars will now depend on what the load balancer feels. If it feels that it can have more parallel executions, you will have more bars. If it feels that load is too much, you will see fewer bars created.
Oh, does that mean that the number of tasks can fluctuate during runtime and this could be the cause for the frequent inits? I thought it would keep them alive. Mhmm... is there any way to prevent this behavior? It's quite disturbing I have to admit.
@t3chb0t: You can only put an upper limit on the number of simultaneous parallel executions by using the MaxDegreeOfParallelism property. AFAIK, this is the most control you can have.
@displayName "Only once per thread bars are created." This is not true, as the question clearly demonstrates. Even with MaxDegreeOfParallelism set to ProcessorCount, hundreds of calls to localInit are done, while using at most ProcessorCount threads.