gpt4 book ai didi

c# - 使用 Parallel ForEach 进行本地初始化如何工作?

转载 作者:可可西里 更新时间:2023-11-01 08:31:47 24 4
gpt4 key购买 nike

我不确定 Parallel.ForEach 中本地 init 函数的使用,如 msdn 文章中所述:http://msdn.microsoft.com/en-us/library/dd997393.aspx

Parallel.ForEach<int, long>(nums, // source collection
() => 0, // method to initialize the local variable
(j, loop, subtotal) => // method invoked by the loop on each iteration
{
subtotal += nums[j]; //modify local variable
return subtotal; // value to be passed to next iteration
},...

() => 0 如何初始化任何东西?变量的名称是什么,我如何在循环逻辑中使用它?

最佳答案

引用following overloadParallel.ForEach静态扩展方法:

public static ParallelLoopResult ForEach<TSource, TLocal>(
IEnumerable<TSource> source,
Func<TLocal> localInit,
Func<TSource, ParallelLoopState, TLocal, TLocal> taskBody,
Action<TLocal> localFinally
)

在您的具体示例中

线路:
() => 0, // method to initialize the local variable

只是一个 lambda(匿名函数),它将返回常量整数零。这个 lambda 作为 localInit 传递 Parallel.ForEach 的参数- 由于 lambda 返回一个整数,它的类型为 Func<int>并输入 TLocal可以推断为 int由编译器(类似地, TSource 可以从作为参数传递的集合类型推断 source )

然后将返回值 (0) 作为第三个参数(名为 subtotal )传递给 taskBody Func .此 (0) 用作主体循环的初始种子:
(j, loop, subtotal) =>
{
subtotal += nums[j]; //modify local variable (Bad idea, see comment)
return subtotal; // value to be passed to next iteration
}

第二个 lambda(传递给 taskBody)被调用 N 次,其中 N 是 TPL 分区器分配给此任务的项目数。

随后每次调用第二个 taskBody lambda 将传递新值 subTotal ,有效地计算运行 部分 总计,对于此任务。添加分配给此任务的所有项目后,第三个也是最后一个, localFinally将再次调用函数参数,传递 subtotal 的最终值从 taskBody 返回.由于多个此类任务将并行运行,因此还需要最后一步,将所有部分总计添加到最终的“总”总计中。但是,因为多个并发任务(在不同线程上)可能会争夺 grandTotal变量,重要的是要以线程安全的方式对其进行更改。

(我更改了 MSDN 变量的名称以使其更清楚)
long grandTotal = 0;
Parallel.ForEach(nums, // source collection
() => 0, // method to initialize the local variable
(j, loop, subtotal) => // method invoked by the loop on each iteration
subtotal + nums[j], // value to be passed to next iteration subtotal
// The final value of subtotal is passed to the localFinally function parameter
(subtotal) => Interlocked.Add(ref grandTotal, subtotal)

在 MS 示例中,修改任务主体内的参数小计是一种糟糕的做法,也是不必要的。即代码 subtotal += nums[j]; return subtotal;最好只是 return subtotal + nums[j];可以缩写为 lambda 速记投影 (j, loop, subtotal) => subtotal + nums[j]
一般情况
localInit / body / localFinally Parallel.For / Parallel.ForEach 的重载允许在 taskBody 之前和之后(分别)运行一次每个任务的初始化和清理代码迭代由任务执行。

(注意传递给并行的 For range/Enumerable For/ Foreach 将被划分为 IEnumerable<> 的批次,每个批次将分配一个任务)

每个任务 , localInit将被调用一次, body代码将被重复调用,每个项目一次( 0..N 次),和 localFinally完成后将被调用一次。

此外,您可以通过通用 taskBody 传递任务期间所需的任何状态(即传递给 localFinallyTLocal 委托(delegate))。来自 localInit Func 的返回值- 我把这个变量称为 taskLocals以下。

“localInit”的常见用法:
  • 创建和初始化循环体所需的昂贵资源,例如数据库连接或 Web 服务连接。
  • 保持任务局部变量以保存(无竞争)运行总计或集合
  • 如果您需要从 localInit 返回多个对象到 taskBodylocalFinally ,你可以使用一个强类型类,一个 Tuple<,,>或者,如果您仅对 localInit / taskBody / localFinally 使用 lambdas ,您还可以通过匿名类传递数据。请注意,如果您使用来自 localInit 的返回值要在多个任务之间共享引用类型,您需要考虑此对象的线程安全性 - 最好是不变性。

  • “localFinally”操作的常见用法:
  • 释放资源,如IDisposables用于 taskLocals (例如数据库连接、文件句柄、Web 服务客户端等)
  • 将每个任务完成的工作聚合/组合/减少回共享变量。这些共享变量将被竞争,因此线程安全是一个问题:
  • 例如Interlocked.Increment在像整数这样的原始类型上
  • lock写操作需要或类似的东西
  • 利用 concurrent collections以节省时间和精力。
  • taskBodytight循环操作的一部分 - 您需要优化它的性能。

    这一切最好用一个注释示例进行总结:
    public void MyParallelizedMethod()
    {
    // Shared variable. Not thread safe
    var itemCount = 0;

    Parallel.For(myEnumerable,
    // localInit - called once per Task.
    () =>
    {
    // Local `task` variables have no contention
    // since each Task can never run by multiple threads concurrently
    var sqlConnection = new SqlConnection("connstring...");
    sqlConnection.Open();

    // This is the `task local` state we wish to carry for the duration of the task
    return new
    {
    Conn = sqlConnection,
    RunningTotal = 0
    }
    },
    // Task Body. Invoked once per item in the batch assigned to this task
    (item, loopState, taskLocals) =>
    {
    // ... Do some fancy Sql work here on our task's independent connection
    using(var command = taskLocals.Conn.CreateCommand())
    using(var reader = command.ExecuteReader(...))
    {
    if (reader.Read())
    {
    // No contention for `taskLocal`
    taskLocals.RunningTotal += Convert.ToInt32(reader["countOfItems"]);
    }
    }
    // The same type of our `taskLocal` param must be returned from the body
    return taskLocals;
    },
    // LocalFinally called once per Task after body completes
    // Also takes the taskLocal
    (taskLocals) =>
    {
    // Any cleanup work on our Task Locals (as you would do in a `finally` scope)
    if (taskLocals.Conn != null)
    taskLocals.Conn.Dispose();

    // Do any reduce / aggregate / synchronisation work.
    // NB : There is contention here!
    Interlocked.Add(ref itemCount, taskLocals.RunningTotal);
    }

    还有更多例子:

    Example of per-Task uncontended dictionaries

    Example of per-Task database connections

    关于c# - 使用 Parallel ForEach 进行本地初始化如何工作?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/14831255/

    24 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com