gpt4 book ai didi

c# - 当列表可以附加其他任务时等待 Task.WhenAny(List) 的适当模式

转载 作者:行者123 更新时间:2023-12-02 02:01:55 25 4
gpt4 key购买 nike

不可能等待 List<Task>这正在改变,因为 Task.WhenAny(List<Task>)获取 List<Task> 的副本.

什么是合适的模式

List<Task> taskList = new List<Task>();

await Task.WhenAny(taskList);

在调用第一个 WhenAny 后,taskList 何时可以添加其他任务?

下面的完整演示代码演示了该问题。

    static readonly List<Task<int>> taskList = new List<Task<int>>();
static readonly Random rnd = new Random(1);

static async Task<int> RunTaskAsync(int taskID,int taskDuration)
{
await Task.Yield();
Console.WriteLine("Starting Task: {0} with a duration of {1} seconds", taskID, taskDuration / 1000);
await Task.Delay(taskDuration); // mimic some work
return taskID;
}
static async Task AddTasksAsync(int numTasks, int minDelay, int maxDelay)
{
// Add numTasks asyncronously to the taskList
// First task is added Syncronously and then we yield the adds to a worker

taskList.Add(RunTaskAsync(1, 60000)); // Make the first task run for 60 seconds
await Task.Delay(5000); // wait 5 seconds to ensure that the WhenAny is started with One task

// remaing task run's are Yielded to a worker thread
for (int i = 2; i <= numTasks; i++)
{
await Task.Delay(rnd.Next(minDelay, maxDelay));
taskList.Add(RunTaskAsync(i, rnd.Next(5, 30) * 1000));
}
}
static async Task Main(string[] args)
{
Stopwatch sw = new Stopwatch(); sw.Start();

// Start a Fire and Forget Task to create some running tasks
var _ = AddTasksAsync(10, 1, 3000);

// while there are tasks to complete use the main thread to process them as they comeplete
while(taskList.Count > 0)
{
var t = await Task.WhenAny(taskList);
taskList.Remove(t);
var i = await t;
Console.WriteLine("Task {0} found to be completed at: {1}",i,sw.Elapsed);
}

// All tasks have completed sucessfully - exit main thread
}

控制台输出,显示 WhenAny() 循环仅在找到并删除 60 秒任务后才发现所有其他任务均已完成。

Starting Task: 1 with a duration of 60 seconds
Starting Task: 2 with a duration of 7 seconds
Starting Task: 3 with a duration of 24 seconds
Starting Task: 4 with a duration of 15 seconds
Starting Task: 5 with a duration of 28 seconds
Starting Task: 6 with a duration of 21 seconds
Starting Task: 7 with a duration of 11 seconds
Starting Task: 8 with a duration of 29 seconds
Starting Task: 9 with a duration of 21 seconds
Starting Task: 10 with a duration of 20 seconds
Task 1 found to be completed at: 00:01:00.1305811
Task 2 found to be completed at: 00:01:00.1312951
Task 3 found to be completed at: 00:01:00.1315689
Task 4 found to be completed at: 00:01:00.1317623
Task 5 found to be completed at: 00:01:00.1319427
Task 6 found to be completed at: 00:01:00.1321225
Task 7 found to be completed at: 00:01:00.1323002
Task 8 found to be completed at: 00:01:00.1324379
Task 9 found to be completed at: 00:01:00.1325962
Task 10 found to be completed at: 00:01:00.1327377

谢谢!

最佳答案

您所显示的代码存在问题,即它在工作人员和任务创建者之间没有合理的通信管道。您需要某种消息传递机制来通知工作人员有关新任务的信息(以及当没有更多任务时),以便它可以对其使用react。这是你必须为你的并发系统弄清楚的事情,而确切的实现与问题无关,所以我假设我们有 OnTaskAdded(Task task)OnEnd()我们的工作人员中的方法。

根据您的说法,您不想真正等到任何任务完成,而是在每个任务完成时执行一些操作。请参阅下面更新的答案。<罢工>这可以通过 ContinueWith 来实现:

<罢工>
class Worker
{
private List<Task> _tasks = new List<Task>();
private readonly Stopwatch _stopwatch = new Stopwatch();

// Start the stopwatch in the constructor or in some kind of a StartProcessing method.

void OnTaskAdded(Task<int> task)
{
var taskWithContinuation = task.ContinueWith(t =>
Console.WriteLine("Task {0} found to be completed at: {1}", t.Result, _stopwatch.Elapsed));
_tasks.Add(taskWithContinuation);
}

async Task OnEndAsync()
{
// We're finishing work and there will be no more tasks, it's safe to await them all now.
await Task.WhenAll(_tasks);
}
}

编辑:在进行了所有关于确保合理的消息传递管道的说教之后,我认为我实际上可以为您提供一个快速而肮脏的实现,以便您可以看到它的工作原理:

// DISCLAIMER: NOT PRODUCTION CODE!!!
public static async Task Main()
{
Stopwatch sw = new Stopwatch(); sw.Start();

// Start a Fire and Forget Task to create some running tasks
var _ = AddTasksAsync(10, 1, 3000);
var internalList = new List<Task>();

// while there are tasks to complete use the main thread to process them as they comeplete
var i = 0;
while (i < 10)
{
while (taskList.Count <= i)
{
// No new tasks, check again after a delay -- THIS IS VERY BAD!
await Task.Delay(100);
}
Console.WriteLine("Task {0} intercepted at: {1}", i + 1, sw.Elapsed);
var taskWithContinuation = taskList[i].ContinueWith(t =>
Console.WriteLine("Task {0} found to be completed at: {1}", t.Result, sw.Elapsed));
internalList.Add(taskWithContinuation);
++i;
}
await Task.WhenAll(internalList);
}

让我再次强调:这不是生产质量的代码!积极等待更多任务,呃。它的输出是这样的:

Task 1 intercepted at: 00:00:00.0495570
Starting Task: 1 with a duration of 60 seconds
Starting Task: 2 with a duration of 7 seconds
Task 2 intercepted at: 00:00:05.8459622
Starting Task: 3 with a duration of 24 seconds
Task 3 intercepted at: 00:00:07.2626124
Starting Task: 4 with a duration of 15 seconds
Task 4 intercepted at: 00:00:09.2257285
Starting Task: 5 with a duration of 28 seconds
Task 5 intercepted at: 00:00:10.3058738
Starting Task: 6 with a duration of 21 seconds
Task 6 intercepted at: 00:00:10.6376981
Starting Task: 7 with a duration of 11 seconds
Task 7 intercepted at: 00:00:10.7507146
Starting Task: 8 with a duration of 29 seconds
Task 8 intercepted at: 00:00:11.7107754
Task 2 found to be completed at: 00:00:12.8111589
Starting Task: 9 with a duration of 21 seconds
Task 9 intercepted at: 00:00:13.7883430
Starting Task: 10 with a duration of 20 seconds
Task 10 intercepted at: 00:00:14.6707959
Task 7 found to be completed at: 00:00:21.6692276
Task 4 found to be completed at: 00:00:24.2125638
Task 3 found to be completed at: 00:00:31.2276640
Task 6 found to be completed at: 00:00:31.5908324
Task 10 found to be completed at: 00:00:34.5585143
Task 9 found to be completed at: 00:00:34.7053864
Task 5 found to be completed at: 00:00:38.2616534
Task 8 found to be completed at: 00:00:40.6372696
Task 1 found to be completed at: 00:01:00.0720695

您可以看到,由于多线程工作的性质,行有点困惑,但时间戳是准确的。

更新:

好吧,我很蠢,我只是邀请你进入一个反模式。 Using ContinueWith is dangerous ,而且它过于复杂 - async/ await的引入是为了让我们免于手动安排延续。 你可以直接包裹你的Task<int>操作 await保存并记录时间

class Worker
{
private List<Task> _tasks = new List<Task>();
private readonly Stopwatch _stopwatch = new Stopwatch();

// Start the stopwatch in the constructor or in some kind of a StartProcessing method.

void OnTaskAdded(Task<int> task)
{
var taskWithContinuation = ContinueWithLog(task);
_tasks.Add(taskWithContinuation);
}

async Task OnEndAsync()
{
// We're finishing work and there will be no more tasks, it's safe to await them all now.
await Task.WhenAll(_tasks);
}

private Task ContinueWithLog(Task<int> task)
{
var i = await source;
Console.WriteLine("Task {0} found to be completed at: {1}", i, sw.Elapsed);
}
}

使用示例代码进行快速而肮脏的 PoC:

class Program
{
static readonly List<Task<int>> taskList = new List<Task<int>>();
static readonly Random rnd = new Random(1);
static readonly Stopwatch sw = new Stopwatch();

static async Task<int> RunTaskAsync(int taskID, int taskDuration)
{
await Task.Yield();
Console.WriteLine("Starting Task: {0} with a duration of {1} seconds", taskID, taskDuration / 1000);
await Task.Delay(taskDuration); // mimic some work
return taskID;
}
static async Task AddTasksAsync(int numTasks, int minDelay, int maxDelay)
{
// Add numTasks asyncronously to the taskList
// First task is added Syncronously and then we yield the adds to a worker

taskList.Add(RunTaskAsync(1, 60000)); // Make the first task run for 60 seconds
await Task.Delay(5000); // wait 5 seconds to ensure that the WhenAny is started with One task

// remaing task run's are Yielded to a worker thread
for (int i = 2; i <= numTasks; i++)
{
await Task.Delay(rnd.Next(minDelay, maxDelay));
taskList.Add(RunTaskAsync(i, rnd.Next(5, 30) * 1000));
}
}

public static async Task ContinueWithLog(Task<int> source)
{
var i = await source;
Console.WriteLine("Task {0} found to be completed at: {1}", i, sw.Elapsed);
}

public static async Task Main()
{
sw.Start();

// Start a Fire and Forget Task to create some running tasks
var _ = AddTasksAsync(10, 1, 3000);
var internalList = new List<Task>();

// while there are tasks to complete use the main thread to process them as they comeplete
var i = 0;
while (i < 10)
{
while (taskList.Count <= i)
{
// No new tasks, check again after a delay -- THIS IS VERY BAD!
await Task.Delay(100);
}
Console.WriteLine("Task {0} intercepted at: {1}", i + 1, sw.Elapsed);
internalList.Add(ContinueWithLog(taskList[i]));
++i;
}
await Task.WhenAll(internalList);
}
}

输出:

Starting Task: 1 with a duration of 60 seconds
Task 1 intercepted at: 00:00:00.0525006
Starting Task: 2 with a duration of 7 seconds
Task 2 intercepted at: 00:00:05.8551382
Starting Task: 3 with a duration of 24 seconds
Task 3 intercepted at: 00:00:07.2687049
Starting Task: 4 with a duration of 15 seconds
Task 4 intercepted at: 00:00:09.2404507
Starting Task: 5 with a duration of 28 seconds
Task 5 intercepted at: 00:00:10.3325019
Starting Task: 6 with a duration of 21 seconds
Task 6 intercepted at: 00:00:10.6654663
Starting Task: 7 with a duration of 11 seconds
Task 7 intercepted at: 00:00:10.7809841
Starting Task: 8 with a duration of 29 seconds
Task 8 intercepted at: 00:00:11.7576237
Task 2 found to be completed at: 00:00:12.8151955
Starting Task: 9 with a duration of 21 seconds
Task 9 intercepted at: 00:00:13.7228579
Starting Task: 10 with a duration of 20 seconds
Task 10 intercepted at: 00:00:14.5829039
Task 7 found to be completed at: 00:00:21.6848699
Task 4 found to be completed at: 00:00:24.2089671
Task 3 found to be completed at: 00:00:31.2300136
Task 6 found to be completed at: 00:00:31.5847257
Task 10 found to be completed at: 00:00:34.5550722
Task 9 found to be completed at: 00:00:34.6904076
Task 5 found to be completed at: 00:00:38.2835777
Task 8 found to be completed at: 00:00:40.6445029
Task 1 found to be completed at: 00:01:00.0826952

这是实现您想要的目标的惯用方式。很抱歉用 ContinueWith 误导您首先,这是不必要的,而且容易出错,现在我们都知道了。

关于c# - 当列表可以附加其他任务时等待 Task.WhenAny(List<T>) 的适当模式,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59449417/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com