gpt4 book ai didi

scala - Akka Streams 中的 Monadic 短路

转载 作者:行者123 更新时间:2023-12-05 05:20:13 25 4
gpt4 key购买 nike

我想链接一系列 Flow 的形式 a -> Try[b],其中每个连续的阶段处理 Success 前面的情况,最后的 Sink 一般处理所有 Failure

这个或类似的东西可以简洁地编码吗?它实际上是一个线性流,但我不确定每个阶段的广播和合并有多短。

最佳答案

解决这个问题的一种方法是定义一个扇出阶段,根据其结果将 Try 分成 2 个流

  object PartitionTry {
def apply[T]() = GraphDSL.create[FanOutShape2[Try[T], Throwable, T]]() { implicit builder ⇒
import GraphDSL.Implicits._

val success = builder.add(Flow[Try[T]].collect { case Success(a) ⇒ a })
val failure = builder.add(Flow[Try[T]].collect { case Failure(t) ⇒ t })
val partition = builder.add(Partition[Try[T]](2, _.fold(_ ⇒ 0, _ ⇒ 1)))

partition ~> failure
partition ~> success

new FanOutShape2[Try[T], Throwable, T](partition.in, failure.out, success.out)
}
}

然后您的通用流可以摄取 Try 并将 Failure 发送到选择的接收器,同时将 Success 传递给

  object ErrorHandlingFlow {
def apply[T, MatErr](errorSink: Sink[Throwable, MatErr]): Flow[Try[T], T, MatErr] = Flow.fromGraph(
GraphDSL.create(errorSink) { implicit builder ⇒ sink ⇒
import GraphDSL.Implicits._

val partition = builder.add(PartitionTry[T]())

partition.out0 ~> sink

new FlowShape[Try[T], T](partition.in, partition.out1)
}
)
}

使用示例如下

  val source      : Source[String, NotUsed]           = Source(List("1", "2", "hello"))
val convert : Flow[String, Try[Int], NotUsed] = Flow.fromFunction((s: String) ⇒ Try{s.toInt})
val errorsSink : Sink[Throwable, Future[Done]] = Sink.foreach[Throwable](println)
val handleErrors: Flow[Try[Int], Int, Future[Done]] = ErrorHandlingFlow(errorsSink)

source.via(convert).via(handleErrors).runForeach(println)

注意

  • 上面定义的 2 个阶段可重复用于任何类型(一次编写,随处使用)
  • 此方法可以重用于其他类型类 - 如 Either 等。

关于scala - Akka Streams 中的 Monadic 短路,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44761365/

25 4 0