gpt4 book ai didi

scala - Spark : Writing to Avro file

转载 作者:行者123 更新时间:2023-12-03 14:20:38 31 4
gpt4 key购买 nike

我在 Spark 中,我有一个来自 Avro 文件的 RDD。我现在想对该 RDD 进行一些转换并将其保存为 Avro 文件:

val job = new Job(new Configuration())
AvroJob.setOutputKeySchema(job, getOutputSchema(inputSchema))

rdd.map(elem => (new SparkAvroKey(doTransformation(elem._1)), elem._2))
.saveAsNewAPIHadoopFile(outputPath,
classOf[AvroKey[GenericRecord]],
classOf[org.apache.hadoop.io.NullWritable],
classOf[AvroKeyOutputFormat[GenericRecord]],
job.getConfiguration)

运行此 Spark 时提示 Schema$recordSchema 不可序列化。

如果我取消注释 .map 调用(并且只有 rdd.saveAsNewAPIHadoopFile),则调用成功。

我在这里做错了什么?

任何的想法?

最佳答案

这里的问题与 Job 中使用的 avro.Schema 类的不可序列化有关。当您尝试从 map 函数内的代码中引用架构对象时,将引发异常。

例如,如果您尝试执行以下操作,您将收到“Task not serializable”异常:

val schema = new Schema.Parser().parse(new File(jsonSchema))
...
rdd.map(t => {
// reference to the schema object declared outside
val record = new GenericData.Record(schema)
})

您可以通过在功能 block 内创建模式的新实例来使一切正常工作:
val schema = new Schema.Parser().parse(new File(jsonSchema))
// The schema above should not be used in closures, it's for other purposes
...
rdd.map(t => {
// create a new Schema object
val innserSchema = new Schema.Parser().parse(new File(jsonSchema))
val record = new GenericData.Record(innserSchema)
...
})

由于您不希望为您处理的每条记录解析 avro 架构,因此更好的解决方案是在分区级别解析架构。以下也有效:
val schema = new Schema.Parser().parse(new File(jsonSchema))
// The schema above should not be used in closures, it's for other purposes
...
rdd.mapPartitions(tuples => {
// create a new Schema object
val innserSchema = new Schema.Parser().parse(new File(jsonSchema))

tuples.map(t => {
val record = new GenericData.Record(innserSchema)
...
// this closure will be bundled together with the outer one
// (no serialization issues)
})
})

只要您提供对 jsonSchema 文件的可移植引用,上面的代码就可以工作,因为 map 函数将由多个远程执行程序执行。它可以是对 HDFS 中文件的引用,也可以与 JAR 中的应用程序一起打包(在后一种情况下,您将使用类加载器函数来获取其内容)。

对于那些尝试将 Avro 与 Spark 一起使用的人,请注意仍然存在一些 Unresolved 编译问题,您必须在 Maven POM 上使用以下导入:
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-mapred</artifactId>
<version>1.7.7</version>
<classifier>hadoop2</classifier>
<dependency>

注意 "hadoop2"分类器。您可以在 https://issues.apache.org/jira/browse/SPARK-3039 上跟踪问题。 .

关于scala - Spark : Writing to Avro file,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/20612571/

31 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com