gpt4 book ai didi

scala - Akka Stream 连接到多个接收器

转载 作者:行者123 更新时间:2023-12-05 00:52:21 27 4
gpt4 key购买 nike

我在 akka 流中实现了一个自定义组件,它将元素作为输入,根据一个键对它们进行分组和合并,然后通过十几个导出之一将它们发送出去。您可以将此组件视为一种 GroupBy 组件,它不会将流划分为子流,而是实际的流。除了对传入元素进行分区之外,它还将它们合并为一个元素,即组件内部发生了一些缓冲,因此 1 个元素输入并不一定意味着 1 个元素通过导出输出。

下面是所述组件的简化实现。

class CustomGroupBy[A,B](k: Int, f: A => Int) extends GraphStage[FlowShape[B, B]] {

val in = Inlet[A]("CustomGroupBy.in")
val outs = (0 until k).map(i => Outlet[B](s"CustomGroupBy.$i.out"))

override val shape = new AmorphousShape(scala.collection.immutable.Seq(in), outs)

/* ... */
}

我现在如何将该组件的每个导出连接到不同的 Sink 并结合所有这些 sinks 的物化值。

我已经尝试了一些使用图形 DSL 的方法,但还没有完全让它工作。有没有人会这么好心为我提供一个片段来做到这一点或为我指明正确的方向?

提前致谢!

最佳答案

您很可能需要内置 broadcast阶段。示例用法可以在 here 中找到:

val bcast = builder.add(Broadcast[Int](2))

in ~> f1 ~> bcast ~> f2 ~> merge ~> f3 ~> out
bcast ~> f4 ~> merge

关于scala - Akka Stream 连接到多个接收器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43119221/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com