gpt4 book ai didi

c# - 使用 System.Text.Json 异步反序列化列表

转载 作者:行者123 更新时间:2023-12-02 00:13:29 28 4
gpt4 key购买 nike

假设我请求一个包含许多对象列表的大型 json 文件。我不希望它们一下子都在内存中,但我宁愿一个一个地阅读和处理它们。所以我需要转一个异步 System.IO.Stream流入 IAsyncEnumerable<T> .如何使用新版 System.Text.Json API来做到这一点?

private async IAsyncEnumerable<T> GetList<T>(Uri url, CancellationToken cancellationToken = default)
{
using (var httpResponse = await httpClient.GetAsync(url, cancellationToken))
{
using (var stream = await httpResponse.Content.ReadAsStreamAsync())
{
// Probably do something with JsonSerializer.DeserializeAsync here without serializing the entire thing in one go
}
}
}

最佳答案

TL;DR 这不是微不足道的

看起来有人已经 posted full codeUtf8JsonStreamReader 结构,它从流中读取缓冲区并将它们提供给 Utf8JsonRreader,允许使用 JsonSerializer.Deserialize<T>(ref newJsonReader, options); 轻松反序列化。代码也不是微不足道的。相关问题是 here ,答案是 here

但这还不够 - HttpClient.GetAsync 只有在收到整个响应后才会返回,本质上是在内存中缓冲所有内容。

为了避免这种情况,HttpClient.GetAsync(string,HttpCompletionOption ) 应该与 HttpCompletionOption.ResponseHeadersRead 一起使用。

反序列化循环也应该检查取消 token ,如果有信号则退出或抛出。否则循环将继续,直到整个流被接收和处理。

此代码基于相关答案的示例,并使用 HttpCompletionOption.ResponseHeadersRead 并检查取消标记。它可以解析包含适当项目数组的 JSON 字符串,例如:

[{"prop1":123},{"prop1":234}]

第一次调用 jsonStreamReader.Read() 移动到数组的开头,而第二次调用移动到第一个对象的开头。当检测到数组的结尾 ( ] ) 时,循环本身终止。
private async IAsyncEnumerable<T> GetList<T>(Uri url, CancellationToken cancellationToken = default)
{
//Don't cache the entire response
using var httpResponse = await httpClient.GetAsync(url,
HttpCompletionOption.ResponseHeadersRead,
cancellationToken);
using var stream = await httpResponse.Content.ReadAsStreamAsync();
using var jsonStreamReader = new Utf8JsonStreamReader(stream, 32 * 1024);

jsonStreamReader.Read(); // move to array start
jsonStreamReader.Read(); // move to start of the object

while (jsonStreamReader.TokenType != JsonTokenType.EndArray)
{
//Gracefully return if cancellation is requested.
//Could be cancellationToken.ThrowIfCancellationRequested()
if(cancellationToken.IsCancellationRequested)
{
return;
}

// deserialize object
var obj = jsonStreamReader.Deserialize<T>();
yield return obj;

// JsonSerializer.Deserialize ends on last token of the object parsed,
// move to the first token of next object
jsonStreamReader.Read();
}
}

JSON 片段,AKA 流 JSON aka ...*

在事件流或日志记录场景中,将单个 JSON 对象附加到文件中是很常见的,每行一个元素,例如:
{"eventId":1}
{"eventId":2}
...
{"eventId":1234567}

这不是有效的 JSON 文档,但各个片段是有效的。这对于大数据/高并发场景有几个优势。添加新事件只需要在文件中追加一个新行,而不需要解析和重建整个文件。处理,尤其是并行处理更容易,原因有二:
  • 可以一次检索单个元素,只需从流中读取一行即可。
  • 输入文件可以很容易地跨行边界进行分区和拆分,将每个部分提供给单独的工作进程,例如在 Hadoop 集群中,或者只是应用程序中的不同线程:计算拆分点,例如通过将长度除以数量 worker ,然后寻找第一个换行符。将到那时为止的所有内容都提供给单独的 worker 。

  • 使用 StreamReader

    执行此分配的方法是使用 TextReader,一次读取一行并使用 JsonSerializer.Deserialize 解析它:
    using var reader=new StreamReader(stream);
    string line;
    //ReadLineAsync() doesn't accept a CancellationToken
    while((line=await reader.ReadLineAsync()) != null)
    {
    var item=JsonSerializer.Deserialize<T>(line);
    yield return item;

    if(cancellationToken.IsCancellationRequested)
    {
    return;
    }
    }

    这比反序列化正确数组的代码简单得多。有两个问题:
  • ReadLineAsync 不接受取消 token
  • 每次迭代都会分配一个新字符串,这是我们希望通过使用 System.Text.Json 避免的事情之一

  • 这可能已经足够了,因为尝试生成 JsonSerializer.Deserialize 所需的 ReadOnlySpan<Byte> 缓冲区并非易事。

    管道和 SequenceReader

    为了避免分配,我们需要从流中获取 ReadOnlySpan<byte>。这样做需要使用 System.IO.Pipeline 管道和 SequenceReader 结构。 Steve Gordon 的 An Introduction to SequenceReader 解释了如何使用这个类使用分隔符从流中读取数据。

    不幸的是, SequenceReader 是一个 ref 结构,这意味着它不能用于异步或本地方法。这就是为什么史蒂夫戈登在他的文章中创建了一个
    private static SequencePosition ReadItems(in ReadOnlySequence<byte> sequence, bool isCompleted)

    读取项的方法形成 ReadOnlySequence 并返回结束位置,因此 PipeReader 可以从中恢复。不幸的是,我们想要返回一个 IEnumerable 或 IAsyncEnumerable,并且迭代器方法也不喜欢 inout 参数。

    我们可以在 List 或 Queue 中收集反序列化的项目并将它们作为单个结果返回,但这仍然会分配列表、缓冲区或节点,并且必须等待缓冲区中的所有项目在返回之前被反序列化:
    private static (SequencePosition,List<T>) ReadItems(in ReadOnlySequence<byte> sequence, bool isCompleted)

    我们需要一些像可枚举一样的东西,不需要迭代器方法,使用异步并且不缓冲所有东西。

    添加 channel 以生成 IAsyncEnumerable

    ChannelReader.ReadAllAsync 返回一个 IAsyncEnumerable。我们可以从不能作为迭代器工作的方法返回一个 ChannelReader 并且仍然产生一个没有缓存的元素流。

    调整 Steve Gordon 的代码以使用 channel ,我们得到 ReadItems(ChannelWriter...) 和 ReadLastItem 方法。第一个,一次读取一个项目,直到使用 ReadOnlySpan<byte> itemBytes 换行。这可以由 JsonSerializer.Deserialize 使用。如果 ReadItems 找不到分隔符,它将返回其位置,以便 PipelineReader 可以从流中提取下一个块。

    当我们到达最后一个块并且没有其他分隔符时, ReadLastItem` 读取剩余的字节并反序列化它们。

    代码几乎与 Steve Gordon 的相同。我们不是写入控制台,而是写入 ChannelWriter。
    private const byte NL=(byte)'\n';
    private const int MaxStackLength = 128;

    private static SequencePosition ReadItems<T>(ChannelWriter<T> writer, in ReadOnlySequence<byte> sequence,
    bool isCompleted, CancellationToken token)
    {
    var reader = new SequenceReader<byte>(sequence);

    while (!reader.End && !token.IsCancellationRequested) // loop until we've read the entire sequence
    {
    if (reader.TryReadTo(out ReadOnlySpan<byte> itemBytes, NL, advancePastDelimiter: true)) // we have an item to handle
    {
    var item=JsonSerializer.Deserialize<T>(itemBytes);
    writer.TryWrite(item);
    }
    else if (isCompleted) // read last item which has no final delimiter
    {
    var item = ReadLastItem<T>(sequence.Slice(reader.Position));
    writer.TryWrite(item);
    reader.Advance(sequence.Length); // advance reader to the end
    }
    else // no more items in this sequence
    {
    break;
    }
    }

    return reader.Position;
    }

    private static T ReadLastItem<T>(in ReadOnlySequence<byte> sequence)
    {
    var length = (int)sequence.Length;

    if (length < MaxStackLength) // if the item is small enough we'll stack allocate the buffer
    {
    Span<byte> byteBuffer = stackalloc byte[length];
    sequence.CopyTo(byteBuffer);
    var item=JsonSerializer.Deserialize<T>(byteBuffer);
    return item;
    }
    else // otherwise we'll rent an array to use as the buffer
    {
    var byteBuffer = ArrayPool<byte>.Shared.Rent(length);

    try
    {
    sequence.CopyTo(byteBuffer);
    var item=JsonSerializer.Deserialize<T>(byteBuffer);
    return item;
    }
    finally
    {
    ArrayPool<byte>.Shared.Return(byteBuffer);
    }

    }
    }
    DeserializeToChannel<T> 方法在流的顶部创建一个管道读取器,创建一个 channel 并启动一个工作任务来解析块并将它们推送到 channel :
    ChannelReader<T> DeserializeToChannel<T>(Stream stream, CancellationToken token)
    {
    var pipeReader = PipeReader.Create(stream);
    var channel=Channel.CreateUnbounded<T>();
    var writer=channel.Writer;
    _ = Task.Run(async ()=>{
    while (!token.IsCancellationRequested)
    {
    var result = await pipeReader.ReadAsync(token); // read from the pipe

    var buffer = result.Buffer;

    var position = ReadItems(writer,buffer, result.IsCompleted,token); // read complete items from the current buffer

    if (result.IsCompleted)
    break; // exit if we've read everything from the pipe

    pipeReader.AdvanceTo(position, buffer.End); //advance our position in the pipe
    }

    pipeReader.Complete();
    },token)
    .ContinueWith(t=>{
    pipeReader.Complete();
    writer.TryComplete(t.Exception);
    });

    return channel.Reader;
    }
    ChannelReader.ReceiveAllAsync() 可用于通过 IAsyncEnumerable<T> 消费所有项目:
    var reader=DeserializeToChannel<MyEvent>(stream,cts.Token);
    await foreach(var item in reader.ReadAllAsync(cts.Token))
    {
    //Do something with it
    }

    关于c# - 使用 System.Text.Json 异步反序列化列表,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58572524/

    28 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com