gpt4 book ai didi

c# - 将可观察字节数组转换为对象

转载 作者:太空宇宙 更新时间:2023-11-03 12:55:46 25 4
gpt4 key购买 nike

我第一次在一个项目中使用 Reactive,我遇到了一个性能非常重要的问题。

概述:

我正在通过 TCP 套接字检索大量数据,我必须将这些数据解析为对象并插入到数据库中。每条消息都具有以下签名:

<payload-size> <payload>

其中 size 是 uint32 (4kb),它以字节为单位描述了以下有效负载的大小。

问题:

我想使用 Reactive Framework 提供的功能来并行执行以下步骤(见下文)以最大限度地提高性能并避免成为瓶颈。此外,我要求提供实现此方法的“最佳实践”。

TCP Socket ---> Observable (ArraySegment<byte>) --> Observable (Message)

我已经实现了以下代码,它为我提供了 Observable (ArraySegment<byte>) .

IObservable<TcpClient> observableTcpClient = endPoint.ListenerObservable(1);
IObservable<ArraySegment<byte>> observableSocket = observableTcpClient
.SelectMany(client => client.ToClientObservable(bufferSize));

我现在想转换 Observable (ArraySegment<byte>)Observable (Message) .我的第一个解决方案看起来有点像这样,因为我可以像流一样使用可观察对象。

Read continous bytestream from Stream using TcpClient and Reactive Extensions

问题:

是否可以(以及如何)使用以下方法创建可观察对象?或者有没有更好的方法可以推荐?我真的很感激一个很好的例子。

注意:Observable (ArraySegment) 的行为就像一个流,所以我不知道它推送给我的数据大小。 (我是否需要实现某种缓冲区,或者 Reactive Framework 可以帮助我吗?)

    Observable (ArraySegment<byte>) 
--> Buffer(4kb)
--> ReadSize --> Buffer(payload-size)
--> ReadPayload
--> Parse Payload
--> (Start over)

提前致谢! :)

最佳答案

编辑:在 Dimitri 的评论之后,我在下面提出了一个修改后的解决方案。有一行需要拼命重构,但它似乎有效..

Window 重载被使用,因此我们可以编写自定义缓冲。

var hinge = new Subject<Unit>();

observableSocket
.SelectMany(i => i) // to IObservable<byte>
.Window(() => hinge) // kinda-like-buffer
.Select(buff =>
{
return
from size in buff.Buffer(SIZEOFUINT32).Select(ConvertToUINT32)
from payload in buff.Buffer(size)
//Refactor line below! Window must be closed somehow..
from foo in Observable.Return(Unit.Default).Do( _ => hinge.OnNext(Unit.Default))
select payload;
})
.SelectMany(i=>i)
.ObserveOn(ThreadPoolScheduler.Instance)
.Select(ConvertToMessage);

编辑 2: 删除旧解决方案

关于c# - 将可观察字节数组转换为对象,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33945210/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com