gpt4 book ai didi

java - 在 Spark 中读取 csv 文件时如何处理不带引号的多行

转载 作者:行者123 更新时间:2023-12-02 08:47:53 25 4
gpt4 key购买 nike

以前曾询问过此问题的变体,但就我而言,多行中没有带引号的字符串。

我有一个这样的文件。

column1|column2
1|test1 test1
test1
2|test2

我想要这样的结果:(2行)

+-----------+------------------+
|column1 |column2 |
+-----------+------------------+
| 1| test1 test1
test1
| 2|test2 |

我尝试过这个:

 Dataset<Row> dsTest = session.read()
.option("header", "true")
.option("delimiter", "|")
.option("quote", "")
.option("multiLine", "true")
.csv("test.csv");

我得到了这个(3行)

 +-----------+------------------+
|column1 |column2 |
+-----------+------------------+
| 1| test1 test1
test1| null
| 2|test2 |

有人可以指导我解决这个问题吗?

最佳答案

我尝试过类似的方法,也许它可以给你一些线索。

代码是在 Scala 中,但我认为一切都很清楚,在 Java 中它或多或少是相同的。

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession

import scala.util.{Failure, Success, Try}

object Multiline {

val spark = SparkSession
.builder()
.appName("Multiline")
.master("local[*]")
.config("spark.sql.shuffle.partitions", "4") //Change to a more reasonable default number of partitions for our data
.config("spark.app.id", "Multiline") // To silence Metrics warning
.getOrCreate()

val sc = spark.sparkContext

val input = "/home/cloudera/files/tests/multiline.csv"

def main(args: Array[String]): Unit = {

Logger.getRootLogger.setLevel(Level.ERROR)

import spark.implicits._

try {

def makeInt(s: String): Int = Try(s.toInt) match {
case Success(n) => n
case Failure(_) => -1
}

val data = sc.textFile(input)
val head = data.first() // header

val multiline = data
.filter(line => line != head) // remove header
.map(line => line.split('|'))
.map(arr =>{
val sInt: Int = makeInt(arr(0))
if(sInt < 0) (sInt.toString, arr(0))
else (arr(0),arr(1))
})
.toDF("column1", "column2")

multiline.show()

/*
+-------+------------+
|column1| column2|
+-------+------------+
| 1|test1 test1 |
| -1| test1|
| 2| test2|
+-------+------------+
*/

// To have the opportunity to view the web console of Spark: http://localhost:4040/
println("Type whatever to the console to exit......")
scala.io.StdIn.readLine()
} finally {
sc.stop()
println("SparkContext stopped.")
spark.stop()
println("SparkSession stopped.")
}
}
}

关于java - 在 Spark 中读取 csv 文件时如何处理不带引号的多行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60965526/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com