- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在尝试 拆分 根据一个(或多个)列和 的值的数据框旋转每个生成的数据框 独立于其余部分。即,给定一个输入数据框:
val inputDF = Seq(("tom","20","a","street a","germany"),("jimmy","30","b","street b","germany"),
("lola","50","c","street c","argentina"), ("maria","60","d","street d","argentina"), ("joe","70","e","street e","argentina")
.toDF("name","age","company","address","country")
//+-----+---+-------+--------+---------+
//| name|age|company| address| country|
//+-----+---+-------+--------+---------+
//| tom| 20| a|street a| germany|
//|jimmy| 30| b|street b| germany|
//| lola| 40| c|street c|argentina|
//|maria| 50| d|street d|argentina|
//| joe| 60| e|street e|argentina|
//+-----+---+-------+--------+---------+
//+-----+---+-------+--------+---------+
//| name|age|company| address| country|
//+-----+---+-------+--------+---------+
//| tom| 20| a|street a| germany|
//|jimmy| 30| b|street b| germany|
//+-----+---+-------+--------+---------+
//+-----+---+-------+--------+---------+
//| name|age|company| address| country|
//+-----+---+-------+--------+---------+
//| lola| 40| c|street c|argentina|
//|maria| 50| d|street d|argentina|
//| joe| 60| e|street e|argentina|
//+-----+---+-------+--------+---------+
//+-----+---+-------+--------+---------+
//| name|age|company| address| country|
//+-----+---+-------+--------+---------+
//|jimmy| 30| a|street a| germany|
//| tom| 20| b|street b| germany|
//| joe| 60| c|street c|argentina|
//| lola| 40| d|street d|argentina|
//|maria| 50| e|street e|argentina|
//+-----+---+-------+--------+---------+
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.functions.{col, monotonically_increasing_id, rand}
import org.apache.spark.sql.types.LongType
// column(s) names to split the input dataframe
val colToSplit = Seq("country")
val splitCols = colToSplit.map(col)
// list of columns names to be rotated (together)
val colsToRotate = Seq("name", "age")
val rotateCols = colsToRotate.map(col) :+ col(auxCol)
// add an auxiliar column for joining the dataframe in the final step
val auxCol = "aux"
val dfWithID = inputDF.withColumn(auxCol, monotonically_increasing_id())
val splitValuesSchema = dfWithID.select(splitCols: _*).schema
// create one dataframe for each value of the splitting column
val splitValuesDFs = dfWithID.select(splitCols: _*).distinct().collect()
.map(row => spark.sparkContext.makeRDD(List(row)))
.map(rdd => spark.createDataFrame(rdd, splitValuesSchema))
val rotateIDCols = Array(auxCol) ++ colsToRotate
// join the split values with their records (DFs with id + colsToRotate)
val splittedDFs = splitValuesDFs
.map(df => df.join(dfWithID, colToSplit).selectExpr(rotateIDCols: _*))
// random reorder the auxiliar id column (DFs with random ids)
val randIdDFs = splittedDFs
.map(df => df.select(auxCol).orderBy(rand()).toDF())
// get rdds with random ids
val randIdRdds = randIdDFs
.map(df => df.select(auxCol).rdd.map(row => row(0)))
// finally, zip and append the rdds with the random ids to the dataframes created by
// splitting the main df to obtain the rotated dataframe with all the data
val tuples = (splittedDFs, randIdRdds).zipped
val newRdds = tuples
.map((df: DataFrame, rdd) => df.rdd.repartition(1).zip(rdd.repartition(1))
.map(row => Row.fromSeq(row._1.toSeq ++ Seq(row._2))))
val tuples2 = (splittedDFs, newRdds).zipped
val rotatedDF = tuples2.map((df: DataFrame, rdd) => spark
.createDataFrame(rdd, df.schema.add("rotated_id", LongType)).drop(auxCol))
.reduce(_ union _).withColumnRenamed("rotated_id", "column2join")
// get the rest of the columns
val noRotateCols = dfWithID.columns.diff(colsToRotate).map(col)
val noRotatedDF = dfWithID.select(noRotateCols: _*)
.withColumnRenamed(auxCol, "column2join")
// join both dataframes
val outputDF = noRotatedDF.join(rotatedDF, "column2join")
.select(inputDF.columns.map(col): _*) // to keep the initial columns order
最佳答案
我一直试图通过尽可能多地删除性能不佳的调用(重新分区和一些收集)来找到更好、更清晰、更实用的解决方案。我添加了一种辅助方法来索引数据帧行,以便能够连接不相关的部分(不能由任何公共(public)列连接的列或 dfs)。这是我目前的开发,也去掉了 rdds 和 dataframes 之间的多重转换,看起来更具可读性和可理解性。
我希望这可以帮助有同样问题的人。
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.functions.{col, monotonically_increasing_id, rand}
import org.apache.spark.sql.types.{LongType, StructField, StructType}
// auxiliar method to index row in dataframes
def addRowIndex(df: DataFrame) = spark.createDataFrame(
df.rdd.zipWithIndex.map { case (row, index) => Row.fromSeq(row.toSeq :+ index) },
StructType(df.schema.fields :+ StructField("index", LongType, false))
)
// column(s) names to split the input dataframe
val colToSplit = Seq("country")
val splitCols = colToSplit.map(col)
// list of columns names to be rotated (together)
val colsToRotate = Seq("name", "age")
// add an auxiliar column for joining the dataframe in the final step
val auxCol = "aux"
val dfWithID = inputDF.withColumn(auxCol, monotonically_increasing_id())
val rotateIDCols = (Array(auxCol) ++ colsToRotate).map(col)
// get an array of dfs with the different values of the splitter column(s)
// --assuming there will not be too much different values in the splitter column--
val filterValues = dfWithID.select(splitCols: _*).distinct().collect()
// retrieve the different dfs according to the splitter values
val splitDfs = filterValues.map(filterRow => filterRow.getValuesMap(colToSplit)
.foldLeft(dfWithID) {
(df, filterField) =>
df.filter(col(filterField._1) === filterField._2)
.select(rotateIDCols: _*)
})
// get and random reorder the aux id column for each dataframe
val randIdDfs = splitDfs.map(_.select(auxCol).orderBy(rand()).toDF())
// remove aux column for each dataframe
val splitWithoutIdDfs = splitDfs.map(_.drop(auxCol))
val dfsTuples = splitWithoutIdDfs.zip(randIdDfs)
// index row of dfs with columns to rotate and dfs with random ids
val indexedDfsTuples = dfsTuples.map {
case (colsDf, idsDf) => (addRowIndex(colsDf), addRowIndex(idsDf))
}
// join reordered-ids dfs and cols to rotate dataframes by the index
val reorderedDfs = indexedDfsTuples.map {
case (df1, df2) => df1.join(df2, Seq("index"))
.drop("index").withColumnRenamed(auxCol, "column2join")
}
// union both dataframes to create the rotated df
reorderedDfs.tail.foldLeft(reorderedDfs.head) { (acc, df) => acc.union(df) }
// get the rest of the columns to get the part of the main df which does not change
val noRotateCols = dfWithID.columns.diff(colsToRotate).map(col)
val noRotatedDF = dfWithID.select(noRotateCols: _*)
.withColumnRenamed(auxCol, "column2join")
// join the rotated and no rotated dataframes
val outputDF = noRotatedDF.join(rotatedDF, "column2join")
.select(inputDF.columns.map(col): _*) // to keep the initial columns order
关于scala - 通过一些列值拆分 Spark 数据帧,然后独立于其他数据帧旋转每个生成的数据帧,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51161899/
我如何使用 CQLINQ 获取当前方法的输入参数集合?有像“参数”或“参数”这样的集合,只有“NbParamenter”不适合我的目的。 最佳答案 事实上,CQLinq 还没有这个功能。但是,在许多情
我想知道是否有人知道我的 makefile 中独立的 @ 符号和“dir”命令在这里(第二行和第三行)的作用: $(BUILD)/%.o: %.cpp @mkdir -p $(dir $@)
我想知道是否有人知道我的 makefile 中独立的 @ 符号和“dir”命令在这里(第二行和第三行)的作用: $(BUILD)/%.o: %.cpp @mkdir -p $(dir $@)
我的机器上有带有 4 个 cpu 的 Ubuntu 14.04(nproc 恢复了 4 个)。我安装并执行 Spark Standalone 后(本地),我可以自己定义不同数量的奴隶。例如我想要有4个
我看到所有这些 iPhone 应用程序都带有内置的独立 webDav 服务器。是否有可以集成到现有应用程序中的独立(如在其自己的 IIS 中)C# webDAV 项目。 最佳答案 至少有两个用于 .N
我如何在独立的 Django 应用程序上进行迁移(即不属于任何项目的应用程序)。 例如在以下之后:https://docs.djangoproject.com/en/1.8/intro/reusabl
我目前正在使用 tortoiseSVN 对本地编程文件进行版本控制。我不运行 SVN 服务器,因为可以直接使用 tortoiseSVN(例如 http://invalidlogic.com/2006/
我有一些 Bootstrap 代码,当用户查看它时,它可以很好地为进度条部分设置动画。 然而它动画 全部 页面中的进度条而不是动画仅限 该查看部分中的进度条。结果,当用户转到进度条的另一部分时,这些已
我认为我们在 iOS 13.2/13.3 中发现了关于在独立模式下运行的 PWA 的回归。 由于在 iOS PWA 上无法访问 getUserMedia() 我们依赖 capture HTML5 输入
我有一个每周从系统运行一次的报告,并将数据导出到 Excel 文档中。我已经设置了将数据导出到 Excel 的工具,以便在格式化方面做得很好,但是一旦数据进入 Excel,我还需要做更多的事情。 是否
//值数组的格式为 { "var1", "val1", "var2", "val2",.. } public static String replaceMethod(String template,
当我在 eclipse 中运行我的项目时,它工作正常,当我将它导出为独立 jar 时,它会滞后。我使用相同的 vmargs,在 Eclipse 中尝试了 3 种不同的导出设置,似乎没有任何帮助 最佳答
我了解到 Java EE 中我非常喜欢的注释基础配置(@Resource)功能。然后我注意到注释实际上是 Java SE 的一部分。 所以我想知道是否可以将它与 Java SE 一起使用。我当然可以在
我无法理解为什么这种关系没有被持久化,并且程序不会正常退出,但在 Eclipse 中继续运行。 下面是我的代码,排除了包名: 主要: import java.io.BufferedInputStrea
我有一个在 Linux + Java 6 上运行的独立 Java 应用程序,它似乎被卡住了(没有生成日志)我如何在不使用任何其他工具(例如 jstack)的情况下获取此线程转储 尝试了以下命令,但它们
我正在非节点环境中构建应用程序,但我想利用 Babel 的 ES6 转译,以便我可以编写更好的代码并且仍然支持 IE11。 所以我继续包含在这里找到的独立文件: https://github.com/
扩展我对 MySQL 的理解。 1) 是否需要 64 位帮助?我是安装还是单独使用? 2) 如果我打算在 MySQL Community Service 中使用 64 位,它会影响仅提供 32 位的
我有一个独立的 Java 应用程序,我必须为其集成一个规则引擎。我应该使用属性文件或 XML 文件定义规则。我需要规则引擎来读取属性或 XML 文件中定义的这些规则,并相应地在应用程序中实现代码。 任
我是wiremock新手,我正在尝试使用它来记录我负责集成测试的java应用程序的请求和响应。 我知道我的命令将类似于: java -jar wiremock-1.57-standalone.jar
我到处寻找我的问题的解决方案,但我的问题有点具体...我需要有关如何创建独立 radioGroup 列表的建议,例如图示: o item1 • item1' • item2 或 item2' o it
我是一名优秀的程序员,十分优秀!