gpt4 book ai didi

java - 带保证消息处理的 WordCount

转载 作者:行者123 更新时间:2023-11-30 11:19:42 27 4
gpt4 key购买 nike

我正在尝试运行保证消息处理的 WordCount 示例。

只有一个喷口

  1. WSpout - 发出带有 msgID 的随机句子。

和两个 bolt

  1. SplitSentence - 在单词中拆分句子并使用锚定发出

  2. WordCount - 打印字数。

我想用下面的代码实现的是,当所有单词都算作一个句子时。必须承认与该句子对应的 Spout。

我最后确认了 _collector.ack(tuple) 仅 WordCount。我看到奇怪的是尽管 ack()WordCount.execute() 被调用,相应的 WSpout.ack() 没有得到叫。它总是在默认超时后失败。

我真的不明白代码有什么问题。请帮我理解这个问题。任何帮助表示赞赏。

下面是完整的代码。

public class TestTopology {

public static class WSpout implements IRichSpout {
SpoutOutputCollector _collector;
Integer msgID = 0;
@Override
public void nextTuple() {
Random _rand = new Random();
String[] sentences = new String[] { "There two things benefit",
" from Storms reliability capabilities",
"Specifying a link in the",
" tuple tree is " + "called anchoring",
" Anchoring is done at ",
"the same time you emit a " + "new tuple" };

String message = sentences[_rand.nextInt(sentences.length)];
_collector.emit(new Values(message), msgID);
System.out.println(msgID + " " + message);

msgID++;
}
@Override
public void open(Map conf, TopologyContext context,
SpoutOutputCollector collector) {
System.out.println("open");
_collector = collector;
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("LINE"));
}
@Override
public void ack(Object msgID) {
System.out.println("ack ------------------- " + msgID);

}
@Override
public void fail(Object msgID) {
System.out.println("fail ----------------- " + msgID);

}
@Override
public void activate() {
// TODO Auto-generated method stub
}
@Override
public void close() {

}
@Override
public void deactivate() {
// TODO Auto-generated method stub
}
@Override
public Map<String, Object> getComponentConfiguration() {
// TODO Auto-generated method stub
return null;
}
}

public static class SplitSentence extends BaseRichBolt {
OutputCollector _collector;
public void prepare(Map conf, TopologyContext context,
OutputCollector collector) {
_collector = collector;
}

public void execute(Tuple tuple) {
String sentence = tuple.getString(0);
for (String word : sentence.split(" ")) {
System.out.println(word);
_collector.emit(tuple, new Values(word));
}
//_collector.ack(tuple);
}

public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}

public static class WordCount extends BaseBasicBolt {
Map<String, Integer> counts = new HashMap<String, Integer>();

@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
System.out.println("WordCount MSGID : " + tuple.getMessageId());
String word = tuple.getString(0);
Integer count = counts.get(word);
if (count == null)
count = 0;
count++;
System.out.println(word + " ===> " + count);
counts.put(word, count);
collector.emit(new Values(word, count));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word", "count"));
}

}

public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new WSpout(), 2);
builder.setBolt("split", new SplitSentence(), 2).shuffleGrouping(
"spout");
builder.setBolt("count", new WordCount(), 2).fieldsGrouping("split",
new Fields("word"));
Config conf = new Config();
conf.setDebug(true);

if (args != null && args.length > 0) {
conf.setNumWorkers(1);
StormSubmitter.submitTopology(args[0], conf,
builder.createTopology());
} else {
conf.setMaxTaskParallelism(3);

LocalCluster cluster = new LocalCluster();
cluster.submitTopology("word-count", conf, builder.createTopology());
Thread.sleep(10000);
cluster.shutdown();
}
}
}

最佳答案

WordCount 扩展了 BaseBasicBolt,它确保元组在那个 bolt 中自动确认,就像您在评论中所说的那样。但是,SplitSentence 扩展了 BaseRichBolt,这需要您手动确认元组。你没有确认,所以元组超时。

关于java - 带保证消息处理的 WordCount,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/23114487/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com