gpt4 book ai didi

hadoop - 如何使用 Spark 编写 avro 文件?

转载 作者:可可西里 更新时间:2023-11-01 16:42:32 25 4
gpt4 key购买 nike

我有一个 Array[Byte] 代表一个 avro 模式。我正在尝试将它作为带有 spark 的 avro 文件写入 Hdfs。这是代码:

 val values = messages.map(row => (null,AvroUtils.decode(row._2,topic)))
.saveAsHadoopFile(
outputPath,
classOf[org.apache.hadoop.io.NullWritable],
classOf[CrashPacket],
classOf[AvroOutputFormat[SpecificRecordBase]]
)

row._2 是 Array[Byte]

我收到此错误:org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 1.0 failed 4 times, most recent failure: Lost task 4.3 in stage 1.0 (TID 98, bdac1nodec06.servizi.gr-u.it): java.lang.NullPointerException
at java.io.StringReader.<init>(StringReader.java:50)
at org.apache.avro.Schema$Parser.parse(Schema.java:958)
at org.apache.avro.Schema.parse(Schema.java:1010)
at org.apache.avro.mapred.AvroJob.getOutputSchema(AvroJob.java:143)
at org.apache.avro.mapred.AvroOutputFormat.getRecordWriter(AvroOutputFormat.java:153)
at org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:91)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1068)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1059)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

最佳答案

考虑一下,有一个带有构造函数 StringPair(String a, String b) 的 avro 类 StringPair。然后将记录写入 avro 文件的代码可能如下所示:

import com.test.{StringPair}
import org.apache.avro.Schema
import org.apache.avro.mapred.{AvroValue, AvroKey}
import org.apache.avro.mapreduce.{AvroKeyValueOutputFormat, AvroJob}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.hadoop.mapreduce.Job

object TestWriteAvro {
def main (args: Array[String]){
val sparkConf = new SparkConf()
val sc = new SparkContext(sparkConf)
val job = new Job(sc.hadoopConfiguration)

AvroJob.setOutputKeySchema(job, Schema.create(Schema.Type.STRING))
AvroJob.setOutputValueSchema(job, StringPair.getClassSchema)
val myRdd = sc
.parallelize(List("1,2", "3,4"))
.map(x => (x.split(",")(0), x.split(",")(1)))
.map {case (x, y) => (new AvroKey[String](x), new AvroValue[StringPair](new StringPair(x, y)))}

myRdd.saveAsNewAPIHadoopFile(args(0), classOf[AvroKey[_]], classOf[AvroValue[_]], classOf[AvroKeyValueOutputFormat[_, _]], job.getConfiguration)

sc.stop()
}
}

关于hadoop - 如何使用 Spark 编写 avro 文件?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39454591/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com