gpt4 book ai didi

c# - 澄清与节流并行运行多个异步任务

转载 作者:行者123 更新时间:2023-12-04 15:46:19 24 4
gpt4 key购买 nike

编辑:由于 Bulkhead 策略需要用 WaitAndRetry 策略包装,无论如何......我倾向于将示例 3 作为保持并行性、节流和 polly 策略重试的最佳解决方案。看起来很奇怪,因为我认为 Parallel.ForEach 用于同步操作,而 Bulkhead 更适合异步

我正在尝试并行运行多个异步任务,同时使用 polly AsyncBulkheadPolicy 进行节流。到目前为止,我的理解是策略方法 ExecuteAsync 本身并不调用线程,而是将其留给默认的 TaskScheduler 或之前的某个人。因此,如果我的任务以某种方式受 CPU 限制,那么我需要在执行任务时使用 Parallel.ForEach 或使用 ExecuteAsync 方法使用 Task.Run() 以便将任务安排到后台线程。

有人可以查看下面的示例并阐明它们在并行性和线程池方面的工作方式吗?

https://github.com/App-vNext/Polly/wiki/Bulkhead - 操作:Bulkhead 策略不创建它自己的线程,它假定我们已经这样做了。

async Task DoSomething(IEnumerable<object> objects);

//Example 1:
//Simple use, but then I don't have access to retry policies from polly
Parallel.ForEach(groupedObjects, (set) =>
{
var task = DoSomething(set);
task.Wait();
});

//Example 2:
//Uses default TaskScheduler which may or may not run the tasks in parallel
var parallelTasks = new List<Task>();
foreach (var set in groupedObjects)
{
var task = bulkheadPolicy.ExecuteAsync(async () => DoSomething(set));
parallelTasks.Add(task);
};

await Task.WhenAll(parallelTasks);

//Example 3:
//seems to defeat the purpose of the bulkhead since Parallel.ForEach and
//PolicyBulkheadAsync can both do throttling...just use basic RetryPolicy
//here?
Parallel.ForEach(groupedObjects, (set) =>
{
var task = bulkheadPolicy.ExecuteAsync(async () => DoSomething(set));
task.Wait();
});


//Example 4:
//Task.Run still uses the default Task scheduler and isn't any different than
//Example 2; just makes more tasks...this is my understanding.
var parallelTasks = new List<Task>();
foreach (var set in groupedObjects)
{
var task = Task.Run(async () => await bulkheadPolicy.ExecuteAsync(async () => DoSomething(set)));
parallelTasks.Add(task);
};

await Task.WhenAll(parallelTasks);

DoSomething 是一种对一组对象执行操作的异步方法。我希望这在并行线程中发生,同时尊重 polly 的重试策略并允许节流。

我似乎对 Parallel.ForEach 和使用 Bulkhead.ExecuteAsync 的功能行为究竟有什么作用感到困惑,但是,当涉及到如何处理任务/线程时。

最佳答案

您可能是对的,使用 Parallel.ForEach 违背了隔板的目的。我认为一个带有延迟的简单循环将完成为舱壁提供任务的工作。虽然我猜想在现实生活中的例子中会有连续的数据流,而不是预定义的列表或数组。

using Polly;
using Polly.Bulkhead;

static async Task Main(string[] args)
{
var groupedObjects = Enumerable.Range(0, 10)
.Select(n => new object[] { n }); // Create 10 sets to work with
var bulkheadPolicy = Policy
.BulkheadAsync(3, 3); // maxParallelization, maxQueuingActions
var parallelTasks = new List<Task>();
foreach (var set in groupedObjects)
{
Console.WriteLine(@$"Scheduling, Available: {bulkheadPolicy
.BulkheadAvailableCount}, QueueAvailable: {bulkheadPolicy
.QueueAvailableCount}");

// Start the task
var task = bulkheadPolicy.ExecuteAsync(async () =>
{
// Await the task without capturing the context
await DoSomethingAsync(set).ConfigureAwait(false);
});
parallelTasks.Add(task);
await Task.Delay(50); // Interval between scheduling more tasks
}

var whenAllTasks = Task.WhenAll(parallelTasks);
try
{
// Await all the tasks (await throws only one of the exceptions)
await whenAllTasks;
}
catch when (whenAllTasks.IsFaulted) // It might also be canceled
{
// Ignore rejections, rethrow other exceptions
whenAllTasks.Exception.Handle(ex => ex is BulkheadRejectedException);
}
Console.WriteLine(@$"Processed: {parallelTasks
.Where(t => t.Status == TaskStatus.RanToCompletion).Count()}");
Console.WriteLine($"Faulted: {parallelTasks.Where(t => t.IsFaulted).Count()}");
}

static async Task DoSomethingAsync(IEnumerable<object> set)
{
// Pretend we are doing something with the set
await Task.Delay(500).ConfigureAwait(false);
}

输出:

Scheduling, Available: 3, QueueAvailable: 3
Scheduling, Available: 2, QueueAvailable: 3
Scheduling, Available: 1, QueueAvailable: 3
Scheduling, Available: 0, QueueAvailable: 3
Scheduling, Available: 0, QueueAvailable: 2
Scheduling, Available: 0, QueueAvailable: 1
Scheduling, Available: 0, QueueAvailable: 0
Scheduling, Available: 0, QueueAvailable: 0
Scheduling, Available: 0, QueueAvailable: 0
Scheduling, Available: 0, QueueAvailable: 1
Processed: 7
Faulted: 3

Try it on Fiddle .


更新:DoSomethingAsync 的一个稍微更现实的版本,它实际上强制 CPU 做一些实际工作(在我的四核机器中 CPU 利用率接近 100%)。

private static async Task DoSomethingAsync(IEnumerable<object> objects)
{
await Task.Run(() =>
{
long sum = 0; for (int i = 0; i < 500000000; i++) sum += i;
}).ConfigureAwait(false);
}

此方法并未针对所有数据集运行。它仅针对未被舱壁拒绝的集合运行。

关于c# - 澄清与节流并行运行多个异步任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55622225/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com