gpt4 book ai didi

java - nextTuple() 在 Storm 上使用 BaseRichSpout 被无限次调用

转载 作者:塔克拉玛干 更新时间:2023-11-01 22:44:52 26 4
gpt4 key购买 nike

我实现了简单的 Storm 拓扑结构,它有一个 spout 和一个在本地集群模式下运行的 bolt。

出于某种原因,spout 的 nextTuple() 被调用了不止一次。

知道为什么吗?

代码:

喷口:

public class CommitFeedListener extends BaseRichSpout {
private SpoutOutputCollector outputCollector;
private List<String> commits;

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("commit"));
}

@Override
public void open(Map configMap,
TopologyContext context,
SpoutOutputCollector outputCollector) {
this.outputCollector = outputCollector;
}

**//that method is invoked more than once**
@Override
public void nextTuple() {

outputCollector.emit(new Values("testValue"));

}
}

bolt :

public class EmailExtractor extends BaseBasicBolt {
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("email"));
}
@Override
public void execute(Tuple tuple,
BasicOutputCollector outputCollector) {
String commit = tuple.getStringByField("commit");
System.out.println(commit);
}
}

运行配置:

public class LocalTopologyRunner {
private static final int TEN_MINUTES = 600000;
public static void main(String[] args) {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("commit-feed-listener", new CommitFeedListener());
builder
.setBolt("email-extractor", new EmailExtractor())
.shuffleGrouping("commit-feed-listener");
Config config = new Config();
config.setDebug(true);
StormTopology topology = builder.createTopology();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("github-commit-count-topology",
config,
topology);
Utils.sleep(TEN_MINUTES);
cluster.killTopology("github-commit-count");
cluster.shutdown();
}
}

谢谢大家,射线。

最佳答案

nextTuple() 按照设计在无限循环中调用。这样做是为了使用例如针对外部资源(数据库、流、IO 等)的脏检查。

如果你在 nextTuple() 中无事可做,你应该睡一会儿以防止 CPU 使用 backtype.storm.utils.Utils 发送垃圾邮件

Utils.sleep(pollIntervalInMilliseconds);

Storm 是一种实时处理架构,因此它确实是正确的行为。检查一些示例以了解如何根据您的需要实现 spout。

关于java - nextTuple() 在 Storm 上使用 BaseRichSpout 被无限次调用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27354438/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com