gpt4 book ai didi

java - 为什么 Storm 不在工作集群上重播失败消息,而是在本地桌面上以集群模式重播

转载 作者:行者123 更新时间:2023-12-01 17:18:59 25 4
gpt4 key购买 nike

这是我尝试执行的代码。我故意在 bolt 上失败。这样我就可以看到失败的消息被 Storm 重播。但看起来这并没有发生。

public static class FastRandomSentenceSpout extends BaseRichSpout {
SpoutOutputCollector _collector;
Random _rand;
private static final String[] CHOICES = {
"marry had a little lamb whos fleese was white as snow",
"and every where that marry went the lamb was sure to go"
};

@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
_collector = collector;
_rand = ThreadLocalRandom.current();
}

@Override
public void nextTuple() {
String sentence = CHOICES[_rand.nextInt(CHOICES.length)];
_collector.emit(new Values(sentence), sentence);
}

@Override
public void fail(Object id) {
System.out.println("RAVI: the failedObjectId = "+id);
_collector.emit(new Values(id), id);
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("sentence"));
}
}

这里是关于Split Sentence Bolt的详细信息。我故意失败的地方。

 public static class SplitSentence extends BaseRichBolt 
{
OutputCollector _collector;
@Override
public void prepare(Map conf,
TopologyContext context,
OutputCollector collector)
{
_collector = collector;
}

这是发生失败的函数

    @Override
public void execute(Tuple tuple)
{
String sentence = tuple.getString(0);
System.out.println("sentence = "+sentence);
if(sentence.equals("marry had a little lamb whos fleese was white as snow"))
{
System.out.println("going to fail");
_collector.fail(tuple);
}
else
{
for (String word: sentence.split("\\s+")) {
_collector.emit(tuple, new Values(word, 1));
}
_collector.ack(tuple);
}
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word", "count"));
}
}

这是驱动代码详细信息。 public static void main(String[] args) 抛出异常 {

   TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("spout", new FastRandomSentenceSpout(), 4);

builder.setBolt("split", new SplitSentence(), 4).shuffleGrouping("spout");


Config conf = new Config();
conf.registerMetricsConsumer(
org.apache.storm.metric.LoggingMetricsConsumer.class);


String name = "wc-test";
if (args != null && args.length > 0) {
name = args[0];
}

conf.setNumWorkers(1);
StormSubmitter.submitTopologyWithProgressBar(name,
conf,
builder.createTopology());

}

最佳答案

事实证明这是由于storm.yaml中提到的全局设置造成的。具体设置为

topology.acker.executors: 0

关于java - 为什么 Storm 不在工作集群上重播失败消息,而是在本地桌面上以集群模式重播,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61334438/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com