gpt4 book ai didi

java - 如何在代码中使用新的 Bolt 更新现有的 Storm 拓扑?

转载 作者:行者123 更新时间:2023-12-02 10:54:29 33 4
gpt4 key购买 nike

我正在编写一个 dockerized Java Spring 应用程序,该应用程序使用 Apache Storm v1.1.2、Kafka v0.11.0.1、Zookeeper 3.4.6、Eureka 和 Cloud-Config,所有这些都在由 Docker-Compose 编排的 Docker 容器中。

我通过 KafkaSpout 接收到的元组有一个“value”字段,它是一个 protobuf 对象。我使用自定义反序列化器将我的对象从中取出进行处理。

我有一个基本的应用程序,其中有一个 Bolt 可以打印传入消息,并根据 protobuf 对象中字段的值将它们路由到其他某些 Bolt。我还有 LocalCluster、Config 和 TopologyBuilder 作为 Spring Beans。

目前,我在 PostContruct 中设置了所有 bolt ,但我需要能够动态添加 bolt ,以根据 protobuf 对象的其他字段过滤传入消息并执行基本聚合函数(最大/最小/窗口平均值)。

我想使用 REST Controller 来执行此操作,但是如何在不丢失数据的情况下停止和启动拓扑?我也不想通过从头开始监听 Kafka 主题来重新启动拓扑,因为该系统将收到极高的负载。

这篇文章看起来很有希望,但我绝对希望整个过程自动化,所以我不会进入 Zookeeper https://community.hortonworks.com/articles/550/unofficial-storm-and-kafka-best-practices-guide.html

如何在代码中编辑现有拓扑以动态添加新 bolt ?

最佳答案

你不能。 Storm 拓扑一旦提交就是静态的。如果您需要根据元组中的字段改变处理,最好的选择是预先提交您需要的所有 bolt 。然后,您可以使用一个或多个检查元组的 Bolt 来改变元组在拓扑中所采用的路径,并根据元组内容发送到特定流。

例如制作一个 SplitterBolt

public void execute(Tuple input) {
if (tuple.getIntegerByField("theDecider") == 1) {
collector.emit("onlyOnes", tuple.getValues());
} else {
collector.emit("others", tuple.getValues());
}
}

您在拓扑构建代码中的位置会有类似的内容

builder.setSpout("kafka-spout", ...);
builder.setBolt("splitter", new SplitterBolt()).shuffleGrouping("kafka-spout");
builder.setBolt("countOnes", new CounterBolt()).shuffleGrouping("splitter", "onlyOnes");
builder.setBolt("countOthers", new CounterBolt()).shuffleGrouping("splitter", "others");

关于java - 如何在代码中使用新的 Bolt 更新现有的 Storm 拓扑?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51874584/

33 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com