- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我知道在 HDFS 中读取大量小文件的问题一直是一个问题并被广泛讨论,但请耐心等待。大多数处理此类问题的 stackoverflow 问题都与读取大量 txt 文件有关。我正在尝试读取大量小型 avro 文件
此外,这些阅读 txt 文件解决方案谈到使用作为 RDD 实现的 WholeTextFileInputFormat 或 CombineInputFormat ( https://stackoverflow.com/a/43898733/11013878 ),我使用的是 Spark 2.4 (HDFS 3.0.0) 并且通常不鼓励使用 RDD 实现,并且首选数据帧。我更喜欢使用数据帧,但也对 RDD 实现持开放态度。
我已经尝试按照 Murtaza 的建议合并数据帧,但是在大量文件中出现 OOM 错误( https://stackoverflow.com/a/32117661/11013878 )
我正在使用以下代码
val filePaths = avroConsolidator.getFilesInDateRangeWithExtension //pattern:filePaths: Array[String]
//I do need to create a list of file paths as I need to filter files based on file names. Need this logic for some upstream process
//example : Array("hdfs://server123:8020/source/Avro/weblog/2019/06/03/20190603_1530.avro","hdfs://server123:8020/source/Avro/weblog/2019/06/03/20190603_1531.avro","hdfs://server123:8020/source/Avro/weblog/2019/06/03/20190603_1532.avro")
val df_mid = sc.read.format("com.databricks.spark.avro").load(filePaths: _*)
val df = df_mid
.withColumn("dt", date_format(df_mid.col("timeStamp"), "yyyy-MM-dd"))
.filter("dt != 'null'")
df
.repartition(partitionColumns(inputs.logSubType).map(new org.apache.spark.sql.Column(_)):_*)
.write.partitionBy(partitionColumns(inputs.logSubType): _*)
.mode(SaveMode.Append)
.option("compression","snappy")
.parquet(avroConsolidator.parquetFilePath.toString)
最佳答案
由于读取大量小文件花费的时间太长,我退后一步,使用CombineFileInputFormat 创建了RDD。 This InputFormat 适用于小文件,因为它将许多文件打包成一个拆分,因此映射器较少,并且每个映射器都有更多的数据要处理。
这是我所做的:
def createDataFrame(filePaths: Array[Path], sc: SparkSession, inputs: AvroConsolidatorInputs): DataFrame = {
val job: Job = Job.getInstance(sc.sparkContext.hadoopConfiguration)
FileInputFormat.setInputPaths(job, filePaths: _*)
val sqlType = SchemaConverters.toSqlType(getSchema(inputs.logSubType))
val rddKV = sc.sparkContext.newAPIHadoopRDD(
job.getConfiguration,
classOf[CombinedAvroKeyInputFormat[GenericRecord]],
classOf[AvroKey[GenericRecord]],
classOf[NullWritable])
val rowRDD = rddKV.mapPartitions(
f = (iter: Iterator[(AvroKey[GenericRecord], NullWritable)]) =>
iter.map(_._1.datum()).map(genericRecordToRow(_, sqlType))
, preservesPartitioning = true)
val df = sc.sqlContext.createDataFrame(rowRDD ,
sqlType.dataType.asInstanceOf[StructType])
df
object CombinedAvroKeyInputFormat {
class CombinedAvroKeyRecordReader[T](var inputSplit: CombineFileSplit, context: TaskAttemptContext, idx: Integer)
extends AvroKeyRecordReader[T](AvroJob.getInputKeySchema(context.getConfiguration))
{
@throws[IOException]
@throws[InterruptedException]
override def initialize(inputSplit: InputSplit, context: TaskAttemptContext): Unit = {
this.inputSplit = inputSplit.asInstanceOf[CombineFileSplit]
val fileSplit = new FileSplit(this.inputSplit.getPath(idx),
this.inputSplit.getOffset(idx),
this.inputSplit.getLength(idx),
this.inputSplit.getLocations)
super.initialize(fileSplit, context)
}
}
}
/*
* The class CombineFileInputFormat is an abstract class with no implementation, so we must create a subclass to support it;
* We’ll name the subclass CombinedAvroKeyInputFormat. The subclass will initiate a delegate CombinedAvroKeyRecordReader that extends AvroKeyRecordReader
*/
class CombinedAvroKeyInputFormat[T] extends CombineFileInputFormat[AvroKey[T], NullWritable] {
val logger = Logger.getLogger(AvroConsolidator.getClass)
setMaxSplitSize(67108864)
def createRecordReader(split: InputSplit, context: TaskAttemptContext): RecordReader[AvroKey[T], NullWritable] = {
val c = classOf[CombinedAvroKeyInputFormat.CombinedAvroKeyRecordReader[_]]
val inputSplit = split.asInstanceOf[CombineFileSplit]
/*
* CombineFileRecordReader is a built in class that pass each split to our class CombinedAvroKeyRecordReader
* When the hadoop job starts, CombineFileRecordReader reads all the file sizes in HDFS that we want it to process,
* and decides how many splits base on the MaxSplitSize
*/
return new CombineFileRecordReader[AvroKey[T], NullWritable](
inputSplit,
context,
c.asInstanceOf[Class[_ <: RecordReader[AvroKey[T], NullWritable]]])
}
}
关于scala - Spark 集群 : Read Large number of small avro files is taking too long to list,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56962341/
matplotlibrc 示例文件指出: ## The font.size property is the default font size for text, given in pts. ## 1
在 HTML/CSS 中,可以通过以下方式指定字体大小(已弃用,但所有浏览器都支持): text n 是 {1, 2, 3, 4, 5, 6, 7} 的一个元素。 另一种可能性是: text s 是
我看到很多地方都在谈论插入排序如何适用于小型数据集。不过,我找不到“小”的数字。我的猜测是没有绝对的答案,这取决于运行代码的机器类型。 但是,什么因素决定了插入排序是一个好主意的阈值是多少? “小”的
我为操作栏按钮尝试了两种不同的进度条样式,通过 refreshMenuItem.setActionView(R.layout.actionbar_indeterminate_progress); ac
这个问题可能需要一些编译器知识才能回答。我目前正在做一个项目,我将在其中创建一个数组,可能是 int[2][veryLargeNumber] 或 int [veryLargeNumber][2] 逻辑
请帮助我理解。我是一名技术专家,并试图确定在使用WhatsApp Cloud API执行分配给我的业务任务时是否会有任何限制。。在WhatsApp Business Account文档中,我看到了许多
关闭。这个问题是opinion-based .它目前不接受答案。 想要改进这个问题? 更新问题,以便 editing this post 可以用事实和引用来回答它. 关闭 6 年前。 Improve
我正在尝试对我的一个项目进行“小规模”优化。 有一系列单独很小的数组访问,但分析表明这些数组访问是我的程序的绝大多数时间花费的地方。所以,是时候让事情变得更快了,因为程序大约需要一个小时才能运行。 我
我正在为一个新网站整理我的排版,并且 出现了一个奇怪的现象标签 - 它把我的行高扔掉了。其他一切都很好(标题标签、段落等),但 small正在把它扔到某个地方。 这是我的意思的图像: 我的问题是这个
我的一个客户因为他的网站显示“太小”而大惊小怪。 This is the site in question对我来说它看起来不错。 他附上了一张截图,很明显他遇到这个问题只是因为他“缩小了”。 有没有办
关闭。这个问题是off-topic .它目前不接受答案。 想改进这个问题吗? Update the question所以它是on-topic用于堆栈溢出。 关闭 12 年前。 Improve thi
我已经使用相同的值行高、垂直边距和填充设置了垂直对齐,但是当有一个较小的元素时,比如 标记,在流中,它破坏了一些像素的垂直节奏,我可以解决添加 vertical-align:top/bottom 但是
我试图创建输入,它在两侧具有 border-bottom 和小(高度)边框,如下所示: 但是这段代码不起作用: input:before, input:after { display: blo
我有以下 fiddle : http://jsfiddle.net/tompazourek/sn5jp/ some normal-sized text some small-sized text p
我使用 MASS::qda() 来查找我的数据的分类器,并且它总是报告 `some group is too small for 'qda' 这是由于我用于模型的测试数据的大小吗?我将测试样本大小从
我想重新组织我们的 Storyboard。我们有大约 25 个小 Storyboard和大约 5 个大 Storyboard。大的太大(在 xCode 中工作时会影响性能)。小的有时只包含一两个场景。
我的产品流程中有一个方面让我浪费了时间。 假设我在集合中有一个类似命名项目的列表,但它们的大小不同 id base_name sizing sum_dimensions
我正在制作一个解析 html 并从中获取图像的应用程序。使用 Beautiful Soup 可以轻松解析并下载 html,图像也可以使用 urllib2。 我确实在 urlparse 中遇到问题,无法
我创建了一个新的 Android 资源目录来支持不同的屏幕尺寸。我在 dimens.xml 中外包了所有硬编码尺寸和 TextSize。但是我的 values-small 目录没有被 Android
我只是出于好奇才问这个问题。 如果您使用 tag 3 次,tag 内的单词按预期变小 3 倍。这是否是一种可用的做法,为什么? Lorem Ipsum dolor sit amet 最佳答案 通常将
我是一名优秀的程序员,十分优秀!