gpt4 book ai didi

f# - 通过 Rx 从 MailboxProcessor 返回结果是个好主意吗?

转载 作者:行者123 更新时间:2023-12-04 19:00:17 25 4
gpt4 key购买 nike

我对下面的代码示例和人们的想法有点好奇。
这个想法是从 NetworkStream (~20 msg/s) 中读取数据,而不是在 main 中工作,而是将内容传递给 MainboxProcessor 以在完成后处理并取回内容以进行绑定(bind)。

通常的方法是使用 PostAndReply,但我想在 C# 中绑定(bind)到 ListView 或其他控件。无论如何,必须对 LastN 项和过滤做魔术。
另外,Rx 有一些错误处理。

下面的示例观察 2..10 中的数字并返回“hello X”。在 8 时,它像 EOF 一样停止。将其设置为 ToEnumerable 是因为其他线程在其他线程之前完成,但它也适用于订阅。

困扰我的是:

  • 以递归方式传递 Subject(obj)。我认为其中大约 3-4 个没有任何问题。好主意?
  • 对象的生命周期。

  • open System
    open System.Threading
    open System.Reactive.Subjects
    open System.Reactive.Linq // NuGet, take System.Reactive.Core also.
    open System.Reactive.Concurrency

    type SerializedLogger() =

    let _letters = new Subject<string>()
    // create the mailbox processor
    let agent = MailboxProcessor.Start(fun inbox ->

    // the message processing function
    let rec messageLoop (letters:Subject<string>) = async{

    // read a message
    let! msg = inbox.Receive()

    printfn "mailbox: %d in Thread: %d" msg Thread.CurrentThread.ManagedThreadId
    do! Async.Sleep 100
    // write it to the log
    match msg with
    | 8 -> letters.OnCompleted() // like EOF.
    | x -> letters.OnNext(sprintf "hello %d" x)

    // loop to top
    return! messageLoop letters
    }

    // start the loop
    messageLoop _letters
    )

    // public interface
    member this.Log msg = agent.Post msg
    member this.Getletters() = _letters.AsObservable()

    /// Print line with prefix 1.
    let myPrint1 x = printfn "onNext - %s, Thread: %d" x Thread.CurrentThread.ManagedThreadId

    // Actions
    let onNext = new Action<string>(myPrint1)
    let onCompleted = new Action(fun _ -> printfn "Complete")

    [<EntryPoint>]
    let main argv =
    async{
    printfn "Main is on: %d" Thread.CurrentThread.ManagedThreadId

    // test
    let logger = SerializedLogger()
    logger.Log 1 // ignored?

    let xObs = logger
    .Getletters() //.Where( fun x -> x <> "hello 5")
    .SubscribeOn(Scheduler.CurrentThread)
    .ObserveOn(Scheduler.CurrentThread)
    .ToEnumerable() // this
    //.Subscribe(onNext, onCompleted) // or with Dispose()

    [2..10] |> Seq.iter (logger.Log)

    xObs |> Seq.iter myPrint1

    while true
    do
    printfn "waiting"
    System.Threading.Thread.Sleep(1000)

    return 0
    } |> Async.RunSynchronously // return an integer exit code

    最佳答案

    我做过类似的事情,但使用的是普通的 F# Event输入而不是 Subject .它基本上可以让您创建 IObservable并触发其订阅 - 就像您使用更复杂的 Subject .基于事件的版本将是:

    type SerializedLogger() = 
    let letterProduced = new Event<string>()
    let lettersEnded = new Event<unit>()
    let agent = MailboxProcessor.Start(fun inbox ->
    let rec messageLoop (letters:Subject<string>) = async {
    // Some code omitted
    match msg with
    | 8 -> lettersEnded.Trigger()
    | x -> letterProduced.Trigger(sprintf "hello %d" x)
    // ...

    member this.Log msg = agent.Post msg
    member this.LetterProduced = letterProduced.Publish
    member this.LettersEnded = lettersEnded.Publish

    重要的区别是:
  • Event无法触发 OnCompleted ,所以我改为公开两个单独的事件。这是相当不幸的!鉴于 Subject在所有其他方面与事件非常相似,这可能是使用主题而不是普通事件的一个很好的理由。
  • 使用 Event 的好处是它是标准的 F# 类型,因此您不需要代理中的任何外部依赖项。
  • 我注意到您的评论指出第一次调用 Log被忽略了。那是因为您仅在此调用发生后才订阅事件处理程序。我想你可以使用 ReplaySubject variation on the Subject idea在这里 - 当您订阅它时,它会重播所有事件,因此之前发生的事件不会丢失(但缓存有成本)。

  • 总之,我认为使用 Subject可能是个好主意 - 它与使用 Event 的模式基本相同(我认为这是从代理公开通知的标准方式),但它可以让您触发 OnCompleted .我可能不会使用 ReplaySubject ,因为缓存成本 - 你只需要确保在触发任何事件之前订阅。

    关于f# - 通过 Rx 从 MailboxProcessor 返回结果是个好主意吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39028223/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com