- android - 多次调用 OnPrimaryClipChangedListener
- android - 无法更新 RecyclerView 中的 TextView 字段
- android.database.CursorIndexOutOfBoundsException : Index 0 requested, 光标大小为 0
- android - 使用 AppCompat 时,我们是否需要明确指定其 UI 组件(Spinner、EditText)颜色
我在 Scala 中有这个通用方法
def updateStateByKey[S](updateFunc: JFunction2[JList[V], Optional[S],
Optional[S]]) : JavaPairDStream[K, S] = { ... }
当我用 Java 调用它时,这两个都无法编译:
JavaPairDStream<String, Integer> stateDstream =
pairs.<Integer>updateStateByKey(...);
JavaPairDStream<String, Integer> stateDstream =
pairs.updateStateByKey(...);
如何正确调用该方法?
错误消息:
The method updateStateByKey(Function2<List<Integer>,Optional<S>,Optional<S>>,
int) in the type JavaPairDStream<String,Integer> is not applicable for
the arguments
(Function2<List<Integer>,Optional<Integer>,Optional<Integer>>,
HashPartitioner, JavaPairRDD<String,Integer>)
编辑:整个函数调用(Java 8):
final Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction =
(values, state) -> {
Integer newSum = state.or(0);
for (Integer value : values) {
newSum += value;
}
return Optional.of(newSum);
};
JavaPairDStream<String, Integer> stateDstream = pairs.updateStateByKey(
updateFunction
,
new HashPartitioner(context.defaultParallelism()), initialRDD);
编辑:事实证明,泛型不是问题,而是参数与方法签名不匹配。
最佳答案
问题是您传入了一个 initialRDD
,而方法 updateStateByKey
没有它作为参数。
最接近的签名是:
updateStateByKey[S](updateFunc: Function2[List[V], Optional[S], Optional[S]],
partitioner: Partitioner): JavaPairDStream[K, S]
关于java - 在 Spark Streaming 中调用 updateStateByKey 时出错,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29129195/
想知道为什么 StatefulNetworkWordCount.scala 示例调用臭名昭著的 updateStateByKey() 函数,该函数应该只将函数作为参数,而不是: val stateDs
我编写了一个与 updateStateByKey 一起使用的简单函数,以查看问题是否是因为我的 updateFunc。我认为这一定是由于其他原因。我在 --master local[4] 上运行它。
我正在运行一个 24X7 的 Spark 流并使用 updateStateByKey 函数来保存计算的历史数据,就像 NetworkWordCount Example 的情况一样.. 我试图流式传输一
如何通过 INPUT PostgreSQL 表的更改触发的 Spark 结构化流计算来更新 OUTPUT TABLE 的状态? 作为现实生活中的场景,USERS 表已被user_id = 0002 更
我正在使用 updateStateByKey()在我的 Spark Streaming 应用程序中维护状态的操作。输入数据来自 Kafka 主题。 我想了解 DStreams 是如何分区的? 分区如何
我正在尝试使用 Spark Streaming 编写一个简单的应用程序,以从 Kafka 读取数据,并持续计算从主题读取单词的次数。我在调用非常重要的 updateStateByKey 方法时遇到问题
我正在尝试合并两个流,其中一个应该是有状态的(比如不经常更新的静态数据): SparkConf conf = new SparkConf().setAppName("Test Application"
我正在尝试通过从 Kafka 读取的(假)apache Web 服务器日志运行有状态 Spark Streaming 计算。目标是“ session 化”类似于 this blog post 的网络流
我正在 24/7 全天候运行 Spark 流并使用 updateStateByKey是否可以 24/7 全天候运行 Spark Streaming?如果是,updateStateByKey 不会变大,
我在 Scala 中有这个通用方法 def updateStateByKey[S](updateFunc: JFunction2[JList[V], Optional[S], Optional[S]]
当我遇到 updateStateByKey() 函数时,我刚刚开始寻找使用 Spark Streaming 进行有状态计算的解决方案。 我试图解决的问题: 10,000 个传感器每分钟产生一个二进制值
我在 Spark Streaming 应用程序中使用 updateStateByKey 函数来持久化和更新每个键的状态。问题是我想知道 “ key ”在更新函数里面。 input.updateStat
我是一名优秀的程序员,十分优秀!