gpt4 book ai didi

java - 字数统计拓扑中的确认为零

转载 作者:行者123 更新时间:2023-12-01 10:34:05 25 4
gpt4 key购买 nike

我是storm新手,我提交了storm-starter项目,字数拓扑

我得到了 wordcount

确认为零!我该如何修复它?

代码链接,供不了解该项目的人使用

https://github.com/nathanmarz/storm-starter/blob/master/src/jvm/storm/starter/WordCountTopology.java

package storm.starter;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.task.ShellBolt;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import storm.starter.spout.RandomSentenceSpout;

import java.util.HashMap;
import java.util.Map;

/**
* This topology demonstrates Storm's stream groupings and multilang capabilities.
*/
public class WordCountTopology {
public static class SplitSentence extends ShellBolt implements IRichBolt {

public SplitSentence() {
super("python", "splitsentence.py");
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}

@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}

public static class WordCount extends BaseBasicBolt {
Map<String, Integer> counts = new HashMap<String, Integer>();

@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
String word = tuple.getString(0);
Integer count = counts.get(word);
if (count == null)
count = 0;
count++;
counts.put(word, count);
collector.emit(new Values(word, count));
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word", "count"));
}
}

public static void main(String[] args) throws Exception {

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("spout", new RandomSentenceSpout(), 5);

builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));

Config conf = new Config();
conf.setDebug(true);


if (args != null && args.length > 0) {
conf.setNumWorkers(3);

StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
}
else {
conf.setMaxTaskParallelism(3);

LocalCluster cluster = new LocalCluster();
cluster.submitTopology("word-count", conf, builder.createTopology());

Thread.sleep(10000);

cluster.shutdown();
}
}
}

最佳答案

要在 Storm 中获得元组确认,需要做一些事情。

  1. 确保通过设置 Topology_Ackers_Executors 配置来启用 acker。

    //in java
    conf.put(Config.TOPOLOGY_ACKER_EXECUTORS, 2);
    //in storm.yaml
    topology.acker.executors: 2 //defaults to 0

    您只需在这两个位置之一设置配置。 storm.yaml 是默认设置,Java 配置可以覆盖storm.yaml 中的任何内容。

  2. Tuple Anchoring在博尔特

    //short java snippet
    String sentence = tuple.getString(0);
    for(String word: sentence.split(" ")) {
    _collector.emit(tuple, new Values(word)); //anchoring
    _collector.emit(new Values(word)); //not anchoring
    }
    _collector.ack(tuple);

    如上面的链接所述。您必须将元组锚定在一起才能启用确认。在第一个 collector.emit 中,将新创建的元组 new Values(word) 锚定到旧元组。但在第二个 Collector.emit 中,您没有锚定元组。元组需要相互锚定才能正常工作。我不知道如何在 Python 中做到这一点,所以你必须弄清楚。

您可能还需要做其他事情,这个答案主要来自内存,我还没有测试过您的任何代码。但这应该给你一个起点。如果您有任何问题,请阅读documentation在问另一个低质量的问题之前。这就是我的想法,你可能也应该学习发展这项技能。

关于java - 字数统计拓扑中的确认为零,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34903796/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com