gpt4 book ai didi

java - 在简单的聚合 Storm 拓扑中分组

转载 作者:搜寻专家 更新时间:2023-10-30 21:33:29 24 4
gpt4 key购买 nike

我正在尝试编写一个执行以下操作的拓扑:

  1. 订阅 Twitter 提要的 spout(基于关键字)
  2. 一个聚合 bolt,它聚合了一个集合中的许多推文(比如 N 条)并将它们发送到打印机 bolt
  3. 一个简单的 bolt ,可立即将集合打印到控制台。

实际上我想对集合做更多的处理。

我在本地对其进行了测试,看起来它可以正常工作。但是,我不确定我是否正确地在 bolt 上设置了分组,以及当部署在实际的 Storm 集群上时这是否能正常工作。如果有人可以帮助查看此拓扑并提出任何错误、更改或改进建议,我将不胜感激。

谢谢。

这就是我的拓扑结构。

builder.setSpout("spout", new TwitterFilterSpout("pittsburgh"));
builder.setBolt("sampleaggregate", new SampleAggregatorBolt())
.shuffleGrouping("spout");
builder.setBolt("printaggreator",new PrinterBolt()).shuffleGrouping("sampleaggregate");

聚合 bolt

public class SampleAggregatorBolt implements IRichBolt {

protected OutputCollector collector;
protected Tuple currentTuple;
protected Logger log;
/**
* Holds the messages in the bolt till you are ready to send them out
*/
protected List<Status> statusCache;

@Override
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.collector = collector;

log = Logger.getLogger(getClass().getName());
statusCache = new ArrayList<Status>();
}

@Override
public void execute(Tuple tuple) {
currentTuple = tuple;

Status currentStatus = null;
try {
currentStatus = (Status) tuple.getValue(0);
} catch (ClassCastException e) {
}
if (currentStatus != null) {

//add it to the status cache
statusCache.add(currentStatus);
collector.ack(tuple);


//check the size of the status cache and pass it to the next stage if you have enough messages to emit
if (statusCache.size() > 10) {
collector.emit(new Values(statusCache));
}

}
}

@Override
public void cleanup() {


}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("tweets"));

}

@Override
public Map<String, Object> getComponentConfiguration() {
return null; //To change body of implemented methods use File | Settings | File Templates.
}


protected void setupNonSerializableAttributes() {

}

}

打印机 bolt

public class PrinterBolt extends BaseBasicBolt {

@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
System.out.println(tuple.size() + " " + tuple);
}

@Override
public void declareOutputFields(OutputFieldsDeclarer ofd) {
}

}

最佳答案

据我所知,它看起来不错。不过,细节决定成败。我不确定您的聚合器 bolt 做了什么,但如果它对传递给它的值做出任何假设,那么您应该考虑适当的字段分组。这可能不会产生太大的差异,因为您使用的是默认并行度提示 1,但是如果您决定使用多个聚合 Bolt 实例进行扩展,您所做的隐式逻辑假设可能需要非混洗分组。

关于java - 在简单的聚合 Storm 拓扑中分组,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/16924824/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com