gpt4 book ai didi

apache-flink - 是否有在 Flink 中使用直方图累加器的示例

转载 作者:行者123 更新时间:2023-12-02 11:41:50 24 4
gpt4 key购买 nike

我偶然发现了 Flink 层次结构中的 Histogram 类,但没有“这里是如何使用它”类型的文档。我想做一些类似的事情:

dataStream
.countWindowAll(100)
.fold(new Histogram(), (histogram,data) -> {histogram.add(data.getValue()); return histogram;})
.flatmap((h, out) -> h.getLocalValue().navigableKeySet.iterator().forEachRemaining(key -> out.collect(key.toString()+","+h.get(key).toString()))
.print()

但遗憾的是,直方图无法通过 Flink 进行序列化。也许有“这里是如何使用它”或者有另一种通过 flink 绘制直方图的方法。

我显然做错了什么。

最佳答案

Flink 的累加器不适合用作 DataStreamDataSet 的数据类型。

相反,它们是通过 RuntimeContext 注册的,可从 RichFunction.getRuntimeContext() 获取。这通常在RichFunction`的open()方法中完成:



class MyFunc extends RichFlatMapFunction[Int, Int] {

val hist: Histogram = new Histogram()

override def open(conf: Configuration): Unit = {
getRuntimeContext.addAccumulator("myHist", hist)
}

override def flatMap(value: Int, out: Collector[Int]): Unit = {
hist.add(value)
}
}

累加器的所有并行实例都会定期发送到 JobManager(主进程)并进行合并。它们的值可以从 StreamExecutionEnvironment.execute() 返回的 JobExecutionResult 访问。

我认为 Flink 的累加器无法解决您的用例。您应该创建自定义直方图数据类型。

关于apache-flink - 是否有在 Flink 中使用直方图累加器的示例,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41388680/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com