gpt4 book ai didi

c# - 使用 Reactive Extensions (Rx) 进行套接字编程实用吗?

转载 作者:IT王子 更新时间:2023-10-29 04:39:35 26 4
gpt4 key购买 nike

用 Rx 编写 GetMessages 函数最简洁的方法是什么:

static void Main()
{
Socket socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);

var messages = GetMessages(socket, IPAddress.Loopback, 4000);
messages.Subscribe(x => Console.WriteLine(x));

Console.ReadKey();
}

static IObservable<string> GetMessages(Socket socket, IPAddress addr, int port)
{
var whenConnect = Observable.FromAsyncPattern<IPAddress, int>(socket.BeginConnect, socket.EndConnect)(addr, port);

// now will receive a stream of messages
// each message is prefixed with an 4 bytes/Int32 indicating it's length.
// the rest of the message is a string

// ????????????? Now What ?????????????
}

作为上述示例驱动程序的简单服务器:http://gist.github.com/452893#file_program.cs

关于使用Rx进行套接字编程

我一直在调查使用 Reactive Extensions对于我正在做的一些套接字编程工作。我这样做的动机是它会以某种方式使代码“更简单”。这是否意味着更少的代码,更少的嵌套。

然而到目前为止,情况似乎并非如此:

  1. 我还没有找到很多使用 Rx 和套接字的例子
  2. example s 我发现似乎并不比我现有的 BeginXXXX、EndXXXX 代码复杂
  3. 虽然 ObservableFromAsyncPattern 的扩展方法,这不包括 SocketEventArgs 异步 API。

当前无效的解决方案

这是我目前所拥有的。这不起作用,它因堆栈溢出而失败(呵呵)我还没有弄清楚语义,所以我可以创建一个 IObservable 来读取指定数量的字节。

    static IObservable<int> GetMessages(Socket socket, IPAddress addr, int port)
{
var whenConnect = Observable.FromAsyncPattern<IPAddress, int>(socket.BeginConnect, socket.EndConnect)(addr, port);

// keep reading until we get the first 4 bytes
byte[] buffer = new byte[1024];
var readAsync = Observable.FromAsyncPattern<byte[], int, int, SocketFlags, int>(socket.BeginReceive, socket.EndReceive);

IObservable<int> readBytes = null;
var temp = from totalRead in Observable.Defer(() => readBytes)
where totalRead < 4
select readAsync(buffer, totalRead, totalRead - 4, SocketFlags.None);
readBytes = temp.SelectMany(x => x).Sum();

var nowDoSomethingElse = readBytes.SkipUntil(whenConnect);
}

最佳答案

按照这些思路可能会奏效。这没有经过测试,没有考虑异常和部分返回消息的情况。但除此之外,我相信这是一个正确的方向。

    public static IObservable<T> GetSocketData<T>(this Socket socket,
int sizeToRead, Func<byte[], T> valueExtractor)
{
return Observable.CreateWithDisposable<T>(observer =>
{
var readSize = Observable
.FromAsyncPattern<byte[], int, int, SocketFlags, int>(
socket.BeginReceive,
socket.EndReceive);
var buffer = new byte[sizeToRead];
return readSize(buffer, 0, sizeToRead, SocketFlags.None)
.Subscribe(
x => observer.OnNext(valueExtractor(buffer)),
observer.OnError,
observer.OnCompleted);
});
}

public static IObservable<int> GetMessageSize(this Socket socket)
{
return socket.GetSocketData(4, buf => BitConverter.ToInt32(buf, 0));
}

public static IObservable<string> GetMessageBody(this Socket socket,
int messageSize)
{
return socket.GetSocketData(messageSize, buf =>
Encoding.UTF8.GetString(buf, 0, messageSize));
}

public static IObservable<string> GetMessage(this Socket socket)
{

return
from size in socket.GetMessageSize()
from message in Observable.If(() => size != 0,
socket.GetMessageBody(size),
Observable.Return<string>(null))
select message;
}

public static IObservable<string> GetMessagesFromConnected(
this Socket socket)
{
return socket
.GetMessage()
.Repeat()
.TakeWhile(msg => !string.IsNullOrEmpty(msg));
}

public static IObservable<string> GetMessages(this Socket socket,
IPAddress addr, int port)
{
return Observable.Defer(() =>
{
var whenConnect = Observable
.FromAsyncPattern<IPAddress, int>(
socket.BeginConnect, socket.EndConnect);
return from _ in whenConnect(addr, port)
from msg in socket.GetMessagesFromConnected()
.Finally(socket.Close)
select msg;
});
}

编辑:为了处理不完整的读取,可以使用 Observable.While(在 GetSockedData 中),正如 Dave Sexton 在 same thread on RX forum 中所建议的那样.

编辑:另外,请看一下 Jeffrey Van Gogh 的这篇文章:Asynchronous System.IO.Stream reading

关于c# - 使用 Reactive Extensions (Rx) 进行套接字编程实用吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/3118289/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com