gpt4 book ai didi

scala - 使用 apache spark 和 scala 进行数据预处理

转载 作者:行者123 更新时间:2023-12-04 02:21:04 27 4
gpt4 key购买 nike

我是 spark 和 scala 的新手,因此我对使用 spark 进行数据预处理和使用 rdds 有一些疑问。我正在做一个小项目,我想用 spark 实现一个机器学习系统。我认为使用算法没问题,但我在预处理数据方面遇到了问题。我有一个包含 30 列和大约一百万行的数据集。但为简单起见,假设我有以下数据集(csv 文件):

columnA, columnB, column_txt, label
1 , a , abc , 0
2 , , abc , 0
3 , b , abc , 1
4 , b , abc , 1
5 , a , abc , 0
6 , , abc , 0
7 , c , abc , 1
8 , a , abc , 1
9 , b , abc , 1
10 , c , abc , 0

在 spark 中加载数据后,我想执行以下步骤:

  1. 删除所有以“_txt”结尾的列
  2. 过滤掉 columnB 为空的所有行(这个我已经想通了)
  3. 删除那些超过 9 个级别的列(这里是 columnA)

所以我对问题 1 和问题 3 有疑问。我知道我不能删除列所以我必须创建一个新的 rdd 但如果没有某些列我该怎么做呢?现在,我在 spark 中加载没有 header 的 csv 文件,但对于我的任务,我需要这样做。是否建议将 header 加载到单独的 rdd 中?但是我怎样才能与那个 rdd 交互以找到正确的列呢?抱歉,我知道很多问题,但我仍处于起步阶段,正在努力学习。谢谢并致以最诚挚的问候,克里斯

最佳答案

假设数据框加载了标题并且结构是扁平的:

val df = sqlContext.
read.
format("com.databricks.spark.csv").
option("header", "true").
load("data.csv")

像这样的东西应该可以工作:

import org.apache.spark.sql.DataFrame

def moreThan9(df: DataFrame, col: String) = {
df.agg(countDistinct(col)).first()(0) match {
case x: Long => x > 9L
case _ => false
}
}

val newDf = df.
schema. // Extract schema
toArray. // Convert to array
map(_.name). // Map to names
foldLeft(df)((df: DataFrame, col: String) => {
if (col.endsWith("_txt") | moreThan9(df, col)) df.drop(col) else df
})

如果它是在没有标题的情况下加载的,那么您可以使用从自动分配的到实际的映射来做同样的事情。

关于scala - 使用 apache spark 和 scala 进行数据预处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31547573/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com