gpt4 book ai didi

c# - 使用 Reactive Extensions 观察传入的 websocket 消息?

转载 作者:太空狗 更新时间:2023-10-30 00:49:19 26 4
gpt4 key购买 nike

我想使用 linq 来处理通过 websocket 连接接收到的事件。这是我目前所拥有的:

    private static void Main()
{
string WsEndpoint = "wss://push.planetside2.com/streaming?environment=ps2&service-id=s:quicktesting";
using (WebSocket ws = new WebSocket(WsEndpoint))
{
ws.OnMessage += Ws_OnMessage;

ws.Connect();
Console.ReadKey();
ws.Close();
}
}

private static void Ws_OnMessage(object sender, MessageEventArgs e)
{
Console.WriteLine(e.Data);
}

第一个让我感到困惑的想法是如何将 ws.OnMessage 变成某种事件流。我无法在网上找到任何示例来观察带有响应式扩展的外部 事件源。我打算将消息解析成json对象,然后过滤和聚合它们。

有人可以提供一个从 websocket 消息创建可观察对象并订阅它的示例吗?


编辑:最终工作代码

与所选答案的唯一区别是我在将 websocket 传递给 Observable.Using 之前初始化了它

//-------------------------------------------------------
// Create websocket connection
//-------------------------------------------------------
const string wsEndpoint = "wss://push.planetside2.com/streaming?environment=ps2&service-id=s:quicktesting";
WebSocket socket = new WebSocket(wsEndpoint);


//-------------------------------------------------------
// Create an observable by wrapping ws.OnMessage
//-------------------------------------------------------
var globalEventStream = Observable
.Using(
() => socket,
ws =>
Observable
.FromEventPattern<EventHandler<MessageEventArgs>, MessageEventArgs>(
handler => ws.OnMessage += handler,
handler => ws.OnMessage -= handler));
//---------------------------------------------------------
// Subscribe to globalEventStream
//---------------------------------------------------------

IDisposable subscription = globalEventStream.Subscribe(ep =>
{
Console.WriteLine("Event Recieved");
Console.WriteLine(ep.EventArgs.Data);
});

//----------------------------------------------------------
// Send message over websocket
//----------------------------------------------------------
socket.Connect();
socket.Send("test message");
// When finished, close the connection.
socket.Close();

最佳答案

您应该像这样设置您的可观察对象:

    var observable =
Observable
.Using(
() => new WebSocket(WsEndpoint),
ws =>
Observable
.FromEventPattern<EventHandler<MessageEventArgs>, MessageEventArgs>(
handler => ws.OnMessage += handler,
handler => ws.OnMessage -= handler));

这将正确地创建套接字,然后在订阅可观察对象时观察事件。处理订阅后,它将正确地与事件分离并处理套接字。


observable 的类型将是 IObservable<EventPattern<MessageEventArgs>> .您以这种方式使用此可观察对象:

IDisposable subscription = observable.Subscribe(ep =>
{
Console.WriteLine(ep.EventArgs.Data);
});

感谢发布 NuGet 引用。

这是工作代码:

const string WsEndpoint = "wss://push.planetside2.com/streaming?environment=ps2&service-id=s:quicktesting";

Console.WriteLine("Defining Observable:");

IObservable<EventPattern<WebSocketSharp.MessageEventArgs>> observable =
Observable
.Using(
() =>
{
var ws = new WebSocketSharp.WebSocket(WsEndpoint);
ws.Connect();
return ws;
},
ws =>
Observable
.FromEventPattern<EventHandler<WebSocketSharp.MessageEventArgs>, WebSocketSharp.MessageEventArgs>(
handler => ws.OnMessage += handler,
handler => ws.OnMessage -= handler));

Console.WriteLine("Subscribing to Observable:");

IDisposable subscription = observable.Subscribe(ep =>
{
Console.WriteLine("Event Recieved");
Console.WriteLine(ep.EventArgs.Data);
});

Console.WriteLine("Writing to Source:");

using (var source = new WebSocketSharp.WebSocket(WsEndpoint))
{
source.Connect();
source.Send("test");
}

关于c# - 使用 Reactive Extensions 观察传入的 websocket 消息?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39112800/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com