
scala - Importing a TSV file in Spark


I'm new to Spark, so please forgive a basic question. I'm trying to import my tsv file into Spark, but I'm not sure whether it is actually working.

val lines = sc.textFile("/home/cloudera/Desktop/Test/test.tsv")
val split_lines = lines.map(_.split("\t"))
split_lines.first()

Everything seems to run fine. Is there a way to check whether the tsv file has been loaded correctly?

Sample data (fields are separated by tabs throughout):

hastag 200904 24 blackcat
hastag 200908 1 oaddisco
hastag 200904 1 blah
hastag 200910 3 mydda

Best Answer

Your code looks fine to me so far. If you print the first record to the console, do you see the data you expect?
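As a quick sanity check, you could print a few parsed records and count them. This is a minimal sketch reusing the lines and split_lines variables from your snippet:

// print the first few parsed rows, fields joined with " | " so the tab boundaries are visible
split_lines.take(5).foreach(row => println(row.mkString(" | ")))

// total number of records and number of fields in the first record
println(s"records: ${lines.count}, fields in first record: ${split_lines.first.length}")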

To explore the Spark API, the best option is to use the spark-shell, a Scala REPL enriched with Spark-specific features that builds a default Spark context for you.

It makes exploring your data much easier.

Here is an example of loading a csv file of about 65k lines, which I guess is a use case similar to yours:

$> <spark_dir>/bin/spark-shell
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.0.0-SNAPSHOT
      /_/

scala> val lines=sc.textFile("/home/user/playground/ts-data.csv")
lines: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at <console>:12

scala> val csv=lines.map(_.split(";"))
csv: org.apache.spark.rdd.RDD[Array[String]] = MappedRDD[2] at map at <console>:14

scala> csv.count
(... spark processing ...)
res0: Long = 67538


// let's have a look at the first record
scala> csv.first
14/06/01 12:22:17 INFO SparkContext: Starting job: first at <console>:17
14/06/01 12:22:17 INFO DAGScheduler: Got job 1 (first at <console>:17) with 1 output partitions (allowLocal=true)
14/06/01 12:22:17 INFO DAGScheduler: Final stage: Stage 1(first at <console>:17)
14/06/01 12:22:17 INFO DAGScheduler: Parents of final stage: List()
14/06/01 12:22:17 INFO DAGScheduler: Missing parents: List()
14/06/01 12:22:17 INFO DAGScheduler: Computing the requested partition locally
14/06/01 12:22:17 INFO HadoopRDD: Input split: file:/home/user/playground/ts-data.csv:0+1932934
14/06/01 12:22:17 INFO SparkContext: Job finished: first at <console>:17, took 0.003210457 s
res1: Array[String] = Array(20140127, 0000df, d063b4, ***, ***-Service,app180000m,49)

// group by id (field index 4) and count the unique ids
scala> csv.groupBy(_(4)).count
(... Spark processing ...)
res2: Long = 37668

// records per day
csv.map(record => record(0)->1).reduceByKey(_+_).collect
(... more Spark processing ...)
res8: Array[(String, Int)] = Array((20140117,1854), (20140120,2028), (20140124,3398), (20140131,6084), (20140122,5076), (20140128,8310), (20140123,8476), (20140127,1932), (20140130,8482), (20140129,8488), (20140118,5100), (20140109,3488), (20140110,4822))
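
If you want those per-day counts in date order, a small variation (not in the original answer) is to sort by key before collecting:

// sort the per-day counts by date before bringing them back to the driver
csv.map(record => record(0) -> 1).reduceByKey(_ + _).sortByKey().collect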

* Edit using the data added to the question *

val rawData="""hastag 200904 24 blackcat
hastag 200908 1 oaddisco
hastag 200904 1 blah
hastag 200910 3 mydda"""
// split the raw string into lines
val data = rawData.split("\n")
val rdd = sc.parallelize(data)
// split each line using space as separator, then group by field 1 (the month)
val byId = rdd.map(_.split(" ")).groupBy(_(1))
byId.count
res11: Long = 3
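
Applied to your tab-separated file, the same pattern looks like this. A minimal sketch, assuming the path from your question and splitting on "\t" instead of a space:

val tsv = sc.textFile("/home/cloudera/Desktop/Test/test.tsv")
val fields = tsv.map(_.split("\t"))

// quick sanity checks: record count and number of columns in the first record
tsv.count
fields.first.length

// number of distinct values in the second column (the month), analogous to byId.count above
fields.groupBy(_(1)).count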

Regarding scala - importing a TSV file in Spark, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/23977792/
