gpt4 book ai didi

java - 将数据从一个 Bolt 发送到另一个 Apache Storm

转载 作者:太空宇宙 更新时间:2023-11-04 09:12:03 28 4
gpt4 key购买 nike

您好,与 Apache Storm 合作。我有多个 kafka 主题,我想使用单个 Bolt 解析所有消息(使用并行性来处理负载)。

我想问一下可以吗?以下是我正在尝试的内容

Collection<SpoutSpec<? extends BaseRichBolt>> spouts; // I take this as a method argument

TopologyBuilder topology = new TopologyBuilder();

spouts.forEach(spec -> {
topology.setSpout(spec.getName() + "Spout", new KafkaSpout(spec.getSpoutConfig()), spec.getParallelism());
topology.setBolt("FileBeat-Bolt", new FileBeatMessageBolt(), spec.getParallelism()).shuffleGrouping(spec.getName() + "Spout");
topology.setBolt("Message-Handling-Bolt", new MessageHandlingBolt(), spec.getParallelism()).shuffleGrouping("FileBeat-Bolt");
topology.setBolt("Output-Kafka-Bolt", new ProcessedOutputHandler(), spec.getParallelism()).shuffleGrouping("Message-Handling-Bolt");
});

我的 SpoutSpec 类

public class SpoutSpec<T extends BaseRichBolt> {

private final String name;

private final int parallelism;

private final SpoutConfig spoutConfig;

private final T handler;

}

但是消息不会从 FileBeat-Bolt 发送到其他 Bolt。以下是我发出数据的方式:

JsonNode jsonNode = objectMapper.readValue(input.getString(0), JsonNode.class);

String topic = jsonNode.get("@metadata").get("topic").getTextValue();

String message = jsonNode.get("message").getTextValue();

collector.emit("Message-Handling-Bolt", input, new Values(topic, message));

最佳答案

您的 emit 调用是错误的。第一个参数不是 bolt 名称,而是流名称。当您想要将消息从一个 Bolt 划分为多个数据流时,可以使用流名称。就您而言,您不想拆分流。

collector.emit("Message-Handling-Bolt", input, new Values(topic, message));

将发送到一个名为“Message-Handling-Bolt”的流,并且您在该流上没有任何监听。您的“Message-Handling-Bolt”正在监听默认流。将第一个参数删除为 emit,或者将您的 Bolt 声明更改为:

topology.setBolt("Message-Handling-Bolt", new MessageHandlingBolt(), spec.getParallelism()).shuffleGrouping("FileBeat-Bolt", "Message-Handling-Bolt");

编辑:回复您的评论:对您来说最简单的解决方案是简单地删除发出调用中的第一个参数:

collector.emit(input, new Values(topic, message));

如果由于某种原因您不想这样做,并且想要显式命名流,则需要声明您的 FileBeatMessageBolt 将发送到 Message-Handling-Bolt 流。您可以将其作为 declareOutputFields 实现的一部分来执行:

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declareStream("Message-Handling-Bolt", new Fields(...));
}

关于java - 将数据从一个 Bolt 发送到另一个 Apache Storm,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59559676/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com