gpt4 book ai didi

java - Storm Spout 没有得到 Ack

转载 作者:塔克拉玛干 更新时间:2023-11-03 02:52:02 26 4
gpt4 key购买 nike

我已经开始使用 storm,所以我使用 this tutorial 创建了简单的拓扑

当我使用 LocalCluster 运行我的拓扑时,一切看起来都很好,我的问题是我没有在元组上收到 ACK,这意味着我的 spout ack 从未被调用。

我的代码在下面 - 你知道为什么 ack 没有被调用吗?

所以我的拓扑结构是这样的

public StormTopology build() {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(HelloWorldSpout.class.getSimpleName(),
helloWorldSpout, spoutParallelism);

HelloWorldBolt bolt = new HelloWorldBolt();

builder.setBolt(HelloWorldBolt.class.getSimpleName(),
bolt, boltParallelism)
.shuffleGrouping(HelloWorldSpout.class.getSimpleName());
}

我的 Spout 是这样的

public class HelloWorldSpout  extends BaseRichSpout implements ISpout {
private SpoutOutputCollector collector;

public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("int"));
}

public void open(Map conf, TopologyContext context,
SpoutOutputCollector collector) {
this.collector = collector;
}

private static Boolean flag = false;
public void nextTuple() {
Utils.sleep(5000);

//emit only 1 tuple - for testing
if (!flag){
this.collector.emit(new Values(6));
flag = true;
}
}

@Override
public void ack(Object msgId) {
System.out.println("[HelloWorldSpout] ack on msgId" + msgId);
}

public void fail(Object msgId){
System.out.println("[HelloWorldSpout] fail on msgId" + msgId);
}
}

我的 bolt 看起来像这样

@SuppressWarnings("serial")
public class HelloWorldBolt extends BaseRichBolt{
private OutputCollector collector;

public void prepare(Map conf, TopologyContext context,
OutputCollector collector) {
this.collector = collector;
logger.info("preparing HelloWorldBolt");
}

public void execute(Tuple tuple) {
System.out.println("[HelloWorldBolt] got" + tuple.getInteger(0));
this.collector.ack(tuple);
}

public void cleanup() {
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// TODO Auto-generated method stub

}
}

最佳答案

spout 中的 emit() 方法只有一个参数,因此该元组未被锚定。这就是为什么即使您在 bolt 中确认元组,也不会在 spout 中收到对 ack() 方法的回调。

要使其正常工作,您需要修改 spout 以发出第二个参数,即消息 ID。正是这个 id 传回了 spout 中的 ack() 方法:

public void nextTuple() {
Utils.sleep(5000);

//emit only 1 tuple - for testing
if (!flag){
Object msgId = "ID 6"; // this can be any object
this.collector.emit(new Values(6), msgId);
flag = true;
}
}


@Override
public void ack(Object msgId) {
// msgId should be "ID 6"
System.out.println("[HelloWorldSpout] ack on msgId" + msgId);
}

关于java - Storm Spout 没有得到 Ack,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/21260291/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com