gpt4 book ai didi

scala - 如何在scala中使用flink折叠功能

转载 作者:行者123 更新时间:2023-12-02 05:52:24 27 4
gpt4 key购买 nike

这是使用 Flink Fold 与 scala 匿名函数的无效尝试:

val myFoldFunction = (x: Double, t:(Double,String,String)) => x + t._1
env.readFileStream(...).
...
.groupBy(1)
.fold(0.0, myFoldFunction : Function2[Double, (Double,String,String), Double])

它编译得很好,但在执行时,我遇到了“类型删除问题”(见下文)。在 Java 中这样做很好,但当然更冗长。我喜欢简洁明了的 lambda。我怎样才能在scala中做到这一点?

Caused by: org.apache.flink.api.common.functions.InvalidTypesException:
Type of TypeVariable 'R' in 'public org.apache.flink.streaming.api.scala.DataStream org.apache.flink.streaming.api.scala.DataStream.fold(java.lang.Object,scala.Function2,org.apache.flink.api.common.typeinfo.TypeInformation,scala.reflect.ClassTag)' could not be determined.
This is most likely a type erasure problem.
The type extraction currently supports types with generic variables only in cases where all variables in the return type can be deduced from the input type(s).

最佳答案

您遇到的问题是Flink [1]中的一个错误。该问题源于 Flink 的 TypeExtractor 以及 Scala DataStream API 在 Java 实现之上的实现方式。 TypeExtractor 无法为 Scala 类型生成 TypeInformation,因此返回 MissingTypeInformation。此缺失的类型信息是在创建 StreamFold 运算符后手动设置的。但是,StreamFold 运算符的实现方式不接受 MissingTypeInformation,因此在设置正确的类型信息之前会失败。

我已打开拉取请求 [2] 来解决此问题。应该会在接下来的两天内合并。通过使用最新的 0.10 快照版本,您的问题应该得到解决。

关于scala - 如何在scala中使用flink折叠功能,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32378765/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com