gpt4 book ai didi

stream - 如何在 Trident 中映射具有持久状态的元组?

转载 作者:行者123 更新时间:2023-12-04 12:41:25 26 4
gpt4 key购买 nike

我在学习 Trident框架。 Trident上有几种方法 Stream s用于批处理中的聚合元组,包括 this one它允许使用 Aggregator 预先形成元组的有状态映射界面。但不幸的是,一个内置的对应物可以额外持久化 map 状态,就像 persistentAggregate() 的其他 9 个重载一样。 ,只有 Aggregator作为论据,不存在。

因此,我如何通过结合较低级别的 Trident 和 Storm 抽象和工具来实现所需的功能?探索 API 非常困难,因为几乎没有 Javadoc 文档。

换句话说,persistentAggregate()方法允许通过更新一些持久状态来结束流处理:

stream of tuples ---> persistent state

顺便说一下,我想更新持久状态并发出不同的元组:
stream of tuples ------> stream of different tuples
with
persistent state
Stream.aggregate(Fields, Aggregator, Fields)不提供容错:
stream of tuples ------> stream of different tuples
with
simple in-memory state

最佳答案

您可以使用方法 TridentState#newValuesStream() 从状态创建新流。 .
这将允许您检索聚合值的流。

出于说明目的,我们可以改进 example in Trident documentation通过添加此方法和调试过滤器:

FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3,
new Values("the cow jumped over the moon"),
new Values("the man went to the store and bought some candy"),
new Values("four score and seven years ago"),
new Values("how many apples can you eat"));
spout.setCycle(true);

TridentTopology topology = new TridentTopology();
topology.newStream("spout1", spout)
.each(new Fields("sentence"), new Split(), new Fields("word"))
.groupBy(new Fields("word"))
.persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))
.newValuesStream().each(new Fields("count"), new Debug());

运行此拓扑将输出(到控制台)聚合计数。

希望能帮助到你

关于stream - 如何在 Trident 中映射具有持久状态的元组?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/19859566/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com