- android - RelativeLayout 背景可绘制重叠内容
- android - 如何链接 cpufeatures lib 以获取 native android 库?
- java - OnItemClickListener 不起作用,但 OnLongItemClickListener 在自定义 ListView 中起作用
- java - Android 文件转字符串
我正在使用 Spark 从 hdfs 读取一个 csv 文件。它进入一个 FSDataInputStream 对象。我不能使用 textfile() 方法,因为它按换行符拆分 csv 文件,而我正在读取文本字段内有换行符的 csv 文件。来自 sourcefourge 的 Opencsv 处理单元格内的换行,这是一个不错的项目,但它接受 Reader 作为输入。我需要将它转换为字符串,以便我可以将它作为 StringReader 传递给 opencsv。因此,HDFS File -> FSdataINputStream -> String -> StringReader -> 一个 opencsv 字符串列表。下面是代码...
import java.io._
import org.apache.spark.sql.SQLContext
import org.apache.hadoop.fs._
import org.apache.hadoop.conf._
import com.opencsv._
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import java.lang.StringBuilder
val conf = new Configuration()
val hdfsCoreSitePath = new Path("core-site.xml")
val hdfsHDFSSitePath = new Path("hdfs-site.xml")
conf.addResource(hdfsCoreSitePath)
conf.addResource(hdfsHDFSSitePath)
val fileSystem = FileSystem.get(conf)
val csvPath = new Path("/raw_data/project_name/csv/file_name.csv")
val csvFile = fileSystem.open(csvPath)
val fileLen = fileSystem.getFileStatus(csvPath).getLen().toInt
var b = Array.fill[Byte](2048)(0)
var j = 1
val stringBuilder = new StringBuilder()
var bufferString = ""
csvFile.seek(0)
csvFile.read(b)
var bufferString = new String(b,"UTF-8")
stringBuilder.append(bufferString)
while(j != -1) {b = Array.fill[Byte](2048)(0);j=csvFile.read(b);bufferString = new String(b,"UTF-8");stringBuilder.append(bufferString)}
val stringBuilderClean = new StringBuilder()
stringBuilderClean = stringBuilder.substring(0,fileLen)
val reader: Reader = new StringReader(stringBuilderClean.toString()).asInstanceOf[Reader]
val csv = new CSVReader(reader)
val javaContext = new JavaSparkContext(sc)
val sqlContext = new SQLContext(sc)
val javaRDD = javaContext.parallelize(csv.readAll())
//do a bunch of transformations on the RDD
它有效,但我怀疑它是否可扩展。这让我想知道拥有一个驱动程序通过一个 jvm 传输所有数据的限制有多大。我对任何非常熟悉 spark 的人的问题是:
当您像这样对整个数据集进行数据操作时,甚至在它被放入输入 RDD 之前,会发生什么?它只是像其他任何程序一样对待,我猜会像疯了一样换掉吗?
那么您将如何使任何 spark 程序具有可扩展性?你总是需要将数据直接提取到输入 RDD 中吗?
最佳答案
您的代码将数据加载到内存中,然后 Spark 驱动程序将拆分并将每一部分数据发送给执行器,因为它是不可扩展的。
有两种方法可以解决您的问题。
write custom InputFormat to support CSV file format
import java.io.{InputStreamReader, IOException}
import com.google.common.base.Charsets
import com.opencsv.{CSVParser, CSVReader}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Seekable, Path, FileSystem}
import org.apache.hadoop.io.compress._
import org.apache.hadoop.io.{ArrayWritable, Text, LongWritable}
import org.apache.hadoop.mapred._
class CSVInputFormat extends FileInputFormat[LongWritable, ArrayWritable] with JobConfigurable {
private var compressionCodecs: CompressionCodecFactory = _
def configure(conf: JobConf) {
compressionCodecs = new CompressionCodecFactory(conf)
}
protected override def isSplitable(fs: FileSystem, file: Path): Boolean = {
val codec: CompressionCodec = compressionCodecs.getCodec(file)
if (null == codec) {
return true
}
codec.isInstanceOf[SplittableCompressionCodec]
}
@throws(classOf[IOException])
def getRecordReader(genericSplit: InputSplit, job: JobConf, reporter: Reporter): RecordReader[LongWritable, ArrayWritable] = {
reporter.setStatus(genericSplit.toString)
val delimiter: String = job.get("textinputformat.record.delimiter")
var recordDelimiterBytes: Array[Byte] = null
if (null != delimiter) {
recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8)
}
new CsvLineRecordReader(job, genericSplit.asInstanceOf[FileSplit], recordDelimiterBytes)
}
}
class CsvLineRecordReader(job: Configuration, split: FileSplit, recordDelimiter: Array[Byte])
extends RecordReader[LongWritable, ArrayWritable] {
private val compressionCodecs = new CompressionCodecFactory(job)
private val maxLineLength = job.getInt(org.apache.hadoop.mapreduce.lib.input.
LineRecordReader.MAX_LINE_LENGTH, Integer.MAX_VALUE)
private var filePosition: Seekable = _
private val file = split.getPath
private val codec = compressionCodecs.getCodec(file)
private val isCompressedInput = codec != null
private val fs = file.getFileSystem(job)
private val fileIn = fs.open(file)
private var start = split.getStart
private var pos: Long = 0L
private var end = start + split.getLength
private var reader: CSVReader = _
private var decompressor: Decompressor = _
private lazy val CSVSeparator =
if (recordDelimiter == null)
CSVParser.DEFAULT_SEPARATOR
else
recordDelimiter(0).asInstanceOf[Char]
if (isCompressedInput) {
decompressor = CodecPool.getDecompressor(codec)
if (codec.isInstanceOf[SplittableCompressionCodec]) {
val cIn = (codec.asInstanceOf[SplittableCompressionCodec])
.createInputStream(fileIn, decompressor, start, end, SplittableCompressionCodec.READ_MODE.BYBLOCK)
reader = new CSVReader(new InputStreamReader(cIn), CSVSeparator)
start = cIn.getAdjustedStart
end = cIn.getAdjustedEnd
filePosition = cIn
}else {
reader = new CSVReader(new InputStreamReader(codec.createInputStream(fileIn, decompressor)), CSVSeparator)
filePosition = fileIn
}
} else {
fileIn.seek(start)
reader = new CSVReader(new InputStreamReader(fileIn), CSVSeparator)
filePosition = fileIn
}
@throws(classOf[IOException])
private def getFilePosition: Long = {
if (isCompressedInput && null != filePosition) {
filePosition.getPos
}else
pos
}
private def nextLine: Option[Array[String]] = {
if (getFilePosition < end){
//readNext automatical split the line to elements
reader.readNext() match {
case null => None
case elems => Some(elems)
}
} else
None
}
override def next(key: LongWritable, value: ArrayWritable): Boolean =
nextLine
.exists { elems =>
key.set(pos)
val lineLength = elems.foldRight(0)((a, b) => a.length + 1 + b)
pos += lineLength
value.set(elems.map(new Text(_)))
if (lineLength < maxLineLength) true else false
}
@throws(classOf[IOException])
def getProgress: Float =
if (start == end)
0.0f
else
Math.min(1.0f, (getFilePosition - start) / (end - start).toFloat)
override def getPos: Long = pos
override def createKey(): LongWritable = new LongWritable
override def close(): Unit = {
try {
if (reader != null) {
reader.close
}
} finally {
if (decompressor != null) {
CodecPool.returnDecompressor(decompressor)
}
}
}
override def createValue(): ArrayWritable = new ArrayWritable(classOf[Text])
}
Simple test example:
val arrayRdd = sc.hadoopFile("source path", classOf[CSVInputFormat], classOf[LongWritable], classOf[ArrayWritable],
sc.defaultMinPartitions).map(_._2.get().map(_.toString))
arrayRdd.collect().foreach(e => println(e.mkString(",")))
The other way which I prefer uses spark-csv written by databricks, which is well supported for CSV file format, you can take some practices in the github page.
更新 spark-csv , 使用 univocity作为parserLib,可以处理多行单元格
val df = sqlContext.read
.format("com.databricks.spark.csv")
.option("header", "true") // Use first line of all files as header
.option("parserLib", "univocity")
.option("inferSchema", "true") // Automatically infer data types
.load("source path")
关于scala - 当您在 RDD 之外的 Spark 中执行 Java 数据操作时会发生什么,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35633957/
我是 Pyspark 新手,我使用的是 Spark 2.0.2。 我有一个名为 Test_RDD 的 RDD,其结构如下: U-Key || V1 || V2 || V3 || ----
我正在寻找一种方法将一个 RDD 拆分为两个或多个 RDD,并将获得的结果保存为两个单独的 RDD。例如: rdd_test = sc.parallelize(range(50), 1) 我的代码:
我有一个结构如下的RDD: ((user_id,item_id,rating)) 让我们将此 RDD 称为训练 然后还有另一个具有相同结构的rdd: ((user_id,item_id,rating)
已经有人问过类似的问题。最相似的是这个: Spark: How to split an RDD[T]` into Seq[RDD[T]] and preserve the ordering 但是,我不
我正在使用 spark 来处理数据。但是我不知道如何将新数据保存到Hive 我从 Hive 加载 rdd,然后运行 map 函数来清理数据。 result = myRdd.map(lambda x
我有一个名为 index 的 rdd:RDD[(String, String)],我想用 index 来处理我的文件。 这是代码: val get = file.map({x => val tmp
我有两个 RDD: **rdd1** id1 val1 id2 val2 **rdd2** id1 v1 id2 v2 id1 v3 id8 v7 id1 v4 id3 v5 id6 v6 我想过滤
我有一个 RDD,需要从另一个 RDD 访问数据。但是,我总是收到任务不可序列化错误。我已经扩展了 Serialized 类,但它没有起作用。代码是: val oldError = rddOfRati
我有一个 RDD,需要从另一个 RDD 访问数据。但是,我总是收到任务不可序列化错误。我已经扩展了 Serialized 类,但它没有起作用。代码是: val oldError = rddOfRati
我有一个 RDD 对: (105,918) (105,757) (502,516) (105,137) (516,816) (350,502) 我想将它分成两个 RDD,这样第一个只有具有非重复值的对
我正在尝试使用 spark 执行 K 最近邻搜索。 我有一个 RDD[Seq[Double]] 并且我打算返回一个 RDD[(Seq[Double],Seq[Seq[Double]])] 带有实际行和
我是Spark和Scala语言的新手,并且希望将所有RDD合并到一个List中,如下所示(List to RDD): val data = for (item {
我找不到只参与 rdd 的方法. take看起来很有希望,但它返回 list而不是 rdd .我当然可以将其转换为 rdd ,但这似乎既浪费又丑陋。 my_rdd = sc.textFile("my
我正在寻找一种将 RDD 拆分为两个或更多 RDD 的方法。我见过的最接近的是 Scala Spark: Split collection into several RDD?这仍然是一个单一的 RDD
我有一个RDD[String],wordRDD。我还有一个从字符串/单词创建 RDD[String] 的函数。我想为 wordRDD 中的每个字符串创建一个新的 RDD。以下是我的尝试: 1) 失败,
我刚刚开始使用 Spark 和 Scala 我有一个包含多个文件的目录我使用 成功加载它们 sc.wholeTextFiles(directory) 现在我想升一级。我实际上有一个目录,其中包含包含文
我想从另一个 RDD 中减去一个 RDD。我查看了文档,发现 subtract可以这样做。实际上,当我测试时 subtract , 最终的 RDD 保持不变,值不会被删除! 有没有其他功能可以做到这一
我在 HDFS 中有如下三个文件中的数据 EmployeeManagers.txt (EmpID,ManagerID) 1,5 2,4 3,4 4,6 5,6 EmployeeNames.txt (E
我正在开发一个应用程序,我需要对 RDD 中具有相同键的每对行执行计算,这是 RDD 结构: List>> dat2 = new ArrayList<>(); dat2.add(new Tuple2>
我在 spark 集群中有两个文件,foo.csv 和 bar.csv,它们都有 4 列和完全相同的字段:时间、用户、url、类别。 我想通过 bar.csv 的某些列过滤掉 foo.csv。最后,我
我是一名优秀的程序员,十分优秀!