gpt4 book ai didi

在大型数据集上将 Spark DataFrame 从长到宽 reshape

转载 作者:行者123 更新时间:2023-12-02 05:38:06 25 4
gpt4 key购买 nike

我正在尝试使用 Spark DataFrame API 将我的数据帧从长变为宽。数据集是学生提问的问题和答案的集合。这是一个巨大的数据集,Q(问题)和 A(答案)大约范围从 1 到 50000。我想收集所有可能的 Q*A 对并使用它们来构建列。如果学生对问题 1 的回答为 1,我们将值 1 分配给第 1_1 列。否则,我们给它一个0。数据集已在S_ID、Q、A上进行去重。

在 R 中,我可以简单地在库 reshape2 中使用 dcast,但我不知道如何使用 Spark 来做到这一点。我在下面的链接中找到了旋转的解决方案,但它需要固定数量的不同对的 Q*A。 http://rajasoftware.net/index.php/database/91446/scala-apache-spark-pivot-dataframes-pivot-spark-dataframe

我还尝试使用用户定义的函数连接 Q 和 A,并应用交叉表但是,我从控制台收到以下错误,即使到目前为止我只在示例数据文件上测试我的代码 -

The maximum limit of le6 pairs have been collected, which may not be all of the pairs.  
Please try reducing the amount of distinct items in your columns.

原始数据:

S_ID, Q, A
1, 1, 1
1, 2, 2
1, 3, 3
2, 1, 1
2, 2, 3
2, 3, 4
2, 4, 5

=> 长到宽转换后:

S_ID, QA_1_1, QA_2_2, QA_3_3, QA_2_3, QA_3_4, QA_4_5
1, 1, 1, 1, 0, 0, 0
2, 1, 0, 0, 1, 1, 1

R code.  
library(dplyr); library(reshape2);
df1 <- df %>% group_by(S_ID, Q, A) %>% filter(row_number()==1) %>% mutate(temp=1)
df1 %>% dcast(S_ID ~ Q + A, value.var="temp", fill=0)

Spark code.
val fnConcatenate = udf((x: String, y: String) => {"QA_"+ x +"_" + y})
df1 = df.distinct.withColumn("QA", fnConcatenate($"Q", $"A"))
df2 = stat.crosstab("S_ID", "QA")

任何想法将不胜感激。

最佳答案

您在这里尝试执行的操作在设计上是错误的,原因有两个:

  1. 您将稀疏数据集替换为密集数据集。当涉及到内存需求和计算时,它的成本很高,而且当您拥有大型数据集时,它几乎从来都不是一个好主意
  2. 您限制了本地处理数据的能力。稍微简化一下 Spark 数据帧只是 RDD[Row] 的包装器。这意味着行越大,您可以在单个分区上放置的内容就越少,因此聚合等操作的成本要高得多,并且需要更多的网络流量。

当您拥有适当的列式存储并且可以实现高效压缩或聚合等功能时,宽表非常有用。从实用的角度来看,几乎所有可以用宽表做的事情都可以用长表使用组/窗口函数来完成。

您可以尝试的一件事是使用稀疏向量创建宽格式:

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.max
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.ml.feature.StringIndexer
import sqlContext.implicits._

df.registerTempTable("df")
val dfComb = sqlContext.sql("SELECT s_id, CONCAT(Q, '\t', A) AS qa FROM df")

val indexer = new StringIndexer()
.setInputCol("qa")
.setOutputCol("idx")
.fit(dfComb)

val indexed = indexer.transform(dfComb)

val n = indexed.agg(max("idx")).first.getDouble(0).toInt + 1

val wideLikeDF = indexed
.select($"s_id", $"idx")
.rdd
.map{case Row(s_id: String, idx: Double) => (s_id, idx.toInt)}
.groupByKey // This assumes no duplicates
.mapValues(vals => Vectors.sparse(n, vals.map((_, 1.0)).toArray))
.toDF("id", "qaVec")

很酷的一点是您可以轻松地将其转换为 IndexedRowMatrix 并计算 SVD

val mat = new IndexedRowMatrix(wideLikeDF.map{
// Here we assume that s_id can be mapped directly to Long
// If not it has to be indexed
case Row(id: String, qaVec: SparseVector) => IndexedRow(id.toLong, qaVec)
})

val svd = mat.computeSVD(3)

RowMatrix并获取列统计信息或计算主成分:

val colStats = mat.toRowMatrix.computeColumnSummaryStatistic
val colSims = mat.toRowMatrix.columnSimilarities
val pc = mat.toRowMatrix.computePrincipalComponents(3)

编辑:

在 Spark 1.6.0+ 中,您可以使用pivot 函数。

关于在大型数据集上将 Spark DataFrame 从长到宽 reshape ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31979256/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com