
scala - How do I create a discrete stream in fs2 (Functional Streams for Scala)?

Reposted. Author: 行者123. Updated: 2023-12-01 07:29:01

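Is it possible to create a stream of discrete events in fs2? If so, how? I have only just started playing with the library and I know I have a lot to learn, but I have not found any related example. For instance, I would like to create a stream for "mousemove" or "click" events in Scala.js or Swing. I am looking for something like RxJS, where I can use Rx.Observable.create to produce discrete events, for example: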

// note: pseudo code
var mouse = Rx.Observable.create(subscriber => {
  document.body.addEventListener("mousemove", event => {
    subscriber.onNext(event)
  })
})

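The equivalent in fs2 may not be as trivial, but I would appreciate any suggestion on how to do it. I guess it would involve the Handler and Pull/Push data types, but I do not yet understand how to use them.

Cheers.

Best answer

Here is an example I came up with that demonstrates how to use fs2 with JavaFX: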

import cats.implicits._
import cats.effect._
import cats.effect.implicits._
import fs2._
import fs2.concurrent._
import javafx.application.{Application, Platform}
import javafx.beans.value.WritableValue
import javafx.scene.{Node, Scene}
import javafx.scene.control.{Label, TextField}
import javafx.scene.input.KeyEvent
import javafx.scene.layout._
import javafx.stage.Stage

import scala.concurrent.ExecutionContext
import scala.util.Try

class Fs2Ui extends Application {
  override def start(primaryStage: Stage): Unit = {
    implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
    implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global)

    // Fork the program onto a fiber so that start() returns
    // without blocking the JavaFX application thread.
    new Logic[IO]().run(primaryStage).start.unsafeRunSync()
  }

  class Logic[F[_]: ConcurrentEffect: ContextShift: Timer] {
    import Fs2Ui._
    import java.time.{Duration, Instant}
    import java.util.concurrent.TimeUnit.MILLISECONDS

    // Build the UI, then run the pipeline: key events -> processing -> label update.
    def run(primaryStage: Stage): F[Unit] = for {
      v <- initializeUi(primaryStage)
      View(input, feedback) = v

      _ <- Stream(input).covary[F]
        .through(typedChars)
        .through(processInput)
        .through(displayFeedback(feedback.textProperty))
        .compile.drain
    } yield ()

    // All scene-graph work goes through updateUi, i.e. onto the JavaFX thread.
    private def initializeUi(primaryStage: Stage): F[View] = updateUi {
      val input = new TextField()
      input.setPrefWidth(300)
      val feedback = new Label("...")

      val vbox = new VBox(input, feedback)
      val root = new StackPane(vbox)
      val scene = new Scene(root)

      primaryStage.setScene(scene)
      primaryStage.show()

      View(input, feedback)
    }

    private def processInput: Pipe[F, TypedChar, Feedback] = for {
      typed <- _
      // Shift onto the ContextShift's pool before the (simulated) slow work.
      _ <- Stream.eval(ContextShift[F].shift)
      res <- Stream.eval { time(processSingle(typed)) }
      (d, Feedback(str)) = res
    } yield Feedback(s"$str in [$d]")

    private def displayFeedback(value: WritableValue[String]): Pipe[F, Feedback, Unit] =
      _.map { case Feedback(str) => str } through updateValue(value)

    // Runs f and returns its result together with the elapsed time.
    private def time[A](f: F[A]): F[(Duration, A)] = {
      val now = Timer[F].clock.monotonic(MILLISECONDS).map(Instant.ofEpochMilli)
      for {
        start <- now
        a <- f
        stop <- now
        d = Duration.between(start, stop)
      } yield (d, a)
    }

    // Simulates work by sleeping for a random 250 to 999 ms per character.
    private val processSingle: TypedChar => F[Feedback] = {
      import scala.util.Random
      import scala.concurrent.duration._

      val prng = new Random()
      def randomDelay: F[Unit] = Timer[F].sleep { (250 + prng.nextInt(750)).millis }

      c => randomDelay *> Sync[F].delay(Feedback(s"processed $c"))
    }
  }
}

object Fs2Ui {
  case class View(input: TextField, feedback: Label)

  case class TypedChar(value: String)
  case class Feedback(value: String)

  // Adapts the node's key-typed listener into a Stream:
  // the listener enqueues each event and the stream dequeues them.
  private def typedChars[F[_]: ConcurrentEffect]: Pipe[F, Node, TypedChar] = for {
    node <- _
    q <- Stream.eval(Queue.unbounded[F, KeyEvent])
    _ <- Stream.eval(Sync[F].delay {
      node.setOnKeyTyped { evt => (q enqueue1 evt).toIO.unsafeRunSync() }
    })
    keyEvent <- q.dequeue
  } yield TypedChar(keyEvent.getCharacter)

  // Writes every element of the stream into the given JavaFX value.
  private def updateValue[F[_]: Async, A](value: WritableValue[A]): Pipe[F, A, Unit] = for {
    a <- _
    _ <- Stream.eval(updateUi(value setValue a))
  } yield ()

  // Runs the action on the JavaFX application thread and completes F with its result or error.
  private def updateUi[F[_]: Async, A](action: => A): F[A] =
    Async[F].async[A] { cb =>
      Platform.runLater { () =>
        cb(Try(action).toEither)
      }
    }
}

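The parts that actually demonstrate the binding between fs2 and JavaFX are the two Pipes: typedChars and updateValue. Personally, I think the most challenging part was adapting the KeyEvent listener so that it looks like an fs2 Stream of events: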

node.setOnKeyTyped { evt => (q enqueue1 evt).toIO.unsafeRunSync() }
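
The same queue-based bridge can be pulled out into a general helper, which is probably the closest analogue to Rx.Observable.create from the question. The following is only a sketch under assumptions: it targets the same fs2 / cats-effect generation as the answer above (the one with ContextShift, fs2.concurrent.Queue, enqueue1 and dequeue; newer releases changed these APIs), and the names CallbackStream, fromCallback, register and Demo are hypothetical, not part of fs2.

```scala
import cats.effect.{ConcurrentEffect, ContextShift, IO, Sync}
import cats.effect.implicits._
import fs2.Stream
import fs2.concurrent.Queue

import scala.concurrent.ExecutionContext

object CallbackStream {
  // Hypothetical helper (not an fs2 API): `register` installs a listener
  // on some callback-based event source, e.g. node.setOnKeyTyped.
  def fromCallback[F[_]: ConcurrentEffect, A](register: (A => Unit) => Unit): Stream[F, A] =
    for {
      // Buffer between the callback world and the stream world.
      q <- Stream.eval(Queue.unbounded[F, A])
      // The listener pushes each event into the queue, as in the answer.
      _ <- Stream.eval(Sync[F].delay {
             register(a => q.enqueue1(a).toIO.unsafeRunSync())
           })
      // The stream emits whatever the listener has enqueued.
      a <- q.dequeue
    } yield a
}

object Demo {
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  // Stand-in event source (an assumption for illustration): it fires
  // three events as soon as a listener is attached.
  val register: (Int => Unit) => Unit =
    listener => List(1, 2, 3).foreach(listener)

  // Collect the first three events emitted by the stream.
  def run(): List[Int] =
    CallbackStream.fromCallback[IO, Int](register).take(3).compile.toList.unsafeRunSync()
}
```

With a helper like this, typedChars would reduce to passing a function that calls node.setOnKeyTyped as register. Note that the sketch never unregisters the listener and uses an unbounded queue, so a real implementation would likely need cleanup (for example via Stream.bracket) and some form of back-pressure.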

Regarding "scala - How do I create a discrete stream in fs2 (Functional Streams for Scala)?", a similar question was found on Stack Overflow: https://stackoverflow.com/questions/45512201/
