gpt4 book ai didi

apache-storm - 将两个 bolt 的输出发送到 Storm 中的单个 bolt ?

转载 作者:行者123 更新时间:2023-12-04 17:48:43 25 4
gpt4 key购买 nike

将 BoltA 和 BoltB 的输出发送到 BoltC 的最简单方法是什么。我必须使用 Joins 还是有任何更简单的解决方案。 A 和 B 具有相同的字段(ts、metric_name、metric_count)。

    // KafkaSpout --> LogDecoder
builder.setBolt(LOGDECODER_BOLT_ID, logdecoderBolt, 10).shuffleGrouping(KAFKA_SPOUT_ID);

// LogDecoder --> CountBolt
builder.setBolt(COUNT_BOLT_ID, countBolt, 10).shuffleGrouping(LOGDECODER_BOLT_ID);

// LogDecoder --> HttpResCodeCountBolt
builder.setBolt(HTTP_RES_CODE_COUNT_BOLT_ID, http_res_code_count_bolt, 10).shuffleGrouping(LOGDECODER_BOLT_ID);


# And now I want to send CountBolt and HttpResCodeCountBolt output to Aggregator Bolt.

// CountBolt --> AggregatwBolt
builder.setBolt(AGGREGATE_BOLT_ID, aggregateBolt, 5).fieldsGrouping((COUNT_BOLT_ID), new Fields("ts"));

// HttpResCodeCountBolt --> AggregatwBolt
builder.setBolt(AGGREGATE_BOLT_ID, aggregateBolt, 5).fieldsGrouping((HTTP_RES_CODE_COUNT_BOLT_ID), new Fields("ts"));

这可能吗 ?

最佳答案

是的。只需在 fieldsGrouping 调用中添加一个流 ID(下面的“stream1”和“stream2”):

BoltDeclarer bd = builder.setBolt(AGGREGATE_BOLT_ID, aggregateBolt, 5); 
bd.fieldsGrouping((COUNT_BOLT_ID), "stream1", new Fields("ts"));
bd.fieldsGrouping((HTTP_RES_CODE_COUNT_BOLT_ID), "stream2", new Fields("ts"));

然后在 BoltC 的 execute() 方法中,您可以测试以查看元组来自哪个流:
public void execute(Tuple tuple) {

if ("stream1".equals(tuple.getSourceStreamId())) {
// this came from stream1
} else if ("stream2".equals(tuple.getSourceStreamId())) {
// this came from stream2
}

由于您知道元组来自哪个流,因此您不需要在两个流上具有相同形状的元组。您只需根据流 ID 对元组进行解码。

您还可以检查元组来自哪个组件(因为我输入这个,我认为这可能更适合您的情况)以及发出元组的组件(任务)的实例。

关于apache-storm - 将两个 bolt 的输出发送到 Storm 中的单个 bolt ?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/23943560/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com