gpt4 book ai didi

apache-storm - 在 Storm 中使用带有三叉戟的刻度元组

转载 作者:行者123 更新时间:2023-12-01 06:28:37 25 4
gpt4 key购买 nike

我能够使用标准的 spout、bolt 组合来进行流聚合
并且在愉快的情况下工作得很好,当使用刻度元组以某个时间间隔保留数据时
使用批处理。现在我正在做一些故障管理(跟踪未保存的元组等)。(即不是来自 Storm 的 ootb)

但是我已经读过三叉戟为您提供了更高的抽象和更好的故障管理。
我不明白的是三叉戟中是否有刻度元组支持。基本上
我想在当前一分钟左右的内存中批处理并保留所有聚合数据
前几分钟使用三叉戟。

此处的任何指示或设计建议都会有所帮助。

谢谢

最佳答案

实际上,微批处理是 Trident 的一个内置功能。您不需要任何刻度元组。当你的代码中有这样的东西时:

topology
.newStream("myStream", spout)
.partitionPersist(
ElasticSearchEventState.getFactoryFor(connectionProvider),
new Fields("field1", "field2"),
new ElasticSearchEventUpdater()
)

(我在这里使用我的自定义 ElasticSearch 状态/更新程序,您可能会使用其他东西)

所以当你有这样的事情时,在引擎盖下,Trident 将你的流分组并执行 partitionPersist 操作,而不是在单个元组上,而是在这些批次上。

如果您出于任何原因仍然需要刻度元组,只需创建您的刻度喷口,这样的东西对我有用:
public class TickSpout implements IBatchSpout {

public static final String TIMESTAMP_FIELD = "timestamp";
private final long delay;

public TickSpout(long delay) {
this.delay = delay;
}

@Override
public void open(Map conf, TopologyContext context) {
}

@Override
public void emitBatch(long batchId, TridentCollector collector) {
Utils.sleep(delay);
collector.emit(new Values(System.currentTimeMillis()));
}

@Override
public void ack(long batchId) {
}

@Override
public void close() {
}

@Override
public Map getComponentConfiguration() {
return null;
}

@Override
public Fields getOutputFields() {
return new Fields(TIMESTAMP_FIELD);
}
}

关于apache-storm - 在 Storm 中使用带有三叉戟的刻度元组,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25340676/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com