gpt4 book ai didi

c# - 如何在 Rx.NET 中以正确的方式约束并发

转载 作者:太空狗 更新时间:2023-10-29 21:12:34 24 4
gpt4 key购买 nike

请注意以下代码片段:

var result = await GetSource(1000).SelectMany(s => getResultAsync(s).ToObservable()).ToList();

此代码的问题在于 getResultAsync 以不受约束的方式并发运行。在某些情况下,这可能不是我们想要的。假设我想将其并发限制为最多 10 个并发调用。 Rx.NET 的方法是什么?

我附上了一个简单的控制台应用程序,它演示了主题和我对所描述问题的蹩脚解决方案。

还有一些额外的代码,比如 Stats 类和人工随机 sleep 。它们的存在是为了确保我真正获得并发执行,并且可以可靠地计算在该过程中达到的最大并发数。

RunUnconstrained 方法演示了原始的、不受约束的运行。 RunConstrained 方法显示了我的解决方案,它不是很优雅。理想情况下,我想通过简单地将专用 Rx 运算符应用于 Monad 来简化并发约束。当然,在不牺牲性能的情况下。

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;

namespace RxConstrainedConcurrency
{
class Program
{
public class Stats
{
public int MaxConcurrentCount;
public int CurConcurrentCount;
public readonly object MaxConcurrentCountGuard = new object();
}

static void Main()
{
RunUnconstrained().GetAwaiter().GetResult();
RunConstrained().GetAwaiter().GetResult();
}
static async Task RunUnconstrained()
{
await Run(AsyncOp);
}
static async Task RunConstrained()
{
using (var sem = new SemaphoreSlim(10))
{
await Run(async (s, pause, stats) =>
{
// ReSharper disable AccessToDisposedClosure
await sem.WaitAsync();
try
{
return await AsyncOp(s, pause, stats);
}
finally
{
sem.Release();
}
// ReSharper restore AccessToDisposedClosure
});
}
}
static async Task Run(Func<string, int, Stats, Task<int>> getResultAsync)
{
var stats = new Stats();
var rnd = new Random(0x1234);
var result = await GetSource(1000).SelectMany(s => getResultAsync(s, rnd.Next(30), stats).ToObservable()).ToList();
Debug.Assert(stats.CurConcurrentCount == 0);
Debug.Assert(result.Count == 1000);
Debug.Assert(!result.Contains(0));
Debug.WriteLine("Max concurrency = " + stats.MaxConcurrentCount);
}

static IObservable<string> GetSource(int count)
{
return Enumerable.Range(1, count).Select(i => i.ToString()).ToObservable();
}

static Task<int> AsyncOp(string s, int pause, Stats stats)
{
return Task.Run(() =>
{
int cur = Interlocked.Increment(ref stats.CurConcurrentCount);
if (stats.MaxConcurrentCount < cur)
{
lock (stats.MaxConcurrentCountGuard)
{
if (stats.MaxConcurrentCount < cur)
{
stats.MaxConcurrentCount = cur;
}
}
}

try
{
Thread.Sleep(pause);
return int.Parse(s);
}
finally
{
Interlocked.Decrement(ref stats.CurConcurrentCount);
}
});
}
}
}

最佳答案

您可以在 Rx 中使用 Merge 的重载来执行此操作,该重载限制了对内部可观察对象的并发订阅数。

这种形式的Merge应用于流的流。

通常,使用 SelectMany 从事件调用异步任务有两个作用:将每个事件转换到一个可观察流中,其单个事件是结果,并将所有结果流压平在一起。

要使用 Merge,我们必须使用常规的 Select 将每个事件转换到异步任务的调用中,(从而创建流的流),并使用 Merge 将结果展平。它将通过在任何时间点仅订阅提供的固定数量的内部流以受限的方式执行此操作。

我们必须小心,仅在订阅包装内部流时调用每个异步任务调用。使用 ToObservable() 将异步任务转换为可观察对象实际上会立即调用异步任务,而不是在订阅时调用,因此我们必须延迟评估直到订阅使用 Observable.Defer.

这是一个将所有这些步骤放在一起的示例:

void Main()
{
var xs = Observable.Range(0, 10); // source events

// "Double" here is our async operation to be constrained,
// in this case to 3 concurrent invocations

xs.Select(x =>
Observable.Defer(() => Double(x).ToObservable())).Merge(3)
.Subscribe(Console.WriteLine,
() => Console.WriteLine("Max: " + MaxConcurrent));


}

private static int Concurrent;
private static int MaxConcurrent;
private static readonly object gate = new Object();

public async Task<int> Double(int x)
{
var concurrent = Interlocked.Increment(ref Concurrent);
lock(gate)
{
MaxConcurrent = Math.Max(concurrent, MaxConcurrent);
}

await Task.Delay(TimeSpan.FromSeconds(1));

Interlocked.Decrement(ref Concurrent);

return x * 2;
}

此处的最大并发输出将为“3”。删除 Merge 以“不受约束”,您将得到“10”。

获得 Defer 效果的另一种(等效)方法是使用 FromAsync 而不是 Defer + ToObservable:

xs.Select(x => Observable.FromAsync(() => Double(x))).Merge(3)

关于c# - 如何在 Rx.NET 中以正确的方式约束并发,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29263695/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com