gpt4 book ai didi

scala - 如何检查点数据帧?

转载 作者:行者123 更新时间:2023-12-03 10:25:45 26 4
gpt4 key购买 nike

我正在寻找一种检查点数据帧的方法。 Checkpoint 目前是 RDD 上的一项操作,但我找不到如何使用 DataFrames 执行此操作。 persist 和 cache(它们是彼此的同义词)可用于 DataFrame,但它们不会“破坏血统”,因此不适合可能循环数百(或数千)次迭代的方法。

例如,假设我有一个签名为 DataFrame => DataFrame 的函数列表。即使 myfunctions 有数百或数千个条目,我也希望有一种方法来计算以下内容:

def foo(dataset: DataFrame, g: DataFrame => Unit) =
myfunctions.foldLeft(dataset) {
case (df, f) =>
val nextDF = f(df)
g(nextDF)
nextDF
}

最佳答案

TL;DR: 对于高达 1.6 的 Spark 版本,要真正获得“检查点 DF”,我建议的解决方案是基于另一个答案,但多了一行:

df.rdd.checkpoint
df.rdd.count
val df2 = sqlContext.createDataFrame(df.rdd, df.schema)
// df2 is checkpointed

解释

进一步研究后更新。

正如所指出的,虽然有 issue,但目前 (Spark 1.6.1) 无法直接检查 DataFrame。在 Spark 的 Jira 上。

因此,一种可能的解决方法是另一个答案中建议的解决方法:
df.rdd.checkpoint // Assuming the checkpoint dir has already been set
df.count // An action to compute the checkpoint

但是,使用这种方法,只有 df.rdd 对象将被检查点。这可以通过调用 toDebugString 来验证。至 df.rdd :
 scala> df.rdd.toDebugString
(32) MapPartitionsRDD[1] at rdd at <console>:38 []
| ReliableCheckpointRDD[2] at count at <console>:38 []

然后调用 toDebugString在快速转换为 df 之后(请注意,我从 JDBC 源创建了我的 DataFrame),返回以下内容:
scala> df.withColumn("new_column", lit(0)).rdd.toDebugString
res4: String =
(32) MapPartitionsRDD[5] at rdd at <console>:38 []
| MapPartitionsRDD[4] at rdd at <console>:38 []
| JDBCRDD[3] at rdd at <console>:38 []
df.explain还显示了一个提示:
scala> df.explain
== Physical Plan ==
Scan JDBCRelation (...)

所以,要真正实现一个“checkpointed”DataFrame,我只能想到从 checkpointed RDD 创建一个新的:
val newDF = sqlContext.createDataFrame(df.rdd, df.schema)
// or
val newDF = df.rdd.map {
case Row(val1: Int, ..., valN: Int) => (val1, ..., valN)
}.toDF("col1", ..., "colN")

然后我们可以验证新的 DataFrame 是“检查点”:

1) newDF.explain :
scala> newDF.explain
== Physical Plan ==
Scan PhysicalRDD[col1#5, col2#6, col3#7]

2) newDF.rdd.toDebugString :
scala> newDF.rdd.toDebugString
res7: String =
(32) MapPartitionsRDD[10] at rdd at <console>:40 []
| MapPartitionsRDD[8] at createDataFrame at <console>:37 []
| MapPartitionsRDD[1] at rdd at <console>:38 []
| ReliableCheckpointRDD[2] at count at <console>:38 []

3) 转换:
scala> newDF.withColumn("new_column", lit(0)).rdd.toDebugString
res9: String =
(32) MapPartitionsRDD[12] at rdd at <console>:40 []
| MapPartitionsRDD[11] at rdd at <console>:40 []
| MapPartitionsRDD[8] at createDataFrame at <console>:37 []
| MapPartitionsRDD[1] at rdd at <console>:38 []
| ReliableCheckpointRDD[2] at count at <console>:38 []

此外,我尝试了一些更复杂的转换,并且在实践中我能够检查 newDF对象被检查点。

因此,我发现可靠地检查 DataFrame 的唯一方法是检查其关联的 RDD 并从中创建一个新的 DataFrame 对象。

我希望它有所帮助。干杯。

关于scala - 如何检查点数据帧?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33424445/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com