gpt4 book ai didi

c# - 从多个 IAsyncEnumerable 流并行接收数据

转载 作者:行者123 更新时间:2023-12-05 01:23:14 25 4
gpt4 key购买 nike

我有一种情况需要从多个 IAsyncEnumerable 源接收数据。为了提高性能,它应该以并行方式执行。

我已经使用 AsyncAwaitBestPractices 编写了这样的代码来实现这个目标, System.Threading.Tasks.DataflowSystem.Linq.Async nuget 包:

public static async IAsyncEnumerable<T> ExecuteSimultaneouslyAsync<T>(
this IEnumerable<IAsyncEnumerable<T>> sources,
int outputQueueCapacity = 1,
TaskScheduler scheduler = null)
{
var sourcesCount = sources.Count();

var channel = outputQueueCapacity > 0
? Channel.CreateBounded<T>(sourcesCount)
: Channel.CreateUnbounded<T>();

sources.AsyncParallelForEach(
async body =>
{
await foreach (var item in body)
{
await channel.Writer.WaitToWriteAsync();
await channel.Writer.WriteAsync(item);
}
},
maxDegreeOfParallelism: sourcesCount,
scheduler: scheduler)
.ContinueWith(_ => channel.Writer.Complete())
.SafeFireAndForget();

while (await channel.Reader.WaitToReadAsync())
yield return await channel.Reader.ReadAsync();
}

public static async Task AsyncParallelForEach<T>(
this IEnumerable<T> source,
Func<T, Task> body,
int maxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
TaskScheduler scheduler = null)
{
var options = new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = maxDegreeOfParallelism
};

if (scheduler != null)
options.TaskScheduler = scheduler;

var block = new ActionBlock<T>(body, options);

foreach (var item in source)
block.Post(item);

block.Complete();

await block.Completion;
}

这段代码工作正常,直到两个或多个源抛出异常。在某些情况下,它会导致无法处理第二个异常并破坏应用程序的情况。

所以我想知道是否有更好的方法以并行方式使用来自多个 IAsyncEnumerable 源的数据?

最佳答案

无论是功能性管道还是 CSP 管道,在出现异常时保持管道运行都非常困难。在大多数情况下,即使在单个消息失败的情况下,管道也需要继续工作。一条失败消息并不意味着整个管道都失败了。

这就是为什么 Railway-oriented programming用于将消息 错误包装到 Result<TOk,TError> 中包装器和“重定向”或忽略错误消息。这样的类使数据流、 channel 和 IAsyncEnumerable 管道的编程变得很多更容易。

在 F# 中,使用有区别的联合,可以定义一个 Result type只是与

type Result<'T,'TError> =
| Ok of ResultValue:'T
| Error of ErrorValue:'TError

DU 还没有在 C# 中,因此已经提出了各种替代方案,其中一些使用从 IResult<> 继承。基础,一些使用允许详尽模式匹配的类/记录,这在 IResult<> 中是不可用的技术。

让我们假设 Result<>这是:

public record Result<T>(T? result, Exception? error)
{
public bool IsOk => error == null;
public static Result<T> Ok(T result) => new(result, default);
public static Result<T> Fail(Exception exception) =>
new(default, exception);

public static implicit operator Result<T> (T value)
=> Result<T>.Ok(value);
public static implicit operator Result<T>(Exception err)
=> Result<T>.Fail(err);
}

第一步是创建一个 CopyAsync将从输入复制所有数据的助手 IAsyncEnumerable<Result<T>>到输出 ChannelWriter<Result<T>>

public static async Task CopyToAsync<T>(
this IAsyncEnumerable<Result<T>> input,
ChannelWriter<Result<T>> output,
CancellationToken token=default)
{
try
{
await foreach(var msg in input.WithCancellationToken(token).ConfigureAwait(false))
{
await output.WriteAsync(msg).ConfigureAwait(false);
}
}
catch(Exception exc)
{
await output.WriteAsync(Result.Fail(exc)).ConfigureAwait(false);
}
}

这样,即使抛出异常,也会发出失败消息而不是中止管道。

这样,您可以通过将输入消息复制到输出 channel 来合并多个源:

public static ChannelReader<Result<T>> Merge(
this IEnumerable<IAsyncEnumerable<Result<T>> inputs,
CancellationToken token=default)
{
var channel=Channel.CreateBounded<Result<T>>(1);

var tasks = inputs.Select(inp=>CopyToAsync(channel.Writer,token));

_ = Task.WhenAll(tasks)
.ContinueWith(t=>channel.Writer.TryComplete(t.Exception));

return channel.Reader;
}

使用 BoundedCapacity=1 保持下游 channel 或消费者的背压行为。

您可以通过 Channel.ReadAllAsync(CancellationToken) 读取 ChannelReader 中的所有消息:

IEnumerable<IAsyncEnumerable<Result<T>>> sources = ...;
var merged=sources.Merge();
await foreach(var msg in merged.ReadAllAsync())
{
//Pattern magic to get Good results only
if(msg is ({} value,null)
{
//Work with value
}
}

您可以通过返回 IAsyncEnumerable<> 来避免公开 channel :

public static IAsyncEnumerable<Result<T>> MergeAsync(
this IEnumerable<IAsyncEnumerable<Result<T>> inputs,
CancellationToken token=default)
{
return inputs.Merge(token).ReadAllAsync(token);
}

您可以使用 System.Linq.Async使用 LINQ 方法处理 IAsyncEnumerable<>,例如转换 IAsyncEnumerable<T>IAsyncEnumerable<Result<T>> :

source.Select(msg=>Result.Ok(msg))

或者在处理之前过滤失败的消息:

source.Where(msg=>msg.IsOk)

您可以创建一个应用 Func<T1,Task<T2>> 的方法到输入并传播结果或错误作为结果:

public async Task<Result<T2>> ApplyAsync<T1,T2>(this Result<T1> msg,
Func<T1,Task<T2>> func)
{
if (msg is (_, { } err))
{
return err;
}
try
{
var result = await func(msg.result).ConfigureAwait(false);
return result;
}
catch(Exception exc)
{
return exc;
}
}

这在 F# 中...有点...更容易

关于c# - 从多个 IAsyncEnumerable 流并行接收数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/72711395/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com