gpt4 book ai didi

c# - 异步处理 IEnumerable,并发性有限

转载 作者:太空狗 更新时间:2023-10-30 01:17:39 26 4
gpt4 key购买 nike

我有一个 IEnumerable<Task<T>>其中 T表示一些事件(自然语言类型的事件,而不是 event 类型的事件)。

我想异步处理这些,因为它们是 IO 绑定(bind)的,并限制并发量,因为处理事件的数据库不能处理超过一把(比如 6 个)并发处理请求(它们非常重)这样做的正确策略是什么?

如果我有

private Task processeventasync(T someevent) {
...
}

foreach(t in tasks) {
await processeventsasync(await t)
}

我没有并发。

如果我用信号量保护事物,我实际上是在保护线程并用锁保护它们,而不是异步等待它们。

LimitedConcurrencyLevelTaskScheduler来自 https://msdn.microsoft.com/en-us/library/system.threading.tasks.taskscheduler(v=vs.110).aspx 上的示例也是一种基于线程/锁的方法

我考虑过维护最多 6 个任务的队列,并创建一个 WhenAny绕着它转,但感觉就像重新发明方轮。

private List<Task> running = new List<Task>();

foreach(Task<T> task in tasks) {
var inner = TaskExtensions.Unwrap(t.ContinueWith(tt => processeventasync(tt.Result)));
running.Add(inner);
if (running.Count >= 6) {
var resulttask = await Task.WhenAny(running);
running.Remove(resulttask);
await resulttask;
//not sure if this await will schedule the next iteration
//of the loop asynchronously, or if the loop happily continues
//and the continuation has the rest of the loop body (nothing
}
}

去这里的正确方法是什么?

编辑:

SemaphoreSlim s WaitAsync这似乎很合理。我来到以下看起来很奇怪的代码:

    private async void Foo()
{

IEnumerable<Task<int>> tasks = gettasks();
var resulttasks = tasks.Select(ti => TaskExtensions.Unwrap(ti.ContinueWith(tt => processeventasync(tt.Result))));
var semaphore = new SemaphoreSlim(initialCount: 6);

foreach (Task task in resulttasks)
{
await semaphore.WaitAsync();
semaphore.Release();
}
}

拥有async void这里比较臭,但是死循环;它永远不会返回(实际处理显然会有一些取消机制)。

只有正文中的 await/release 看起来很奇怪,但看起来确实是这样。这是没有隐藏陷阱的合理方法吗?

最佳答案

您可以使用 SemaphoreSlim.WaitAsync 限制并发.

It looks really strange with just the await/release in the body, but it looks like that's actually right

您当前的方法实际上没有任何作用。这些任务根本不受 SemaphoreSlim 的影响,因为您使用 Enumerable.Select 同时调用它们。

您需要监控 Select 中的信号量:

private const int ConcurrencyLimit = 6;
SemaphoreSlim semaphoreSlim = new SemaphoreSlim(ConcurrencyLimit);

public async Task FooAsync()
{
var tasks = GetTasks();
var sentTasks = tasks.Select(async task =>
{
await semaphoreSlim.WaitAsync();
try
{
await ProcessEventAsync(await task);
}
finally
{
semaphoreSlim.Release();
}
});

await Task.WhenAll(sentTasks);
}

private Task ProcessEventAsync(T someEvent)
{
// Process event.
}

关于c# - 异步处理 IEnumerable<Task>,并发性有限,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30863863/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com