gpt4 book ai didi

scala - FS2:是否可以优雅地完成队列?

转载 作者:行者123 更新时间:2023-12-01 07:51:14 27 4
gpt4 key购买 nike

假设我想将一些遗留的异步 API 转换为 FS2 Streams。
API 提供了一个带有 3 个回调的接口(interface):下一个元素、成功、错误。
我希望 Stream 发出所有元素,然后在收到成功或错误回调时完成。

FS2 指南 ( https://functional-streams-for-scala.github.io/fs2/guide.html ) 建议使用 fs2.Queue对于这样的情况,
它非常适合入队,但到目前为止我看到的所有示例都期望 queue.dequeue 的流返回永远不会完成 -
在我的情况下,没有明显的方法来处理成功/错误回调。
我尝试使用 queue.dequeue.interruptWhen(...here goes the signal...) , 但如果成功/错误回调在客户端从流中读取数据之前到达,
流过早终止 - 仍有未读元素。我希望消费者在完成流之前完成阅读它们。

FS2可以做到吗?使用 Akka Streams 很简单 - SourceQueueWithCompletecompletefail方法。

更新:
通过在 Option 中包装元素并将 None 视为停止读取流的信号,以及使用 Promise 传播错误,我能够获得足够好的结果:

queue.dequeue
.interruptWhen(interruptingPromise.get)
.takeWhile(_.isDefined).map(_.get)

但是,我是否忽略了更自然的做这些事情的方式?

最佳答案

一种惯用的方法是创建一个 Queue[Option[A]]而不是 Queue[A] .入队时,换行 Some ,并且您可以显式地将 None 加入队列表示完成。在出队方面,执行 q.dequeue.unNoneTerminate ,这会给你一个 Stream[F, A]一旦队列发出 None 就终止

关于scala - FS2:是否可以优雅地完成队列?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50212871/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com