gpt4 book ai didi

java - 如何将两个不同 Spout 的输出发送到同一个 Bolt?

转载 作者:搜寻专家 更新时间:2023-11-01 01:20:57 26 4
gpt4 key购买 nike

我有两个 Kafka Spouts,我想将它们的值发送到同一个 bolt。

这可能吗?

最佳答案

是的,这是可能的:

TopologyBuilder b = new TopologyBuilder();
b.setSpout("topic_1", new KafkaSpout(...));
b.setSpout("topic_2", new KafkaSpout(...));
b.setBolt("bolt", new MyBolt(...)).shuffleGrouping("topic_1").shuffleGrouping("topic_2");

您也可以使用任何其他分组。

更新:

为了区分consumer bolt中的元组(即topic_1或topic_2),有两种可能:

1) 您可以使用运算符(operator) ID(如@user-4870385 所建议):

if(input.getSourceComponent().equalsIgnoreCase("topic_1")) {
//do something
} else {
//do something
}

2) 您可以使用流名称(如@zenbeni 所建议)。对于这种情况,两个 spout 都需要声明命名流,并且 bolt 需要通过流名称连接到 spout:

public class MyKafkaSpout extends KafkaSpout {
final String streamName;

public MyKafkaSpout(String stream) {
this.streamName = stream;
}

// other stuff omitted

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// compare KafkaSpout.declareOutputFields(...)
declarer.declare(streamName, _spoutConfig.scheme.getOutputFields());
}
}

构建拓扑,现在需要使用流名称:

TopologyBuilder b = new TopologyBuilder();
b.setSpout("topic_1", new MyKafkaSpout("stream_t1"));
b.setSpout("topic_2", new MyKafkaSpout("stream_t2"));
b.setBolt("bolt", new MyBolt(...)).shuffleGrouping("topic_1", "stream_t1").shuffleGrouping("topic_2", "stream_t2");

MyBolt 中,流名称现在可用于区分输入元组:

// in my MyBolt.execute():
if(input.getSourceStreamId().equals("Topic1")) {
// do something
} else {
// do something
}

讨论:

虽然使用流名称的第二种方法更自然(根据@zenbeni),第一种更灵活(IHMO)。流名称由 spout/bolt 直接声明(即,在编写 spout/bolt 代码时);相反,运算符 ID 是在拓扑组合在一起时分配的(即,在 使用 spout/bolt 时)。

假设我们得到三个 bolt 作为类文件(无源代码)。前两个应该用作生产者,并且都声明具有相同名称的输出流。如果第三个消费者按流区分输入元组,这将不起作用。即使两个给定的生产者 bolt 都声明了不同的输出流名称,预期的输入流名称也可能在消费者 bolt 中硬编码并且可能不匹配。因此,它也不起作用。但是,如果消费者 bolt 使用组件名称(即使它们是硬编码的)来区分传入的元组,则可以正确分配预期的组件 ID。

当然,可以从给定的类继承(如果未声明 final 并覆盖 declareOutputFields(...) 以分配自己的流名称. 然而,这是更多额外的工作要做。

关于java - 如何将两个不同 Spout 的输出发送到同一个 Bolt?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31851311/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com