gpt4 book ai didi

asynchronous - 在 F# 中混合 IObservable 和 Async<'a>

转载 作者:行者123 更新时间:2023-12-04 05:17:29 25 4
gpt4 key购买 nike

我有一个库提供的IObservable,它监听来自外部服务的事件:

let startObservable () : IObservable<'a> = failwith "Given"

对于每个接收到的事件,我想执行一个返回 Async 的操作:

let action (item: 'a) : Async<unit> = failwith "Given"

我正在尝试实现一个处理器

let processor () : Async<unit> =
startObservable()
|> Observable.mapAsync action
|> Async.AwaitObservable

我已经编写了 mapAsyncAwaitObservable:理想情况下它们将由某个库提供,但我目前还没有找到。

额外要求:

  • 应按顺序执行操作,以便在处理前一个事件时缓冲后续事件。

  • 如果一个操作抛出错误,我希望我的处理器完成。否则,它永远不会完成。

  • 应遵守通过 Async.Start 传递的取消 token 。

关于我应该使用的库有什么提示吗?

最佳答案

由于您要将基于推的模型 (IObservable<>) 转换为基于拉的模型 (Async<>),因此您需要排队以缓冲来自 observable 的数据。如果队列有大小限制——说实话。应该是为了使整个管道安全,不会溢出内存 - 然后还需要缓冲区溢出策略。

  1. 一种方法是实现 MailboxProcessor<>和自定义可观察对象,它将向其发布数据。由于 MP 是原生 F# actor 实现,它能够使用队列进行有序处理以缓冲峰值。
  2. 另一种选择是使用 FSharp.Control.AsyncSeq (特别是 AsyncSeq.ofObservableBuffered 函数),它将 observable 转换为基于拉取的异步可枚举 - 在其下方从第一点开始使用邮箱处理器:

    startObservable()
    |> AsyncSeq.ofObservableBuffered
    |> AsyncSeq.iterAsync action

关于asynchronous - 在 F# 中混合 IObservable 和 Async<'a>,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52292043/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com