
scala - What happens when you do Java data manipulations in Spark outside of an RDD

Reposted. Author: 可可西里. Updated: 2023-11-01 15:02:13

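I'm using Spark to read a CSV file from HDFS. It comes in as an FSDataInputStream object. I can't use the textFile() method because it splits the CSV file on newlines, and the CSV files I'm reading have newlines inside text fields. Opencsv from SourceForge handles newlines inside cells, and it's a nice project, but it takes a Reader as input. I need to convert the stream to a String so that I can pass it to opencsv as a StringReader. So the pipeline is: HDFS File -> FSDataInputStream -> String -> StringReader -> an opencsv list of String arrays. Here is the code...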

import java.io.{ByteArrayOutputStream, Reader, StringReader}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import com.opencsv.CSVReader
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.SQLContext

// Runs in spark-shell, where sc (the SparkContext) is predefined.
// Everything up to parallelize() runs inside the single driver JVM.
val conf = new Configuration()
conf.addResource(new Path("core-site.xml"))
conf.addResource(new Path("hdfs-site.xml"))
val fileSystem = FileSystem.get(conf)
val csvPath = new Path("/raw_data/project_name/csv/file_name.csv")
val csvFile = fileSystem.open(csvPath)

// Copy the whole file into driver memory, 2048 bytes at a time.
// read() may return fewer bytes than the buffer holds, so only the bytes
// actually read are kept; -1 signals end of file.
val buffer = new Array[Byte](2048)
val bytes = new ByteArrayOutputStream()
var n = csvFile.read(buffer)
while (n != -1) {
  bytes.write(buffer, 0, n)
  n = csvFile.read(buffer)
}
csvFile.close()

// Decode once at the end so that multi-byte UTF-8 characters that straddle
// a buffer boundary are not corrupted.
val fileContents = new String(bytes.toByteArray, "UTF-8")

val reader: Reader = new StringReader(fileContents)
val csv = new CSVReader(reader)
val javaContext = new JavaSparkContext(sc)
val sqlContext = new SQLContext(sc)
val javaRDD = javaContext.parallelize(csv.readAll())
//do a bunch of transformations on the RDD
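
Why collect the bytes first and decode once, instead of decoding chunk by chunk: a multi-byte UTF-8 character can straddle the boundary between two chunks, and decoding each chunk on its own turns its halves into replacement characters. (Decoding the whole buffer with new String(b, "UTF-8") after a short read also picks up stale or zero bytes.) A minimal pure-Scala sketch, assuming an in-memory ByteArrayInputStream as a stand-in for the HDFS stream; the names ChunkDecodeDemo, decodePerChunk and decodeOnce are hypothetical:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream}
import java.nio.charset.StandardCharsets

object ChunkDecodeDemo {
  // Hypothetical sample "naïve café": ï and é each take two bytes in UTF-8.
  val text: String = "na\u00efve caf\u00e9"

  // Decodes every chunk separately (only the n bytes actually read).
  // A character split across two chunks comes out as U+FFFD replacement chars.
  def decodePerChunk(in: InputStream, chunkSize: Int): String = {
    val sb = new StringBuilder
    val buf = new Array[Byte](chunkSize)
    var n = in.read(buf)
    while (n != -1) {
      sb.append(new String(buf, 0, n, StandardCharsets.UTF_8))
      n = in.read(buf)
    }
    sb.toString
  }

  // Collects the raw bytes first and decodes them once at the end.
  def decodeOnce(in: InputStream, chunkSize: Int): String = {
    val out = new ByteArrayOutputStream()
    val buf = new Array[Byte](chunkSize)
    var n = in.read(buf)
    while (n != -1) {
      out.write(buf, 0, n)
      n = in.read(buf)
    }
    new String(out.toByteArray, StandardCharsets.UTF_8)
  }

  def main(args: Array[String]): Unit = {
    val bytes = text.getBytes(StandardCharsets.UTF_8)
    // A chunk size of 3 cuts the two bytes of the first accented letter apart.
    val perChunk = decodePerChunk(new ByteArrayInputStream(bytes), 3)
    val once = decodeOnce(new ByteArrayInputStream(bytes), 3)
    println(s"per chunk intact: ${perChunk == text}")  // false
    println(s"decode once intact: ${once == text}")    // true
  }
}
```

The chunk size of 3 only forces the boundary case on a short string; the same can happen at any 2048-byte boundary in a real file that contains non-ASCII text.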

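The code works, but I doubt it scales. It makes me wonder how big a limitation it is to have the driver push all the data through a single JVM. My questions for anyone very familiar with Spark are: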

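  1. What happens when you do data manipulation on the entire dataset like this, before it is even put into an input RDD? Is it treated just like any other program, and I'm guessing it will swap like crazy?

  2. So how would you make any Spark program scalable? Do you always need to pull the data directly into an input RDD?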
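
The question's premise can be checked without Spark: splitting on newlines breaks records whose quoted fields contain newlines, while a quote-aware parser reading from a java.io.Reader wrapped directly around a byte stream (no intermediate String) gets them right. A minimal sketch, assuming a hypothetical, simplified parser (parseCsvRecords in a hypothetical CsvNewlineDemo object; it is not opencsv and only understands commas, double quotes, and "" escapes) and a ByteArrayInputStream standing in for the FSDataInputStream:

```scala
import java.io.{ByteArrayInputStream, InputStreamReader, Reader}
import java.nio.charset.StandardCharsets
import scala.collection.mutable.ArrayBuffer

object CsvNewlineDemo {
  // Hypothetical sample: record 2's "comment" field contains a newline,
  // and record 3 uses "" to escape a double quote.
  val sample: String = "id,comment\n1,\"first line\nsecond line\"\n2,\"said \"\"hi\"\"\"\n"

  // Simplified quote-aware CSV parser (illustration only, not opencsv).
  // It pulls characters from any java.io.Reader.
  def parseCsvRecords(in: Reader): Vector[Vector[String]] = {
    val records = ArrayBuffer.empty[Vector[String]]
    val fields = ArrayBuffer.empty[String]
    val field = new StringBuilder
    var inQuotes = false       // currently inside a quoted section
    var justClosed = false     // previous char closed a quoted section
    var recordStarted = false  // current record has consumed any input

    def endRecord(): Unit = {
      fields += field.toString
      records += fields.toVector
      fields.clear()
      field.clear()
      recordStarted = false
    }

    var c = in.read()
    while (c != -1) {
      val ch = c.toChar
      if (inQuotes) {
        recordStarted = true
        // Inside quotes, commas and newlines are ordinary characters.
        if (ch == '"') { inQuotes = false; justClosed = true }
        else field.append(ch)
      } else {
        val wasClosed = justClosed
        justClosed = false
        ch match {
          case '"' =>
            if (wasClosed) field.append('"')  // "" is an escaped quote
            inQuotes = true
            recordStarted = true
          case ',' =>
            fields += field.toString
            field.clear()
            recordStarted = true
          case '\n' => endRecord()
          case '\r' => ()  // tolerate CRLF line endings
          case other =>
            field.append(other)
            recordStarted = true
        }
      }
      c = in.read()
    }
    if (recordStarted) endRecord()  // last record without a trailing newline
    records.toVector
  }

  def main(args: Array[String]): Unit = {
    // Stand-in for an HDFS stream: raw bytes wrapped in an InputStreamReader.
    val bytes = sample.getBytes(StandardCharsets.UTF_8)
    val reader = new InputStreamReader(new ByteArrayInputStream(bytes), StandardCharsets.UTF_8)
    val records = parseCsvRecords(reader)
    val naiveLines = sample.split("\n").length

    println(s"lines when splitting on newlines: $naiveLines")  // 4
    println(s"records from a quote-aware parse: ${records.length}")  // 3
    records.foreach(r => println(r.mkString(" | ")))
  }
}
```

opencsv's CSVReader likewise accepts any Reader, so an InputStreamReader over the HDFS stream could be passed to it directly; that removes the String copy but still reads the whole file in the driver.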

Best answer

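Your code loads the whole file into the driver's memory, and the Spark driver then splits the data and sends each piece to the executors. That is why it does not scale: all the data has to fit in, and pass through, the single driver JVM. Code that runs outside an RDD is ordinary JVM code in the driver, so it is bounded by the driver's heap (spark.driver.memory) and fails with an OutOfMemoryError once the data no longer fits.
There are two ways to solve your problem.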

Write a custom InputFormat that supports the CSV file format:

import java.io.{InputStreamReader, IOException}

import com.google.common.base.Charsets
import com.opencsv.{CSVParser, CSVReader}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Seekable, Path, FileSystem}
import org.apache.hadoop.io.compress._
import org.apache.hadoop.io.{ArrayWritable, Text, LongWritable}
import org.apache.hadoop.mapred._

class CSVInputFormat extends FileInputFormat[LongWritable, ArrayWritable] {

  // A quoted field may contain a newline, and a reader that starts at an arbitrary
  // byte offset cannot tell whether it is inside quotes. Each file is therefore
  // read as a single split; different files are still read by different tasks.
  protected override def isSplitable(fs: FileSystem, file: Path): Boolean = false

  @throws(classOf[IOException])
  def getRecordReader(genericSplit: InputSplit, job: JobConf, reporter: Reporter): RecordReader[LongWritable, ArrayWritable] = {
    reporter.setStatus(genericSplit.toString)
    // The first byte of this setting is used as the CSV *field* separator;
    // when it is unset, opencsv's default separator (',') is used.
    val delimiter: String = job.get("textinputformat.record.delimiter")
    var recordDelimiterBytes: Array[Byte] = null
    if (null != delimiter) {
      recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8)
    }
    new CsvLineRecordReader(job, genericSplit.asInstanceOf[FileSplit], recordDelimiterBytes)
  }
}

class CsvLineRecordReader(job: Configuration, split: FileSplit, recordDelimiter: Array[Byte])
  extends RecordReader[LongWritable, ArrayWritable] {
  private val compressionCodecs = new CompressionCodecFactory(job)
  private val maxLineLength = job.getInt(
    org.apache.hadoop.mapreduce.lib.input.LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE)
  private var filePosition: Seekable = _
  private val file = split.getPath
  private val codec = compressionCodecs.getCodec(file)
  private val isCompressedInput = codec != null
  private val fs = file.getFileSystem(job)
  private val fileIn = fs.open(file)

  private var start = split.getStart
  private var pos: Long = 0L
  private var end = start + split.getLength
  private var reader: CSVReader = _
  private var decompressor: Decompressor = _

  private lazy val CSVSeparator =
    if (recordDelimiter == null)
      CSVParser.DEFAULT_SEPARATOR
    else
      recordDelimiter(0).toChar

  // Open the input (decompressing it if needed) and wrap it in an opencsv reader.
  if (isCompressedInput) {
    decompressor = CodecPool.getDecompressor(codec)
    if (codec.isInstanceOf[SplittableCompressionCodec]) {
      val cIn = codec.asInstanceOf[SplittableCompressionCodec]
        .createInputStream(fileIn, decompressor, start, end, SplittableCompressionCodec.READ_MODE.BYBLOCK)
      reader = new CSVReader(new InputStreamReader(cIn, Charsets.UTF_8), CSVSeparator)
      start = cIn.getAdjustedStart
      end = cIn.getAdjustedEnd
      filePosition = cIn
    } else {
      reader = new CSVReader(
        new InputStreamReader(codec.createInputStream(fileIn, decompressor), Charsets.UTF_8), CSVSeparator)
      filePosition = fileIn
    }
  } else {
    fileIn.seek(start)
    reader = new CSVReader(new InputStreamReader(fileIn, Charsets.UTF_8), CSVSeparator)
    filePosition = fileIn
  }

  @throws(classOf[IOException])
  private def getFilePosition: Long =
    if (isCompressedInput && null != filePosition) filePosition.getPos
    else pos

  private def nextLine: Option[Array[String]] =
    if (getFilePosition < end) {
      // readNext() splits one record into its fields (a record may span several
      // physical lines) and returns null at end of input.
      Option(reader.readNext())
    } else None

  override def next(key: LongWritable, value: ArrayWritable): Boolean =
    nextLine.exists { elems =>
      key.set(pos)
      // Approximate record length: field lengths plus one separator/terminator per field.
      val lineLength = elems.foldRight(0)((a, b) => a.length + 1 + b)
      pos += lineLength
      value.set(elems.map(new Text(_)))
      lineLength < maxLineLength
    }

  @throws(classOf[IOException])
  def getProgress(): Float =
    if (start == end) 0.0f
    else Math.min(1.0f, (getFilePosition - start) / (end - start).toFloat)

  override def getPos(): Long = pos

  override def createKey(): LongWritable = new LongWritable

  override def close(): Unit = {
    try {
      if (reader != null) {
        reader.close()
      }
    } finally {
      if (decompressor != null) {
        CodecPool.returnDecompressor(decompressor)
      }
    }
  }

  override def createValue(): ArrayWritable = new ArrayWritable(classOf[Text])
}

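A simple test:

import org.apache.hadoop.io.{ArrayWritable, LongWritable}

// Each record arrives as (offset, ArrayWritable). Convert it to Array[String]
// right away, because Hadoop's RecordReader reuses the same Writable objects.
val arrayRdd = sc.hadoopFile("source path", classOf[CSVInputFormat], classOf[LongWritable],
    classOf[ArrayWritable], sc.defaultMinPartitions)
  .map(_._2.get().map(_.toString))
arrayRdd.collect().foreach(e => println(e.mkString(",")))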
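
Splitting is the hard part for CSV files with multi-line cells. A reader that starts at an arbitrary byte offset has to resynchronize on some record boundary, and for text the only cheap one is '\n', which is exactly what a quoted newline looks like. The pure-Scala sketch below applies a simplified version of the rule Hadoop's LineRecordReader follows (unless the split starts at offset 0, skip to just past the first newline, then keep reading lines that begin at or before the split's end) to a hypothetical 12-byte input cut into two splits; SplitBoundaryDemo and linesForSplit are illustrative names, not Hadoop code:

```scala
import java.nio.charset.StandardCharsets

object SplitBoundaryDemo {
  private val Newline: Byte = '\n'.toByte

  // Hypothetical input: the first record's second field is the quoted value "x<newline>y".
  val sample: Array[Byte] = "a,\"x\ny\"\nb,c\n".getBytes(StandardCharsets.UTF_8)

  // Simplified line-oriented split rule. A split covers bytes [start, end).
  // Unless it starts at 0, it skips past the first '\n' at or after `start`
  // (that partial line belongs to the previous split), then reads whole lines
  // for as long as a line begins at or before `end`.
  def linesForSplit(data: Array[Byte], start: Int, end: Int): Vector[String] = {
    var pos = start
    if (start != 0) {
      while (pos < data.length && data(pos) != Newline) pos += 1
      pos += 1  // step past the newline itself
    }
    val lines = Vector.newBuilder[String]
    while (pos < data.length && pos <= end) {
      val lineStart = pos
      while (pos < data.length && data(pos) != Newline) pos += 1
      lines += new String(data, lineStart, pos - lineStart, StandardCharsets.UTF_8)
      pos += 1  // skip the '\n' that ended this line
    }
    lines.result()
  }

  def main(args: Array[String]): Unit = {
    // Cut the input at byte 3, which falls inside the quoted field.
    val first = linesForSplit(sample, 0, 3)
    val second = linesForSplit(sample, 3, sample.length)
    println(first)   // Vector(a,"x)
    println(second)  // Vector(y", b,c)
  }
}
```

Every physical line lands in exactly one split, but the quoted record is cut into the fragments a,"x and y". Reading the whole input as one split gives the same three lines, which is why a line-oriented reader such as textFile() cannot be used, and the two-split case shows why a record reader cannot simply resynchronize on '\n' at a split boundary. A CSV InputFormat that must handle multi-line cells is therefore safest when isSplitable returns false, unless the data is known to contain no embedded newlines.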

The other way, which I prefer, is to use spark-csv, written by Databricks, which supports the CSV file format well; see its GitHub page for usage examples.

Update: with spark-csv, setting univocity as the parserLib lets it handle multi-line cells:

val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")          // Use first line of all files as header
  .option("parserLib", "univocity")
  .option("inferSchema", "true")     // Automatically infer data types
  .load("source path")

We found a similar question about "scala - What happens when you do Java data manipulations in Spark outside of an RDD" on Stack Overflow: https://stackoverflow.com/questions/35633957/
