gpt4 book ai didi

scala - Spark 数据帧 : Accessing next record in map function

转载 作者:行者123 更新时间:2023-12-01 11:26:18 24 4
gpt4 key购买 nike

我有一个带有时间戳列的 DF,按此列排序。有没有办法做到这一点:对于每条记录,访问下一条记录以计算两条线之间的时间差?我认为这在 map 函数中是不可能的,因为这两行可能在不同的节点上处理。

谢谢!

最佳答案

对于 Spark 1.4 或更高版本,如果您可以使用 Hive 上下文,以下代码可能适用于您:

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.LongType
import org.apache.spark.sql._

val hc = new HiveContext(sc)
val df = hc.read.format("...").load("...")

val timestamp_column = df("timestamp_column")
val next_row_timestamp = lead(timestamp_column, 1).over(Window.orderBy(timestamp_column))

val newDF = df.withColumn("time_difference", next_row_timestamp.cast(LongType) - timestamp_column.cast(LongType))

说明:

在这段代码中,我使用了 lead(e: Column, offset: Int) functions中可用的窗函数包 ( doc )。该函数实际上使用列 e 中的数据创建了一个新列。 (在示例中为 timestamp_column)由 offset 移位(在示例中为 1)。要正常工作,它后面必须跟一个 over(window: WindowSpec)调用,它使用 Window 对象定义一个窗口。这个窗口可以由一个分区和一个顺序组成。在这种情况下,我只使用 Window.orderBy 设置顺序.

最后,我使用 withColumn 在原始 DataFrame 中添加一列,两列之间的差异以秒为单位(或毫秒?不确定)。 .

有关更多详细信息,以下链接很好地解释了该想法,并附有示例:
https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html

编辑:

正如评论中所指出的,上述解决方案可能非常低效。作为替代方案,可以使用 RDD 解决方案:
val newRDD = df.rdd.zipWithIndex.flatMap {
case (row, idx) => (0 to 1).map { lag => (idx - lag, row) }
}
.groupByKey
.values
.map { pair =>
val pairArray = pair.toArray
val timeDiff = {
if (pairArray.length == 1) null
else pairArray(1).getAs[java.sql.Timestamp]("timestamp_column").getTime - pairArray(0).getAs[java.sql.Timestamp]("timestamp_column").getTime
}
Row.merge(Row(timeDiff), pairArray(0))
}

val newSchema = StructType(StructField("time_diff", LongType, true) +: df.schema.fields)
val newDf = df.sqlContext.createDataFrame(newRDD, newSchema)
newDF 中的结果数据框将有一个新列“time_diff”,其中包含当前行和下一行之间的时间差(以毫秒为单位)。

关于scala - Spark 数据帧 : Accessing next record in map function,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37027566/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com