作者热门文章
- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我在学习 Trident框架。 Trident上有几种方法 Stream
s用于批处理中的聚合元组,包括 this one它允许使用 Aggregator
预先形成元组的有状态映射界面。但不幸的是,一个内置的对应物可以额外持久化 map 状态,就像 persistentAggregate()
的其他 9 个重载一样。 ,只有 Aggregator
作为论据,不存在。
因此,我如何通过结合较低级别的 Trident 和 Storm 抽象和工具来实现所需的功能?探索 API 非常困难,因为几乎没有 Javadoc 文档。
换句话说,persistentAggregate()
方法允许通过更新一些持久状态来结束流处理:
stream of tuples ---> persistent state
stream of tuples ------> stream of different tuples
with
persistent state
Stream.aggregate(Fields, Aggregator, Fields)
不提供容错:
stream of tuples ------> stream of different tuples
with
simple in-memory state
最佳答案
您可以使用方法 TridentState#newValuesStream() 从状态创建新流。 .
这将允许您检索聚合值的流。
出于说明目的,我们可以改进 example in Trident documentation通过添加此方法和调试过滤器:
FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3,
new Values("the cow jumped over the moon"),
new Values("the man went to the store and bought some candy"),
new Values("four score and seven years ago"),
new Values("how many apples can you eat"));
spout.setCycle(true);
TridentTopology topology = new TridentTopology();
topology.newStream("spout1", spout)
.each(new Fields("sentence"), new Split(), new Fields("word"))
.groupBy(new Fields("word"))
.persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))
.newValuesStream().each(new Fields("count"), new Debug());
关于stream - 如何在 Trident 中映射具有持久状态的元组?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/19859566/
我是一名优秀的程序员,十分优秀!