gpt4 book ai didi

java - Apache Storm : Ack not working

转载 作者:行者123 更新时间:2023-11-29 05:01:09 25 4
gpt4 key购买 nike

我正在尝试实现有保证的消息处理,但没有调用 Spout 上的 ack 或 fail 方法。

我正在使用 spout 传递消息 ID 对象。我将元组与每个 bolt 一起传递,并在每个 bolt 中调用 collector.ack(tuple) 。

问题没有调用 ack 或 fail,我不知道为什么?

这是一个简短的代码示例。

使用 BaseRichSpout 的 Spout 代码

public void nextTuple() {
for( String usage : usageData ) {
.... further code ....

String msgID = UUID.randomUUID().toString()
+ System.currentTimeMillis();

Values value = new Values(splitUsage[0], splitUsage[1],
splitUsage[2], msgID);
outputCollector.emit(value, msgID);
}
}

@Override
public void ack(Object msgId) {
this.pendingTuples.remove(msgId);
LOG.info("Ack " + msgId);
}

@Override
public void fail(Object msgId) {
// Re-emit the tuple
LOG.info("Fail " + msgId);
this.outputCollector.emit(this.pendingTuples.get(msgId), msgId);
}

使用 BaseRichBolt 的 bolt 代码

@Override
public void execute(Tuple inputTuple) {

this.outputCollector.emit(inputTuple, new Values(serverData, msgId));

this.outputCollector.ack(inputTuple);
}

最终 bolt

@Override
public void execute(Tuple inputTuple) {
..... Simply reports does not emit .....
this.outputCollector.ack(inputTuple);

最佳答案

ack 不起作用的原因是在 spout 中使用了 for 循环。将其更改为发射下方的计数器循环版本并且它有效。

示例

        index++;
if (index >= dataset.size()) {
index = 0;
}

进一步感谢邮件列表信息。这是因为 Spout 在单个线程上运行并且会阻塞在 for 循环中,因为下一个元组不会返回,因此它永远无法调用 ACK 方法。

关于java - Apache Storm : Ack not working,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32060081/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com