- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我知道在 HDFS 中读取大量小文件的问题一直是一个问题并被广泛讨论,但请耐心等待。大多数处理此类问题的 stackoverflow 问题都与读取大量 txt 文件有关。我正在尝试读取大量小型 avro 文件
此外,这些阅读 txt 文件解决方案谈到使用作为 RDD 实现的 WholeTextFileInputFormat 或 CombineInputFormat ( https://stackoverflow.com/a/43898733/11013878 ),我使用的是 Spark 2.4 (HDFS 3.0.0) 并且通常不鼓励使用 RDD 实现,并且首选数据帧。我更喜欢使用数据帧,但也对 RDD 实现持开放态度。
我已经尝试按照 Murtaza 的建议合并数据帧,但是在大量文件中出现 OOM 错误( https://stackoverflow.com/a/32117661/11013878 )
我正在使用以下代码
val filePaths = avroConsolidator.getFilesInDateRangeWithExtension //pattern:filePaths: Array[String]
//I do need to create a list of file paths as I need to filter files based on file names. Need this logic for some upstream process
//example : Array("hdfs://server123:8020/source/Avro/weblog/2019/06/03/20190603_1530.avro","hdfs://server123:8020/source/Avro/weblog/2019/06/03/20190603_1531.avro","hdfs://server123:8020/source/Avro/weblog/2019/06/03/20190603_1532.avro")
val df_mid = sc.read.format("com.databricks.spark.avro").load(filePaths: _*)
val df = df_mid
.withColumn("dt", date_format(df_mid.col("timeStamp"), "yyyy-MM-dd"))
.filter("dt != 'null'")
df
.repartition(partitionColumns(inputs.logSubType).map(new org.apache.spark.sql.Column(_)):_*)
.write.partitionBy(partitionColumns(inputs.logSubType): _*)
.mode(SaveMode.Append)
.option("compression","snappy")
.parquet(avroConsolidator.parquetFilePath.toString)
最佳答案
由于读取大量小文件花费的时间太长,我退后一步,使用CombineFileInputFormat 创建了RDD。 This InputFormat 适用于小文件,因为它将许多文件打包成一个拆分,因此映射器较少,并且每个映射器都有更多的数据要处理。
这是我所做的:
def createDataFrame(filePaths: Array[Path], sc: SparkSession, inputs: AvroConsolidatorInputs): DataFrame = {
val job: Job = Job.getInstance(sc.sparkContext.hadoopConfiguration)
FileInputFormat.setInputPaths(job, filePaths: _*)
val sqlType = SchemaConverters.toSqlType(getSchema(inputs.logSubType))
val rddKV = sc.sparkContext.newAPIHadoopRDD(
job.getConfiguration,
classOf[CombinedAvroKeyInputFormat[GenericRecord]],
classOf[AvroKey[GenericRecord]],
classOf[NullWritable])
val rowRDD = rddKV.mapPartitions(
f = (iter: Iterator[(AvroKey[GenericRecord], NullWritable)]) =>
iter.map(_._1.datum()).map(genericRecordToRow(_, sqlType))
, preservesPartitioning = true)
val df = sc.sqlContext.createDataFrame(rowRDD ,
sqlType.dataType.asInstanceOf[StructType])
df
object CombinedAvroKeyInputFormat {
class CombinedAvroKeyRecordReader[T](var inputSplit: CombineFileSplit, context: TaskAttemptContext, idx: Integer)
extends AvroKeyRecordReader[T](AvroJob.getInputKeySchema(context.getConfiguration))
{
@throws[IOException]
@throws[InterruptedException]
override def initialize(inputSplit: InputSplit, context: TaskAttemptContext): Unit = {
this.inputSplit = inputSplit.asInstanceOf[CombineFileSplit]
val fileSplit = new FileSplit(this.inputSplit.getPath(idx),
this.inputSplit.getOffset(idx),
this.inputSplit.getLength(idx),
this.inputSplit.getLocations)
super.initialize(fileSplit, context)
}
}
}
/*
* The class CombineFileInputFormat is an abstract class with no implementation, so we must create a subclass to support it;
* We’ll name the subclass CombinedAvroKeyInputFormat. The subclass will initiate a delegate CombinedAvroKeyRecordReader that extends AvroKeyRecordReader
*/
class CombinedAvroKeyInputFormat[T] extends CombineFileInputFormat[AvroKey[T], NullWritable] {
val logger = Logger.getLogger(AvroConsolidator.getClass)
setMaxSplitSize(67108864)
def createRecordReader(split: InputSplit, context: TaskAttemptContext): RecordReader[AvroKey[T], NullWritable] = {
val c = classOf[CombinedAvroKeyInputFormat.CombinedAvroKeyRecordReader[_]]
val inputSplit = split.asInstanceOf[CombineFileSplit]
/*
* CombineFileRecordReader is a built in class that pass each split to our class CombinedAvroKeyRecordReader
* When the hadoop job starts, CombineFileRecordReader reads all the file sizes in HDFS that we want it to process,
* and decides how many splits base on the MaxSplitSize
*/
return new CombineFileRecordReader[AvroKey[T], NullWritable](
inputSplit,
context,
c.asInstanceOf[Class[_ <: RecordReader[AvroKey[T], NullWritable]]])
}
}
关于scala - Spark 集群 : Read Large number of small avro files is taking too long to list,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56962341/
从 angular 5.1 更新到 6.1 后,我开始从我的代码中收到一些错误,如下所示: Error: ngc compilation failed: components/forms/utils.
我正在学习 Typescript 并尝试了解类型和接口(interface)的最佳实践。我正在玩一个使用 GPS 坐标的示例,想知道一种方法是否比另一种更好。 let gps1 : number[];
type padding = [number, number, number, number] interface IPaddingProps { defaultValue?: padding
这两种格式在内存中保存结果的顺序上有什么区别吗? number = number + 10; number += 10; 我记得一种格式会立即保存结果,因此下一行代码可以使用新值,而对于另一种格式,
在 Python 匹配模式中,如何匹配像 1 这样的文字数字在按数字反向引用后 \1 ? 我尝试了 \g用于此目的的替换模式中可用的语法,但它在我的匹配模式中不起作用。 我有一个更大的问题,我想使用一
我的源文件here包含 HTML 代码,我想将电话号码更改为可在我的应用程序中单击。我正在寻找一个正则表达式来转换字符串 >numbernumber(\d+)$1numbernumber<",我们在S
我们有一个包含 2 个字段和一个按钮的表单。我们想要点击按钮来输出位于 int A 和 int B 之间的随机整数(比如 3、5 或 33)? (不需要使用 jQuery 或类似的东西) 最佳答案 你
我收到以下类型错误(TypeScript - 3.7.5)。 error TS2345: Argument of type '(priority1: number, priority2: number
只想创建简单的填充器以在其他功能中使用它: function fillLine(row, column, length, bgcolor) { var sheet = SpreadsheetApp
我有一个问题。当我保存程序输出的 *.txt 时,我得到以下信息:0.021111111111111112a118d0 以及更多的东西。 问题是: 这个数字中的“d0”和“a”是什么意思? 我不知道“
首先:抱歉标题太长了,但我发现很难用一句话来解释这个问题;)。是的,我也四处搜索(这里和谷歌),但找不到合适的答案。 所以,问题是这样的: 数字 1-15 将像这样放在金字塔中(由数组表示):
我想从字符串中提取血压。数据可能如下所示: text <- c("at 10.00 seated 132/69", "99/49", "176/109", "10.12 I 128/51, II 1
当尝试执行一个简单的 bash 脚本以将前面带有 0 的数字递增 1 时,原始数字被错误地解释。 #!/bin/bash number=0026 echo $number echo $((number
我有一个类型为 [number, number] 的字段,TypeScript 编译器(strict 设置为 true)出现问题,提示初始值值(value)。我尝试了以下方法: public shee
你能帮我表达数组吗:["232","2323","233"] 我试试这个:/^\[("\d{1,7}")|(,"\d{1,7}")\]$/ 但是这个表达式不能正常工作。 我使用 ruby(rail
这个问题在这里已经有了答案: meaning of (number) & (-number) (4 个回答) 关闭6年前. 例如: int get(int i) { int res = 0;
我正在考虑使用 Berkeley DB作为高度并发的移动应用程序后端的一部分。对于我的应用程序,使用 Queue对于他们的记录级别锁定将是理想的。但是,如标题中所述,我需要查询和更新概念建模的数据,如
我正在尝试解决涉及重复数字的特定 JavaScript 练习,为此我需要将重复数字处理到大量小数位。 目前我正在使用: function divide(numerator, denominator){
我有这个数组类型: interface Details { Name: string; URL: string; Year: number; } interface AppState {
我们正在使用 Spring 3.x.x 和 Quartz 2.x.x 实现 Web 应用程序。 Web 服务器是 Tomcat 7.x.x。我们有 3 台服务器。 Quartz 是集群式的,因此所有这
我是一名优秀的程序员,十分优秀!