gpt4 book ai didi

java - Storm 不尊重最大喷口支出

转载 作者:行者123 更新时间:2023-11-29 05:01:30 25 4
gpt4 key购买 nike

我已经创建了一个示例拓扑来测试设置 max spout spending 属性。这是一个带有 1 个 spout 和一个 bolt 的简单拓扑。 spout 发出 100000 个元组,bolt 在 hibernate 一秒钟后确认。我已将 max spout spending 属性设置为 10。我假设这意味着如果该 spout 的非确认消息计数为 10,则该 spout 将不会发出任何元组。但是当我运行拓扑时,我可以看到 spout 发出 2160 条消息然后等待。我的理解是正确的还是我遗漏了什么。我正在使用 Storm 0.9.5。下面是代码

public static void main(String[] args) {

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new TestSpout(), 1);
builder.setBolt("bolt", new TestBolt(),1).shuffleGrouping("spout");
Config conf = new Config();
conf.setNumWorkers(1);
conf.setMaxSpoutPending(10);
try {
StormSubmitter.submitTopology("test", conf, builder.createTopology());
} catch (AlreadyAliveException e) {
e.printStackTrace();
} catch (InvalidTopologyException e) {
e.printStackTrace();
}
}


public class TestSpout extends BaseRichSpout {
private SpoutOutputCollector collector;
private int count = 1;

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("spoutData"));
}

@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
System.out.println(context.maxTopologyMessageTimeout());
}

@Override
public void nextTuple() {

if(count <= 100000) {
System.out.println("Emitting : " + count);
collector.emit(new Values(count++ + ""));
}
}

public class TestBolt extends BaseRichBolt {
private OutputCollector collector;

@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}

@Override
public void execute(Tuple input) {
try {
System.out.println(input.getString(0));
Thread.sleep(1000);
collector.ack(input);
} catch (InterruptedException e) {
e.printStackTrace();
System.out.println("Exception");
}
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {

}

最佳答案

您需要将消息 ID 分配给您在 Spout.nextTuple() 方法中发出的元组。否则,参数 max.spout.pending 将被忽略。例如,您可以使用 count 变量作为 ID(基本上,任何东西都可以用作 ID。它只能是唯一的。)

@Override
public void nextTuple() {
if(count <= 100000) {
System.out.println("Emitting : " + count);
collector.emit(new Values(count++ + ""), count);
}
}

否则,Storm 无法将输出元组链接到在您的 bolt 中确认的元组,即,Storm 无法计算有多少元组待处理。 Storm 只能跟踪具有 ID 的元组。

关于java - Storm 不尊重最大喷口支出,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31966257/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com