gpt4 book ai didi

scala - 如何将csv文件转换为rdd

转载 作者:行者123 更新时间:2023-12-02 04:59:04 26 4
gpt4 key购买 nike

我是 Spark 新手。我想对 CSV 记录中的特定数据执行一些操作。

我正在尝试读取 CSV 文件并将其转换为 RDD。我的进一步操作是基于 CSV 文件中提供的标题。

(来自评论)这是我到目前为止的代码:

final JavaRDD<String> File = sc.textFile(Filename).cache();
final JavaRDD<String> lines = File.flatMap(new FlatMapFunction<String, String>() {
@Override public Iterable<String> call(String s) {
return Arrays.asList(EOL.split(s));
}
});
final String heading=lines.first().toString();

我可以像这样获取 header 值。我想将其映射到 CSV 文件中的每条记录。

final String[] header=heading.split(" "); 

我可以像这样获取 header 值。我想将其映射到 CSV 文件中的每条记录。

在java中,我使用CSVReader record.getColumnValue(Column header)来获取特定值。我需要在这里做类似的事情。

最佳答案

一个简单的方法是有一种方法来保留 header 。

假设您有一个如下所示的 file.csv:

user, topic, hits
om, scala, 120
daniel, spark, 80
3754978, spark, 1

我们可以定义一个使用第一行的解析版本的 header 类:

class SimpleCSVHeader(header:Array[String]) extends Serializable {
val index = header.zipWithIndex.toMap
def apply(array:Array[String], key:String):String = array(index(key))
}

我们可以使用该 header 来进一步处理数据:

val csv = sc.textFile("file.csv")  // original file
val data = csv.map(line => line.split(",").map(elem => elem.trim)) //lines in rows
val header = new SimpleCSVHeader(data.take(1)(0)) // we build our header with the first line
val rows = data.filter(line => header(line,"user") != "user") // filter the header out
val users = rows.map(row => header(row,"user")
val usersByHits = rows.map(row => header(row,"user") -> header(row,"hits").toInt)
...

请注意, header 只不过是助记符到数组索引的简单映射。几乎所有这些都可以在数组中元素的序数位置上完成,例如 user = row(0)

PS:欢迎使用 Scala :-)

关于scala - 如何将csv文件转换为rdd,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24299427/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com