gpt4 book ai didi

c# - C# 中的循环异步任务列表

转载 作者:行者123 更新时间:2023-11-30 22:05:54 25 4
gpt4 key购买 nike

我正在尝试连续解析来自多个网站的数据。我希望此操作以异步方式在循环中单独执行,直到程序关闭。我不确定这种逻辑的结构应该是什么。

现在我正在遵循这种模式。

async public void ParseAll(List<Site> SiteList)
{
List<Task> TaskList = new List<Task>();

foreach(Site s in SiteList)
{
TaskList.Add(s.ParseData);
}

await Task.WhenAll(TaskList)
}

问题是,如果我围绕此方法构造一个循环,那么首先更新的站点将必须等到整个列表完成后,该方法才能再次运行。从理论上讲,我想做的只是在每个站点完成其 ParseData 方法时将每个站点放回 TaskList 的底部,但我不确定这是否可能,或者如果那是最好的方法。

最佳答案

Theoretically, what I would like to do is just put each site back on the bottom of the TaskList when it finished its ParseData

看起来您需要维护一个要处理的站点队列。下面是我对此的看法,使用 SemaphoreSlim。通过这种方式,您还可以将并发任务数限制为小于实际站点数,或者即时添加新站点。 CancellationToken 用于从外部停止处理。使用 async void 在这里 IMO 是合理的,QueueSiteAsync 跟踪它启动的任务。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace AsyncLoop
{
class Program
{
public class Site
{
public string Url { get; set; }
public async Task ParseDataAsync(CancellationToken token)
{
// simulate download and parse
int delay = new Random(Environment.TickCount).Next(100, 1000);
await Task.Delay(delay, token);
Console.WriteLine("Processed: #{0}, delay: {1}", this.Url, delay);
}
}

object _lock = new Object();
HashSet<Task> _pending = new HashSet<Task>(); // sites in progress
SemaphoreSlim _semaphore;

async void QueueSiteAsync(Site site, CancellationToken token)
{
Func<Task> processSiteAsync = async () =>
{
await _semaphore.WaitAsync(token).ConfigureAwait(false);
try
{
await site.ParseDataAsync(token);
QueueSiteAsync(site, token);
}
finally
{
_semaphore.Release();
}
};

var task = processSiteAsync();
lock (_lock)
_pending.Add(task);
try
{
await task;
lock (_lock)
_pending.Remove(task);
}
catch
{
if (!task.IsCanceled && !task.IsFaulted)
throw; // non-task error, re-throw

// leave the faulted task in the pending list and exit
// ProcessAllSites will pick it up
}
}

public async Task ProcessAllSites(
Site[] sites, int maxParallel, CancellationToken token)
{
_semaphore = new SemaphoreSlim(Math.Min(sites.Length, maxParallel));

// start all sites
foreach (var site in sites)
QueueSiteAsync(site, token);

// wait for cancellation
try
{
await Task.Delay(Timeout.Infinite, token);
}
catch (OperationCanceledException)
{
}

// wait for pending tasks
Task[] tasks;
lock (_lock)
tasks = _pending.ToArray();
await Task.WhenAll(tasks);
}

// testing
static void Main(string[] args)
{
// cancel processing in 10s
var cts = new CancellationTokenSource(millisecondsDelay: 10000);
var sites = Enumerable.Range(0, count: 10).Select(i =>
new Site { Url = i.ToString() });
try
{
new Program().ProcessAllSites(
sites.ToArray(),
maxParallel: 5,
token: cts.Token).Wait();
}
catch (AggregateException ex)
{
foreach (var innerEx in ex.InnerExceptions)
Console.WriteLine(innerEx.Message);
}
}
}
}

您可能还想将下载和解析分开到单独的管道中,检查 this了解更多详情。

关于c# - C# 中的循环异步任务列表,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24020740/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com