gpt4 book ai didi

scala - 如何从广播的 Akka 流中获取订阅者和发布者?

转载 作者:行者123 更新时间:2023-12-02 07:26:12 25 4
gpt4 key购买 nike

在使用更复杂的图表时,我无法将发布者和订阅者从我的流程中移除。我的目标是提供发布者和订阅者的 API,并在内部运行 Akka 流。这是我的第一次尝试,效果很好。

val subscriberSource = Source.subscriber[Boolean]
val someFunctionSink = Sink.foreach(Console.println)

val flow = subscriberSource.to(someFunctionSink)

//create Reactive Streams Subscriber
val subscriber: Subscriber[Boolean] = flow.run()

//prints true
Source.single(true).to(Sink(subscriber)).run()

但是对于更复杂的广播图,我不确定如何获取订阅者和发布者对象?我需要部分图吗?

val subscriberSource = Source.subscriber[Boolean]
val someFunctionSink = Sink.foreach(Console.println)
val publisherSink = Sink.publisher[Boolean]

FlowGraph.closed() { implicit builder =>
import FlowGraph.Implicits._

val broadcast = builder.add(Broadcast[Boolean](2))

subscriberSource ~> broadcast.in
broadcast.out(0) ~> someFunctionSink
broadcast.out(1) ~> publisherSink
}.run()

val subscriber: Subscriber[Boolean] = ???
val publisher: Publisher[Boolean] = ???

最佳答案

当您调用 RunnableGraph.run() 时,流会运行,结果是该运行的“物化值”。

在您的简单示例中,Source.subscriber[Boolean] 的具体化值为 Subscriber[Boolean]。在您的复杂示例中,您希望将图形的多个组件的具体化值组合成一个元组 (Subscriber[Boolean], Publisher[Boolean]) 的具体化值。

您可以通过将您对其物化值感兴趣的组件传递给 FlowGraph.closed() 来实现,然后指定一个函数来组合物化值:

import akka.stream.scaladsl._
import org.reactivestreams._

val subscriberSource = Source.subscriber[Boolean]
val someFunctionSink = Sink.foreach(Console.println)
val publisherSink = Sink.publisher[Boolean]

val graph =
FlowGraph.closed(subscriberSource, publisherSink)(Keep.both) { implicit builder ⇒
(in, out) ⇒
import FlowGraph.Implicits._

val broadcast = builder.add(Broadcast[Boolean](2))

in ~> broadcast.in
broadcast.out(0) ~> someFunctionSink
broadcast.out(1) ~> out
}
val (subscriber: Subscriber[Boolean], publisher: Publisher[Boolean]) = graph.run()

Scaladocs for more information about the overloads of FlowGraph.closed .

(Keep.both 是函数的缩写 (a, b) => (a, b))

关于scala - 如何从广播的 Akka 流中获取订阅者和发布者?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31195254/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com