gpt4 book ai didi

scala - DataFrame 化的 zipWithIndex

转载 作者:行者123 更新时间:2023-12-03 07:27:22 31 4
gpt4 key购买 nike

我正在尝试解决向数据集添加序列号的古老问题。我正在使用 DataFrame,并且似乎没有与 RDD.zipWithIndex 等效的 DataFrame。另一方面,以下内容或多或少按照我想要的方式工作:

val origDF = sqlContext.load(...)    

val seqDF= sqlContext.createDataFrame(
origDF.rdd.zipWithIndex.map(ln => Row.fromSeq(Seq(ln._2) ++ ln._1.toSeq)),
StructType(Array(StructField("seq", LongType, false)) ++ origDF.schema.fields)
)

在我的实际应用程序中,origDF 不会直接从文件中加载 - 它将通过将 2-3 个其他 DataFrame 连接在一起来创建,并且将包含多达 1 亿行。

有更好的方法吗?我可以做些什么来优化它?

最佳答案

以下内容是代表 David Griffin 发布的(未经编辑)。

能歌善舞的 dfZipWithIndex 方法。您可以设置起始偏移量(默认为1)、索引列名称(默认为“id”)以及将该列放在前面或后面:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.{LongType, StructField, StructType}
import org.apache.spark.sql.Row


def dfZipWithIndex(
df: DataFrame,
offset: Int = 1,
colName: String = "id",
inFront: Boolean = true
) : DataFrame = {
df.sqlContext.createDataFrame(
df.rdd.zipWithIndex.map(ln =>
Row.fromSeq(
(if (inFront) Seq(ln._2 + offset) else Seq())
++ ln._1.toSeq ++
(if (inFront) Seq() else Seq(ln._2 + offset))
)
),
StructType(
(if (inFront) Array(StructField(colName,LongType,false)) else Array[StructField]())
++ df.schema.fields ++
(if (inFront) Array[StructField]() else Array(StructField(colName,LongType,false)))
)
)
}

关于scala - DataFrame 化的 zipWithIndex,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30304810/

31 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com