gpt4 book ai didi

scala - Akka Streams 按类型拆分流

转载 作者:行者123 更新时间:2023-12-04 14:55:59 24 4
gpt4 key购买 nike

我有以下简单的案例类层次结构:

sealed trait Message
case class Foo(bar: Int) extends Message
case class Baz(qux: String) extends Message

我有一个 Flow[Message, Message, NotUsed] (来自基于 Websocket 的协议(protocol),编解码器已经到位)。

我想解复用这个 Flow[Message] Foo 和 Baz 类型的单独流,因为它们是由完全不同的路径处理的。

最简单的方法是什么?应该很明显,但我错过了一些东西......

最佳答案

一种方法是使用创建一个包含每种消息类型的流的 RunnableGraph。

val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>

val in = Source(...) // Some message source
val out = Sink.ignore

val foo = builder.add(Flow[Message].map (x => x match { case f@Foo(_) => f }))
val baz = builder.add(Flow[Message].map (x => x match { case b@Baz(_) => b }))
val partition = builder.add(Partition[Message](2, {
case Foo(_) => 0
case Baz(_) => 1
}))

partition ~> foo ~> // other Flow[Foo] here ~> out
partition ~> baz ~> // other Flow[Baz] here ~> out

ClosedShape
}

g.run()

关于scala - Akka Streams 按类型拆分流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40433517/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com