
scala - How to zip two (or more) DataFrames in Spark

Reposted · Author: 行者123 · Updated: 2023-12-02 01:52:20

I have two DataFrames, a and b. a looks like:

Column 1 | Column 2
abc      | 123
cde      | 23

and b looks like:

Column 1 
1
2

I want to zip a with b (and eventually more DataFrames) so that the result looks like this:

Column 1 | Column 2 | Column 3
abc      | 123      | 1
cde      | 23       | 2

How can I do this?

Best answer

An operation like this is not supported by the DataFrame API. It is possible to zip two RDDs, but to make it work you have to match both the number of partitions and the number of elements per partition. Assuming this is the case:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructField, StructType, LongType}
import sqlContext.implicits._ // required for toDF outside the spark-shell

val a: DataFrame = sc.parallelize(Seq(
  ("abc", 123), ("cde", 23))).toDF("column_1", "column_2")
val b: DataFrame = sc.parallelize(Seq(Tuple1(1), Tuple1(2))).toDF("column_3")

// Merge rows
val rows = a.rdd.zip(b.rdd).map{
case (rowLeft, rowRight) => Row.fromSeq(rowLeft.toSeq ++ rowRight.toSeq)}

// Merge schemas
val schema = StructType(a.schema.fields ++ b.schema.fields)

// Create new data frame
val ab: DataFrame = sqlContext.createDataFrame(rows, schema)
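
For a quick sanity check on the sample data above, ab.show() should print the zipped rows (exact alignment may vary by Spark version):

ab.show()
// +--------+--------+--------+
// |column_1|column_2|column_3|
// +--------+--------+--------+
// |     abc|     123|       1|
// |     cde|      23|       2|
// +--------+--------+--------+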

If the above conditions are not met, the only option that comes to mind is adding an index and joining:

def addIndex(df: DataFrame) = sqlContext.createDataFrame(
  // Add index
  df.rdd.zipWithIndex.map{case (r, i) => Row.fromSeq(r.toSeq :+ i)},
  // Create schema
  StructType(df.schema.fields :+ StructField("_index", LongType, false))
)

// Add indices
val aWithIndex = addIndex(a)
val bWithIndex = addIndex(b)

// Join and clean
val ab = aWithIndex
  .join(bWithIndex, Seq("_index"))
  .drop("_index")
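
Since the question asks about two or more DataFrames, the index-and-join approach generalizes naturally. Here is a minimal sketch (zipDataFrames is a helper name introduced here for illustration, not a Spark API), assuming all frames have the same number of rows and reusing addIndex from above:

// Zip any number of equal-length DataFrames: index each one,
// join them all on the synthetic _index column, then drop it.
// Note: joins do not guarantee row order in the result.
def zipDataFrames(dfs: Seq[DataFrame]): DataFrame =
  dfs.map(addIndex)
    .reduce(_.join(_, Seq("_index")))
    .drop("_index")

val abc = zipDataFrames(Seq(a, b)) // same columns as ab above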

Regarding scala - How to zip two (or more) DataFrames in Spark, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/32882529/
