gpt4 book ai didi

scala - Spark : Transpose DataFrame Without Aggregating

转载 作者:行者123 更新时间:2023-12-04 07:33:46 25 4
gpt4 key购买 nike

我在网上看了很多问题,但是它们似乎并没有解决我要解决的问题。

我在Scala中使用Apache Spark 2.0.2。

我有一个数据框:

+----------+-----+----+----+----+----+----+
|segment_id| val1|val2|val3|val4|val5|val6|
+----------+-----+----+----+----+----+----+
| 1| 100| 0| 0| 0| 0| 0|
| 2| 0| 50| 0| 0| 20| 0|
| 3| 0| 0| 0| 0| 0| 0|
| 4| 0| 0| 0| 0| 0| 0|
+----------+-----+----+----+----+----+----+

我想换位到
+----+-----+----+----+----+
|vals| 1| 2| 3| 4|
+----+-----+----+----+----+
|val1| 100| 0| 0| 0|
|val2| 0| 50| 0| 0|
|val3| 0| 0| 0| 0|
|val4| 0| 0| 0| 0|
|val5| 0| 20| 0| 0|
|val6| 0| 0| 0| 0|
+----+-----+----+----+----+

我尝试使用 pivot(),但找不到正确的答案。我最终遍历了 val{x}列,并按照下面的顺序进行了旋转,但是事实证明这非常慢。
val d = df.select('segment_id, 'val1)

+----------+-----+
|segment_id| val1|
+----------+-----+
| 1| 100|
| 2| 0|
| 3| 0|
| 4| 0|
+----------+-----+

d.groupBy('val1).sum().withColumnRenamed('val1', 'vals')

+----+-----+----+----+----+
|vals| 1| 2| 3| 4|
+----+-----+----+----+----+
|val1| 100| 0| 0| 0|
+----+-----+----+----+----+

然后在对 union()的每次迭代中使用 val{x}到我的第一个数据帧。
+----+-----+----+----+----+
|vals| 1| 2| 3| 4|
+----+-----+----+----+----+
|val2| 0| 50| 0| 0|
+----+-----+----+----+----+

是否有更有效的转置方式,我不想聚合数据?

谢谢 :)

最佳答案

不幸的是,在任何情况下都不会发生:

  • 考虑到数据量,Spark DataFrame是合理的。
  • 数据转置是可行的。

  • 您必须记住,Spark中实现的 DataFrame是行的分布式集合,每行都在单个节点上存储和处理。

    您可以将 DataFrame上的换位表示为 pivot:
    val kv = explode(array(df.columns.tail.map { 
    c => struct(lit(c).alias("k"), col(c).alias("v"))
    }: _*))

    df
    .withColumn("kv", kv)
    .select($"segment_id", $"kv.k", $"kv.v")
    .groupBy($"k")
    .pivot("segment_id")
    .agg(first($"v"))
    .orderBy($"k")
    .withColumnRenamed("k", "vals")

    但这只是一个玩具代码,没有实际应用。实际上,这并不比收集数据更好:
    val (header, data) = df.collect.map(_.toSeq.toArray).transpose match {
    case Array(h, t @ _*) => {
    (h.map(_.toString), t.map(_.collect { case x: Int => x }))
    }
    }

    val rows = df.columns.tail.zip(data).map { case (x, ys) => Row.fromSeq(x +: ys) }
    val schema = StructType(
    StructField("vals", StringType) +: header.map(StructField(_, IntegerType))
    )

    spark.createDataFrame(sc.parallelize(rows), schema)

    对于 DataFrame定义为:
    val df = Seq(
    (1, 100, 0, 0, 0, 0, 0),
    (2, 0, 50, 0, 0, 20, 0),
    (3, 0, 0, 0, 0, 0, 0),
    (4, 0, 0, 0, 0, 0, 0)
    ).toDF("segment_id", "val1", "val2", "val3", "val4", "val5", "val6")

    两者都可以给您想要的结果:
    +----+---+---+---+---+
    |vals| 1| 2| 3| 4|
    +----+---+---+---+---+
    |val1|100| 0| 0| 0|
    |val2| 0| 50| 0| 0|
    |val3| 0| 0| 0| 0|
    |val4| 0| 0| 0| 0|
    |val5| 0| 20| 0| 0|
    |val6| 0| 0| 0| 0|
    +----+---+---+---+---+

    话虽这么说,如果您需要对分布式数据结构进行有效的转换,则必须另寻别处。有许多结构,包括核心 CoordinateMatrixBlockMatrix,它们可以在两个维度上分布数据并可以转置。

    关于scala - Spark : Transpose DataFrame Without Aggregating,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40892459/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com