gpt4 book ai didi

f# - 如何从 F# 中的事件设置多个可观察对象?

转载 作者:行者123 更新时间:2023-12-01 01:16:29 26 4
gpt4 key购买 nike

我正在尝试通过编写一个程序来了解 F# 中的 Observable 模块,该程序连接到 Web 套接字,监听消息,然后在一组基于 Observables 的流中处理它们。但是,我很难理解实际行为。

首先,我设置了一个这样的网络套接字:

open System
open System.Net.WebSockets
open System.Threading

let connectFeed =
let feedUrl = "blah blah"
let buffer : byte array = Array.zeroCreate 1024
let segment = ArraySegment(buffer)
let socketEvent = new Event<string>()

let task = async {
let random = Random(DateTime.Now.Millisecond)
use socket = new ClientWebSocket()
let! token = Async.CancellationToken
do! Async.AwaitTask (socket.ConnectAsync(Uri(feedUrl), token))

while not token.IsCancellationRequested do
let! result = Async.AwaitTask (socket.ReceiveAsync(segment, token))
socketEvent.Trigger (Encoding.UTF8.GetString(buffer))
Array.fill buffer 0 buffer.Length 0uy

}

(task, socketEvent.Publish)

let deserializeMsg (raw:string) =
// returns a MsgType based on the received message

let tryGetData (msg:MsgType) =
// returns Some data for specific kind of message; None otherwise

[<EntryPoint>]
let main argv =
let feedProc, feedStream = connectFeed
let msgStream = feedStream |> Observable.map deserializeMsg

msgStream |> Observable.subscribe (fun m -> printfn "got msg: %A" m) |> ignore

let dataStream = feedStream |> Observable.choose tryGetData
dataStream |> Observable.subscribe (fun d -> printfn "got data: %A" d) |> ignore

Async.RunSynchronously feedProc
0

我期待看到像这样的打印输出:
got msg: { some: "field" }
got msg: { some: "other" }
got msg: { some: "data" }
got data: { // whatever }
got msg: ...
...

相反,即使有些消息会导致 tryGetData,也只会出现“got msg”消息。返回 Some .

这里发生了什么?如何设置多个 Observable来自单个事件的流?

更新 :我已经用这个更新了我的代码:
let isMsgA msg =
printfn "isMsgA"
match msg with
| MsgA -> true // where MsgA is a member of a DU defined elsewhere, and is the result of deserializeMsg
| _ -> false

let isStringMsgA msg =
printfn "isStringMsgA"
if msg.StartsWith("{ \"type\": \"msga\"") then true else false

[<EntryPoint>]
let main argv =
let feedProc, feedStream = connectFeed
let msgStream = feedStream |> Observable.map deserializeMsg

msgStream
|> Observable.filter isMsgA
|> Observable.subscribe (fun m -> printfn "got msg MsgA")
|> ignore

feedStream
|> Observable.filter isStringMsgA
|> Observable.subscribe (fun m -> printfn "got string MsgA")
|> ignore

我得到一个充满“isStringMsgA”和“got string MsgA”消息的屏幕,但“isMsgA”和“got msg MsgA”各有一个。

我很困惑。

对于任何有兴趣摆弄它的人来说,这是一个精简的、可重复的示例:
https://github.com/aggieben/test-observable

更新 2 : 看起来我可能会看到这种行为,因为 deserializeMsg 中抛出了异常。功能。还在挖...

最佳答案

我看不出发生这种情况的任何明显原因 - 你能在 tryGetData 中添加一些日志吗?检查它得到什么输入以及它返回什么结果?

使用 Observable 时模块,您构建处理管道的描述和Observable.subscribe创建一个具体的监听器链来完成工作并将处理程序附加到主要事件源。然而,这些事件并没有被“消费”——它们应该被发送给所有的观察者。

例如,尝试使用以下最小演示:

let evt = Event<int>()

let e1 = evt.Publish |> Observable.choose (fun n ->
if n % 2 = 0 then Some "woop!" else None)
let e2 = evt.Publish |> Observable.map (fun n -> n * 10)

e1 |> Observable.subscribe (printfn "E1: %s")
e2 |> Observable.subscribe (printfn "E2: %d")

evt.Trigger(1)
evt.Trigger(2)

如果你运行它,它会打印出预期的结果:
E2: 10
E1: woop!
E2: 20

关于f# - 如何从 F# 中的事件设置多个可观察对象?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44558564/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com