- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在尝试 拆分 根据一个(或多个)列和 的值的数据框旋转每个生成的数据框 独立于其余部分。即,给定一个输入数据框:
val inputDF = Seq(("tom","20","a","street a","germany"),("jimmy","30","b","street b","germany"),
("lola","50","c","street c","argentina"), ("maria","60","d","street d","argentina"), ("joe","70","e","street e","argentina")
.toDF("name","age","company","address","country")
//+-----+---+-------+--------+---------+
//| name|age|company| address| country|
//+-----+---+-------+--------+---------+
//| tom| 20| a|street a| germany|
//|jimmy| 30| b|street b| germany|
//| lola| 40| c|street c|argentina|
//|maria| 50| d|street d|argentina|
//| joe| 60| e|street e|argentina|
//+-----+---+-------+--------+---------+
//+-----+---+-------+--------+---------+
//| name|age|company| address| country|
//+-----+---+-------+--------+---------+
//| tom| 20| a|street a| germany|
//|jimmy| 30| b|street b| germany|
//+-----+---+-------+--------+---------+
//+-----+---+-------+--------+---------+
//| name|age|company| address| country|
//+-----+---+-------+--------+---------+
//| lola| 40| c|street c|argentina|
//|maria| 50| d|street d|argentina|
//| joe| 60| e|street e|argentina|
//+-----+---+-------+--------+---------+
//+-----+---+-------+--------+---------+
//| name|age|company| address| country|
//+-----+---+-------+--------+---------+
//|jimmy| 30| a|street a| germany|
//| tom| 20| b|street b| germany|
//| joe| 60| c|street c|argentina|
//| lola| 40| d|street d|argentina|
//|maria| 50| e|street e|argentina|
//+-----+---+-------+--------+---------+
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.functions.{col, monotonically_increasing_id, rand}
import org.apache.spark.sql.types.LongType
// column(s) names to split the input dataframe
val colToSplit = Seq("country")
val splitCols = colToSplit.map(col)
// list of columns names to be rotated (together)
val colsToRotate = Seq("name", "age")
val rotateCols = colsToRotate.map(col) :+ col(auxCol)
// add an auxiliar column for joining the dataframe in the final step
val auxCol = "aux"
val dfWithID = inputDF.withColumn(auxCol, monotonically_increasing_id())
val splitValuesSchema = dfWithID.select(splitCols: _*).schema
// create one dataframe for each value of the splitting column
val splitValuesDFs = dfWithID.select(splitCols: _*).distinct().collect()
.map(row => spark.sparkContext.makeRDD(List(row)))
.map(rdd => spark.createDataFrame(rdd, splitValuesSchema))
val rotateIDCols = Array(auxCol) ++ colsToRotate
// join the split values with their records (DFs with id + colsToRotate)
val splittedDFs = splitValuesDFs
.map(df => df.join(dfWithID, colToSplit).selectExpr(rotateIDCols: _*))
// random reorder the auxiliar id column (DFs with random ids)
val randIdDFs = splittedDFs
.map(df => df.select(auxCol).orderBy(rand()).toDF())
// get rdds with random ids
val randIdRdds = randIdDFs
.map(df => df.select(auxCol).rdd.map(row => row(0)))
// finally, zip and append the rdds with the random ids to the dataframes created by
// splitting the main df to obtain the rotated dataframe with all the data
val tuples = (splittedDFs, randIdRdds).zipped
val newRdds = tuples
.map((df: DataFrame, rdd) => df.rdd.repartition(1).zip(rdd.repartition(1))
.map(row => Row.fromSeq(row._1.toSeq ++ Seq(row._2))))
val tuples2 = (splittedDFs, newRdds).zipped
val rotatedDF = tuples2.map((df: DataFrame, rdd) => spark
.createDataFrame(rdd, df.schema.add("rotated_id", LongType)).drop(auxCol))
.reduce(_ union _).withColumnRenamed("rotated_id", "column2join")
// get the rest of the columns
val noRotateCols = dfWithID.columns.diff(colsToRotate).map(col)
val noRotatedDF = dfWithID.select(noRotateCols: _*)
.withColumnRenamed(auxCol, "column2join")
// join both dataframes
val outputDF = noRotatedDF.join(rotatedDF, "column2join")
.select(inputDF.columns.map(col): _*) // to keep the initial columns order
最佳答案
我一直试图通过尽可能多地删除性能不佳的调用(重新分区和一些收集)来找到更好、更清晰、更实用的解决方案。我添加了一种辅助方法来索引数据帧行,以便能够连接不相关的部分(不能由任何公共(public)列连接的列或 dfs)。这是我目前的开发,也去掉了 rdds 和 dataframes 之间的多重转换,看起来更具可读性和可理解性。
我希望这可以帮助有同样问题的人。
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.functions.{col, monotonically_increasing_id, rand}
import org.apache.spark.sql.types.{LongType, StructField, StructType}
// auxiliar method to index row in dataframes
def addRowIndex(df: DataFrame) = spark.createDataFrame(
df.rdd.zipWithIndex.map { case (row, index) => Row.fromSeq(row.toSeq :+ index) },
StructType(df.schema.fields :+ StructField("index", LongType, false))
)
// column(s) names to split the input dataframe
val colToSplit = Seq("country")
val splitCols = colToSplit.map(col)
// list of columns names to be rotated (together)
val colsToRotate = Seq("name", "age")
// add an auxiliar column for joining the dataframe in the final step
val auxCol = "aux"
val dfWithID = inputDF.withColumn(auxCol, monotonically_increasing_id())
val rotateIDCols = (Array(auxCol) ++ colsToRotate).map(col)
// get an array of dfs with the different values of the splitter column(s)
// --assuming there will not be too much different values in the splitter column--
val filterValues = dfWithID.select(splitCols: _*).distinct().collect()
// retrieve the different dfs according to the splitter values
val splitDfs = filterValues.map(filterRow => filterRow.getValuesMap(colToSplit)
.foldLeft(dfWithID) {
(df, filterField) =>
df.filter(col(filterField._1) === filterField._2)
.select(rotateIDCols: _*)
})
// get and random reorder the aux id column for each dataframe
val randIdDfs = splitDfs.map(_.select(auxCol).orderBy(rand()).toDF())
// remove aux column for each dataframe
val splitWithoutIdDfs = splitDfs.map(_.drop(auxCol))
val dfsTuples = splitWithoutIdDfs.zip(randIdDfs)
// index row of dfs with columns to rotate and dfs with random ids
val indexedDfsTuples = dfsTuples.map {
case (colsDf, idsDf) => (addRowIndex(colsDf), addRowIndex(idsDf))
}
// join reordered-ids dfs and cols to rotate dataframes by the index
val reorderedDfs = indexedDfsTuples.map {
case (df1, df2) => df1.join(df2, Seq("index"))
.drop("index").withColumnRenamed(auxCol, "column2join")
}
// union both dataframes to create the rotated df
reorderedDfs.tail.foldLeft(reorderedDfs.head) { (acc, df) => acc.union(df) }
// get the rest of the columns to get the part of the main df which does not change
val noRotateCols = dfWithID.columns.diff(colsToRotate).map(col)
val noRotatedDF = dfWithID.select(noRotateCols: _*)
.withColumnRenamed(auxCol, "column2join")
// join the rotated and no rotated dataframes
val outputDF = noRotatedDF.join(rotatedDF, "column2join")
.select(inputDF.columns.map(col): _*) // to keep the initial columns order
关于scala - 通过一些列值拆分 Spark 数据帧,然后独立于其他数据帧旋转每个生成的数据帧,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51161899/
目前正在学习 Spark 的类(class)并了解到执行者的定义: Each executor will hold a chunk of the data to be processed. Thisc
阅读了有关 http://spark.apache.org/docs/0.8.0/cluster-overview.html 的一些文档后,我有一些问题想要澄清。 以 Spark 为例: JavaSp
Spark核心中的调度器与以下Spark Stack(来自Learning Spark:Lightning-Fast Big Data Analysis一书)中的Standalone Schedule
我想在 spark-submit 或 start 处设置 spark.eventLog.enabled 和 spark.eventLog.dir -all level -- 不要求在 scala/ja
我有来自 SQL Server 的数据,需要在 Apache Spark (Databricks) 中进行操作。 在 SQL Server 中,此表的三个键列使用区分大小写的 COLLATION 选项
所有这些有什么区别和用途? spark.local.ip spark.driver.host spark.driver.bind地址 spark.driver.hostname 如何将机器修复为 Sp
我有大约 10 个 Spark 作业,每个作业都会进行一些转换并将数据加载到数据库中。必须为每个作业单独打开和关闭 Spark session ,每次初始化都会耗费时间。 是否可以只创建一次 Spar
/Downloads/spark-3.0.1-bin-hadoop2.7/bin$ ./spark-shell 20/09/23 10:58:45 WARN Utils: Your hostname,
我是 Spark 的完全新手,并且刚刚开始对此进行更多探索。我选择了更长的路径,不使用任何 CDH 发行版安装 hadoop,并且我从 Apache 网站安装了 Hadoop 并自己设置配置文件以了解
TL; 博士 Spark UI 显示的内核和内存数量与我在使用 spark-submit 时要求的数量不同 更多细节: 我在独立模式下运行 Spark 1.6。 当我运行 spark-submit 时
spark-submit 上的文档说明如下: The spark-submit script in Spark’s bin directory is used to launch applicatio
关闭。这个问题是opinion-based .它目前不接受答案。 想改善这个问题吗?更新问题,以便可以通过 editing this post 用事实和引文回答问题. 6 个月前关闭。 Improve
我想了解接收器如何在 Spark Streaming 中工作。根据我的理解,将有一个接收器任务在执行器中运行,用于收集数据并保存为 RDD。当调用 start() 时,接收器开始读取。需要澄清以下内容
有没有办法在不同线程中使用相同的 spark 上下文并行运行多个 spark 作业? 我尝试使用 Vertx 3,但看起来每个作业都在排队并按顺序启动。 如何让它在相同的 spark 上下文中同时运行
我们有一个 Spark 流应用程序,这是一项长期运行的任务。事件日志指向 hdfs 位置 hdfs://spark-history,当我们开始流式传输应用程序时正在其中创建 application_X
我们正在尝试找到一种加载 Spark (2.x) ML 训练模型的方法,以便根据请求(通过 REST 接口(interface))我们可以查询它并获得预测,例如http://predictor.com
Spark newb 问题:我在 spark-sql 中进行完全相同的 Spark SQL 查询并在 spark-shell . spark-shell版本大约需要 10 秒,而 spark-sql版
我正在使用 Spark 流。根据 Spark 编程指南(参见 http://spark.apache.org/docs/latest/programming-guide.html#accumulato
我正在使用 CDH 5.2。我可以使用 spark-shell 运行命令。 如何运行包含spark命令的文件(file.spark)。 有没有办法在不使用 sbt 的情况下在 CDH 5.2 中运行/
我使用 Elasticsearch 已经有一段时间了,但使用 Cassandra 的经验很少。 现在,我有一个项目想要使用 Spark 来处理数据,但我需要决定是否应该使用 Cassandra 还是
我是一名优秀的程序员,十分优秀!