gpt4 book ai didi

c# - 读取可观察的 TAP 模式

转载 作者:行者123 更新时间:2023-12-02 21:47:01 25 4
gpt4 key购买 nike

经过对 StackOverflow 的大量努力和研究——其中大部分已经过时,因为响应式扩展代码最近发生了变化——我终于能够消除这个从套接字读取数据的 Observable 方法的所有编译错误,并且我对这段代码的理解比一开始要好得多。但并不完全。有人可以用英语读给我听并回答两三个问题吗?

缓冲的数据是从这个方法中提取的(或者应该如何提取,如果我错了)?其中是否有不再需要的部分?尽管我真的很喜欢与业务代码解耦,并将所有套接字代码保留在一种或两种方法中,但是否有更好的方法来做到这一点(解耦且可读)?

    public static IObservable<int> WhenDataReceived(this Socket socket, int byteCount, SocketFlags flags = SocketFlags.None)
{
Contract.Requires(byteCount > 0);

return Observable.Create<int>(
observer =>
{
byte[] buffer = new byte[byteCount];
int remainder = byteCount;
bool shutdown = false;

return Observable.Defer<int>(() =>
Task.Factory.FromAsync<int>(socket.BeginReceive(buffer, buffer.Length - remainder, remainder, flags,
(result) =>
{
var read = (int)result.AsyncState;
remainder -= read;

if (read == 0)
shutdown = true;
},
null), socket.EndReceive).ToObservable())
.TakeWhile(_ => remainder > 0 && !shutdown)
.TakeLast(1)
.Subscribe(
observer.OnNext,
ex =>
{
var socketError = ex as SocketException;

if (socketError != null
&& (socketError.SocketErrorCode == SocketError.Disconnecting
|| socketError.SocketErrorCode == SocketError.Shutdown))
{
observer.OnCompleted();
}
else { observer.OnError(ex); }
},
observer.OnCompleted);
});
}
}

调用它的函数仍然存在我不明白的编译错误(.Do 和 .BitConverter 有一些无效参数):

        static IObservable<string> StartClient(this IObserver<ScanInformation> observer, IPAddress ip, int port)
{
var client = Observable.Using(
() => new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp),
socket =>
from _ in socket.WhenConnected(ip, port)
from message in
(from first in socket.WhenDataReceived(4)
let length = BitConverter.ToInt32(first, 0)
from message in
Observable.If(
condition: () => length > 0,
thenSource: from second in socket.WhenDataReceived(length)
select Encoding.UTF8.GetString(second, 0, length),
elseSource: Observable.Return<string>(null))
select message)
.Repeat()
.TakeWhile(message => message != null)
select message);

return
client.Do(observer).TakeLast(1);
}

最佳答案

这两个编译错误都是由于传递了错误键入的参数造成的。

Do() 上的编译错误是因为您的观察者是 IObserver<ScanInformation>但客户是IObservable<string> 。您是否打算将字符串转换为 ScanInformation 的实例?

您在 BitConverter 上的编译接受 byte[]作为第一个参数(要转换的字节缓冲区),但您传递的是 int大概在某个时刻,您从 WhenDataReceived 返回缓冲区;现在您正在传回读取的字节数。

Rx 并没有发生太大的变化,以至于这种代码会被破坏。您的代码看起来可能遇到了一些复制/粘贴错误 - 以至于它可能更令人困惑而不是有帮助。有一个look at this blog post一个看起来合理的套接字读取实现,它使用 Rx 以相当简单的方式包装 TPL 调用。 This discussion或许也能有所启发。

还有一个还不错的ObservableSocket在 Rxx 库中。请参阅here .

关于c# - 读取可观察的 TAP 模式,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/19352944/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com