gpt4 book ai didi

scala - Spark 集群 : Read Large number of small avro files is taking too long to list

转载 作者:行者123 更新时间:2023-12-04 14:19:03 26 4
gpt4 key购买 nike

我知道在 HDFS 中读取大量小文件的问题一直是一个问题并被广泛讨论,但请耐心等待。大多数处理此类问题的 stackoverflow 问题都与读取大量 txt 文件有关。我正在尝试读取大量小型 avro 文件

此外,这些阅读 txt 文件解决方案谈到使用作为 RDD 实现的 WholeTextFileInputFormat 或 CombineInputFormat ( https://stackoverflow.com/a/43898733/11013878 ),我使用的是 Spark 2.4 (HDFS 3.0.0) 并且通常不鼓励使用 RDD 实现,并且首选数据帧。我更喜欢使用数据帧,但也对 RDD 实现持开放态度。

我已经尝试按照 Murtaza 的建议合并数据帧,但是在大量文件中出现 OOM 错误( https://stackoverflow.com/a/32117661/11013878 )

我正在使用以下代码

val filePaths = avroConsolidator.getFilesInDateRangeWithExtension //pattern:filePaths: Array[String] 
//I do need to create a list of file paths as I need to filter files based on file names. Need this logic for some upstream process
//example : Array("hdfs://server123:8020/source/Avro/weblog/2019/06/03/20190603_1530.avro","hdfs://server123:8020/source/Avro/weblog/2019/06/03/20190603_1531.avro","hdfs://server123:8020/source/Avro/weblog/2019/06/03/20190603_1532.avro")
val df_mid = sc.read.format("com.databricks.spark.avro").load(filePaths: _*)
val df = df_mid
.withColumn("dt", date_format(df_mid.col("timeStamp"), "yyyy-MM-dd"))
.filter("dt != 'null'")

df
.repartition(partitionColumns(inputs.logSubType).map(new org.apache.spark.sql.Column(_)):_*)
.write.partitionBy(partitionColumns(inputs.logSubType): _*)
.mode(SaveMode.Append)
.option("compression","snappy")
.parquet(avroConsolidator.parquetFilePath.toString)

1.6分钟在作业级别列出183个小文件
Jobs UI

奇怪的是,我的舞台 UI 页面只显示 3 秒(不明白为什么)
Stage UI

avro 文件存储在 yyyy/mm/dd 分区:hdfs://server123:8020/source/Avro/weblog/2019/06/03

有什么办法可以加快叶文件的列表速度,正如您可以从屏幕截图中看到的那样,将文件合并为 Parquet 只需 6 秒,而列出文件需要 1.3 分钟

最佳答案

由于读取大量小文件花费的时间太长,我退后一步,使用CombineFileInputFormat 创建了RDD。 This InputFormat 适用于小文件,因为它将许多文件打包成一个拆分,因此映射器较少,并且每个映射器都有更多的数据要处理。

这是我所做的:

def createDataFrame(filePaths: Array[Path], sc: SparkSession, inputs: AvroConsolidatorInputs): DataFrame = {

val job: Job = Job.getInstance(sc.sparkContext.hadoopConfiguration)
FileInputFormat.setInputPaths(job, filePaths: _*)
val sqlType = SchemaConverters.toSqlType(getSchema(inputs.logSubType))

val rddKV = sc.sparkContext.newAPIHadoopRDD(
job.getConfiguration,
classOf[CombinedAvroKeyInputFormat[GenericRecord]],
classOf[AvroKey[GenericRecord]],
classOf[NullWritable])

val rowRDD = rddKV.mapPartitions(
f = (iter: Iterator[(AvroKey[GenericRecord], NullWritable)]) =>
iter.map(_._1.datum()).map(genericRecordToRow(_, sqlType))
, preservesPartitioning = true)

val df = sc.sqlContext.createDataFrame(rowRDD ,
sqlType.dataType.asInstanceOf[StructType])
df

CombinedAvroKeyInputFormat 是用户定义的类,它扩展了 CombineFileInputFormat 并将 64MB 的数据放入单个拆分中。
object CombinedAvroKeyInputFormat {

class CombinedAvroKeyRecordReader[T](var inputSplit: CombineFileSplit, context: TaskAttemptContext, idx: Integer)
extends AvroKeyRecordReader[T](AvroJob.getInputKeySchema(context.getConfiguration))
{
@throws[IOException]
@throws[InterruptedException]
override def initialize(inputSplit: InputSplit, context: TaskAttemptContext): Unit = {
this.inputSplit = inputSplit.asInstanceOf[CombineFileSplit]
val fileSplit = new FileSplit(this.inputSplit.getPath(idx),
this.inputSplit.getOffset(idx),
this.inputSplit.getLength(idx),
this.inputSplit.getLocations)
super.initialize(fileSplit, context)
}
}

}

/*
* The class CombineFileInputFormat is an abstract class with no implementation, so we must create a subclass to support it;
* We’ll name the subclass CombinedAvroKeyInputFormat. The subclass will initiate a delegate CombinedAvroKeyRecordReader that extends AvroKeyRecordReader
*/

class CombinedAvroKeyInputFormat[T] extends CombineFileInputFormat[AvroKey[T], NullWritable] {
val logger = Logger.getLogger(AvroConsolidator.getClass)
setMaxSplitSize(67108864)
def createRecordReader(split: InputSplit, context: TaskAttemptContext): RecordReader[AvroKey[T], NullWritable] = {
val c = classOf[CombinedAvroKeyInputFormat.CombinedAvroKeyRecordReader[_]]
val inputSplit = split.asInstanceOf[CombineFileSplit]

/*
* CombineFileRecordReader is a built in class that pass each split to our class CombinedAvroKeyRecordReader
* When the hadoop job starts, CombineFileRecordReader reads all the file sizes in HDFS that we want it to process,
* and decides how many splits base on the MaxSplitSize
*/
return new CombineFileRecordReader[AvroKey[T], NullWritable](
inputSplit,
context,
c.asInstanceOf[Class[_ <: RecordReader[AvroKey[T], NullWritable]]])
}
}

这使得读取小文件的速度快了很多

关于scala - Spark 集群 : Read Large number of small avro files is taking too long to list,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56962341/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com