gpt4 book ai didi

java - java中的Akka流广播

转载 作者:行者123 更新时间:2023-11-30 05:37:20 33 4
gpt4 key购买 nike

我试图从java中的源广播到2个接收器,卡在中间,任何指针都会有帮助

 public static void main(String[] args) {


ActorSystem system = ActorSystem.create("GraphBasics");
ActorMaterializer materializer = ActorMaterializer.create(system);

final Source<Integer, NotUsed> source = Source.range(1, 1000);
Sink<Integer,CompletionStage<Done>> firstSink = Sink.foreach(x -> System.out.println("first sink "+x));
Sink<Integer,CompletionStage<Done>> secondsink = Sink.foreach(x -> System.out.println("second sink "+x));


RunnableGraph.fromGraph(
GraphDSL.create(
b -> {
UniformFanOutShape<Integer, Integer> bcast = b.add(Broadcast.create(2));


b.from(b.add(source)).viaFanOut(bcast).to(b.add(firstSink)).to(b.add(secondsink));


return ClosedShape.getInstance();
}))
.run(materializer);
}

最佳答案

我不太熟悉akka-stream图的java api,所以我使用了official doc 。您的代码片段中有 2 个错误:

  1. 当您将源添加到图形构建器时,您需要从中获取Outlet。因此,而不是 b.from(b.add(source)) 应该是这样的: b.from(b.add(source).out()) 根据到官方文档

  2. 您不能连续调用两个 .to 方法,因为 .to 需要具有 Sink 形状的 smth,这意味着一种死胡同。相反,您需要将第二个接收器直接附加到广播,如下所示:

    (...).viaFanOut(bcast).to(b.add(firstSink));
    b.from(bcast).to(b.add(secondSink));

总而言之,代码应该如下所示:

ActorSystem system = ActorSystem.create("GraphBasics");
ActorMaterializer materializer = ActorMaterializer.create(system);

final Source<Integer, NotUsed> source = Source.range(1, 1000);
Sink<Integer, CompletionStage<Done>> firstSink = foreach(x -> System.out.println("first sink " + x));
Sink<Integer, CompletionStage<Done>> secondSink = foreach(x -> System.out.println("second sink " + x));


RunnableGraph.fromGraph(
GraphDSL.create(b -> {
UniformFanOutShape<Integer, Integer> bcast = b.add(Broadcast.create(2));

b.from(b.add(source).out()).viaFanOut(bcast).to(b.add(firstSink));
b.from(bcast).to(b.add(secondSink));

return ClosedShape.getInstance();
}
)
).run(materializer);

最后一点 - 我会三思而后行,使用图形 API 是否有意义。如果您的情况像这样简单(只有 2 个接收器),您可能只想使用 alsoToalsoToMat 。它们使您可以将多个接收器附加到流中,而无需使用图表。

关于java - java中的Akka流广播,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56330281/

33 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com