gpt4 book ai didi

scala - 为什么与空 fs2.Stream 合并会改变程序的行为

转载 作者:行者123 更新时间:2023-12-03 16:20:40 26 4
gpt4 key购买 nike

与空 fs2.Stream 合并是有据可查的。应该产生相同的 fs2.Stream .这是来自 Scaladocs 的引述:

Has the property that merge(Stream.empty, s) == s


考虑以下带有 fs2.Stream 的完整 Scala 程序:
发光元件
import scala.concurrent.duration._
import cats.effect.{ContextShift, IO, Timer}
import cats.syntax.flatMap._
import cats.effect.concurrent.Ref

import scala.concurrent.ExecutionContext

object TestFs2 extends App {
implicit val timerIo: Timer[IO] = IO.timer(ExecutionContext.global)
implicit val concurrentIo: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

val program = Ref.of[IO, Int](0).map(ref => {
fs2.Stream.repeatEval(ref.get).evalMap(value => {
IO(println(s"Got value $value")) >> IO.sleep(1.second) >> ref.set(value + 1)
})
})

program.flatMap(_.compile.drain).unsafeRunSync()
}
该程序打印以下内容:
Got value 0
Got value 1
Got value 2
...
看起来没问题。现在应用来自 Scaladoc 的报价上面我得出的结论是,更换
fs2.Stream.repeatEval(ref.get)
fs2.Stream.repeatEval(ref.get).merge(fs2.Stream.empty.covaryAll[IO, Int])
行为应该是相同的。这是更新的程序:
发射元素并与空 fs2.Stream 合并
import scala.concurrent.duration._
import cats.effect.{ContextShift, IO, Timer}
import cats.syntax.flatMap._
import cats.effect.concurrent.Ref

import scala.concurrent.ExecutionContext

object TestFs2 extends App {
implicit val timerIo: Timer[IO] = IO.timer(ExecutionContext.global)
implicit val concurrentIo: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

val program = Ref.of[IO, Int](0).map(ref => {
fs2.Stream.repeatEval(ref.get).merge(fs2.Stream.empty.covaryAll[IO, Int]).evalMap(value => {
IO(println(s"Got value $value")) >> IO.sleep(1.second) >> ref.set(value + 1)
})
})

program.flatMap(_.compile.drain).unsafeRunSync()
}
程序输出是
Got value 0
Got value 0
Got value 1
Got value 1
Got value 2
Got value 2
Got value 3
Got value 3
...
问题:为什么与空合并 fs2.Stream改变程序的行为,导致重复原始 fs2.Stream 的元素?

最佳答案

merge 的文档还说:

The implementation always tries to pull one chunk from each side before waiting for it to be consumed by resulting stream. As such, there may be up to two chunks (one from each stream) waiting to be processed while the resulting stream is processing elements.


如果我理解正确,那将意味着当结果流忙于处理值时 0 ,在 ref 之前已经从源中提取了一个新值已经更新。
严格来说,我不认为这种行为违反了任何不变量。但对你来说,这很重要,因为
  • 你的流改变了它从中拉出的来源
  • 您的源流随时准备发出元素

  • 要解决第二点,您可以使用 1 元素队列而不是 Ref .
    AFAICT 如果不使用 merge 也会出现同样的问题.只要源可以发出它们,流就可以在处理它们之前自由地从源中提取尽可能多的元素。您在第一段代码中基本上很幸运,因为您有一个非常简单的包含 1 个元素块的流。

    关于scala - 为什么与空 fs2.Stream 合并会改变程序的行为,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63402330/

    26 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com