gpt4 book ai didi

scala - fs2.Stream[IO, Something] 不返回 take(1)

转载 作者:行者123 更新时间:2023-12-05 07:16:17 25 4
gpt4 key购买 nike

我有一个返回 fs2.Stream of Measurements 的函数。

import cats.effect._
import fs2._

def apply(sds: SerialPort, interval: Int)(implicit cs: ContextShift[IO]): Stream[IO, SdsMeasurement] =
for {
blocker <- Stream.resource(Blocker[IO])
stream <- io.readInputStream(IO(sds.getInputStream), 1, blocker)
.through(SdsStateMachine.collectMeasurements())
} yield stream

通常它是一个无限流,除非我给它传递一个测试标志,在这种情况下它应该输出一个值并停止。

val infiniteSource: Stream[IO, SdsMeasurement] = ...
val source = if (isTest) infiniteSource.take(1) else infiniteSource
source.compile.drain

无限流工作正常。它无限地给我所有的测量值。测试流确实只给我第一次测量,仅此而已。我遇到的问题是 Stream 在最后一次测量后没有返回。它永远阻塞。我做错了什么?

注意:我想我抽象了基本代码,但要了解更多上下文,请查看我的项目:https://github.com/jkransen/fijnstof/blob/ZIO/src/main/scala/nl/kransen/fijnstof/Main.scala

最佳答案

您在此处提供的代码看起来不错,我认为问题不在于该代码。如果它阻塞,则可能是底层 API 阻塞之一,例如它可能是 InputStreamclose 方法。在这种情况下,我通常做的是在每个可能阻塞的函数调用前后添加日志语句。

关于scala - fs2.Stream[IO, Something] 不返回 take(1),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59309321/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com