gpt4 book ai didi

java - 带Avro的Mapreduce-通用解析

转载 作者:行者123 更新时间:2023-12-02 21:21:59 26 4
gpt4 key购买 nike

问题陈述:

  • hdfs中可用avro格式的数据。
  • 上面avro数据的模式(schema)也可用。
  • 需要在map reduce中解析此Avro数据,并以相同的架构生成输出avro数据(需要清除传入的Avro数据)。
  • 输入的avro数据可以是任何模式。

  • 因此,要求编写一个通用的映射器(mapper),它可以接收任意模式的Avro数据,并以与输入相同的模式生成Avro格式的输出。

    代码(经过多次尝试,这是我目前的进展)

    驱动程序
    /**
     * Map-only MapReduce driver that reads Avro records from HDFS and writes
     * them back out in Avro format using the same schema.
     *
     * <p>Usage: {@code hadoop jar <jar> <inputAvroPath> <outputPath> <avroSchemaFile>}
     */
    public class AvroDriver extends Configured implements Tool {

        /**
         * Configures and submits the job.
         *
         * @param args args[0] = input Avro data path, args[1] = output path,
         *             args[2] = local path to the Avro schema file
         * @return 0 on success, 1 on failure
         */
        public int run(String[] args) throws Exception {
            // Job.getInstance replaces the deprecated new Job(Configuration) constructor.
            Job job = Job.getInstance(getConf());
            job.setJarByClass(AvroMapper.class);
            job.setJobName("Avro With Xml Mapper");
            // Recurse into input subdirectories when collecting splits.
            job.getConfiguration().setBoolean("mapreduce.input.fileinputformat.input.dir.recursive", true);

            // Required for avro-1.7.6 and above: prefer the job's Avro jars over
            // the (older) Avro version bundled with the Hadoop distribution.
            job.getConfiguration().set("mapreduce.job.user.classpath.first", "true");
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            job.setInputFormatClass(AvroKeyInputFormat.class);
            job.setMapperClass(AvroMapper.class);
            Schema schema = new Schema.Parser().parse(new File(args[2]));
            AvroJob.setInputKeySchema(job, schema);
            job.setOutputFormatClass(AvroKeyOutputFormat.class);
            // NOTE(review): AvroJob.setOutputKeySchema already registers the map
            // output key class; the explicit call below is redundant but harmless.
            job.setMapOutputKeyClass(AvroKey.class);
            AvroJob.setOutputKeySchema(job, schema);
            job.setNumReduceTasks(0); // map-only job: mapper output is the final output
            return job.waitForCompletion(true) ? 0 : 1;
        }

        public static void main(String[] args) throws Exception {
            System.exit(ToolRunner.run(new AvroDriver(), args));
        }
    }

    映射器
    // First (broken) attempt from the question: the job fails with
    // "DataFileWriter$AppendWriteException: java.lang.NullPointerException"
    // because the mapper emits the wrong object type (see comments below).
    public static class AvroMapper extends Mapper<AvroKey<GenericData.Record>, NullWritable, AvroKey<GenericData>, NullWritable> {

    @Override
    public void map(AvroKey<GenericData.Record> key, NullWritable value, Context context) throws IOException, InterruptedException {

    try {
    // Debug output: the incoming key, its datum, and the datum's schema.
    System.out.println("Specific Record - " + key);
    System.out.println("Datum :: " + key.datum());
    System.out.println("Schema :: " + key.datum().getSchema());
    List<Field> fields = key.datum().getSchema().getFields();


    // Copy every field of the incoming record into a new record that
    // reuses the input schema (this part is correct).
    GenericRecord record = new GenericData.Record(key.datum().getSchema());
    for(Field f : fields) {
    System.out.println("Field Name - " + f.name());
    record.put(f.name(), key.datum().get(f.name()));
    }
    System.out.println("Record - " + record);
    // BUG: GenericData is Avro's *data model* helper class, not a record.
    // d.newRecord(...) returns a new record, but that return value is
    // discarded here. Wrapping the model object `d` in AvroKey makes the
    // output writer try to serialize a GenericData instance against the
    // record schema, which yields the NullPointerException reported below.
    // The fix (see accepted answer) is to emit AvroKey<GenericData.Record>
    // wrapping `record` directly.
    GenericData d = new GenericData();
    d.newRecord(record, key.datum().getSchema());
    AvroKey<GenericData> outkey = new AvroKey<GenericData>(d);

    System.out.println("Generic Record (Avro Key) - " + outkey);
    context.write(outkey, NullWritable.get());
    } catch (Exception e) {
    e.printStackTrace();
    // NOTE(review): wrapping only e.getMessage() drops the cause chain;
    // prefer `throw new IOException(e)` to preserve the stack trace.
    throw new IOException(e.getMessage());
    }
    }
    }

    命令

    hadoop jar $jar_name $input_avro_data_path $output_path $path_to_the_input_avro_schema



    Avro模式样本
    { "type" : "record", "name" : "Entity", "namespace" : "com.sample.avro", "fields".......

    我在运行 map reduce 时遇到的错误

    Error running child : java.lang.NullPointerException: in com.sample.avro.Entity null of com.sample.avro.Entity

    org.apache.avro.file.DataFileWriter$AppendWriteException: java.lang.NullPointerException: in com.sample.avro.Entity null of com.sample.avro.Entity



    环境

    HDP 2.3沙箱

    有什么想法吗?

    更新了

    我尝试了以下但相同的结果
    // Second attempt from the question ("updated"): builds a fresh output
    // schema, but still fails with the same NullPointerException because the
    // mapper again emits the GenericData *model* object instead of a record.
    public static class AvroMapper extends Mapper<AvroKey<GenericData.Record>, NullWritable, AvroKey<GenericData>, NullWritable> {

    @Override
    public void map(AvroKey<GenericData.Record> key, NullWritable value, Context context) throws IOException, InterruptedException {

    try {
    // Debug output: the incoming key, its datum, and the datum's schema.
    System.out.println("Specific Record - " + key);
    System.out.println("Datum :: " + key.datum());
    System.out.println("Schema :: " + key.datum().getSchema());
    List<Field> fields = key.datum().getSchema().getFields();

    // Rebuild the record schema from scratch, keeping name and namespace.
    // NOTE(review): every output field is forced to type STRING here,
    // regardless of its original type — presumably intentional for this
    // experiment, but it no longer matches "same schema as the input".
    Schema s = Schema.createRecord(key.datum().getSchema().getName(), null, key.datum().getSchema().getNamespace(), false);
    List<Field> outFields = new ArrayList<Field>();
    for(Field f : fields) {
    System.out.println("Field Name - " + f.name());
    Schema.Field f1 = new Schema.Field(f.name(),Schema.create(Schema.Type.STRING), null,null);
    outFields.add(f1);
    }
    s.setFields(outFields);

    System.out.println("Out Schema - " + s);
    GenericRecord record = new GenericData.Record(s);
    for(Field f : fields) {
    record.put(f.name(), key.datum().get(f.name()));
    }
    System.out.println("Record - " + record);
    // BUG (same as the first attempt): d.newRecord(...)'s return value is
    // discarded, and the GenericData model object `d` — not the populated
    // record — is wrapped in the output AvroKey, so serialization fails.
    GenericData d = new GenericData();
    d.newRecord(record, s);
    AvroKey<GenericData> outkey = new AvroKey<GenericData>(d);
    System.out.println("Generic Record (Avro Key) - " + outkey.datum());
    context.write(outkey, NullWritable.get());
    } catch (Exception e) {
    // NOTE(review): the exception is printed but not rethrown here, so a
    // failing record is silently dropped and the task appears to succeed.
    e.printStackTrace();
    }

    }
    }

    请注意,以Avro作为 map 的输入工作正常,问题出在以Avro格式生成输出。

    最佳答案

    最后,我找到了答案和映射器代码,如下所示。
    我不再发出包装GenericData的AvroKey,而是改为发出包装GenericData.Record的AvroKey。

    /**
     * Working mapper (accepted answer): reads a generic Avro record and emits
     * an {@code AvroKey<GenericData.Record>} — wrapping the populated record
     * itself, not a {@code GenericData} model object — which is what
     * {@code AvroKeyOutputFormat} can serialize.
     */
    public static class AvroMapper extends Mapper<AvroKey<GenericData.Record>, NullWritable, AvroKey<GenericData.Record>, NullWritable> {

        @Override
        public void map(AvroKey<GenericData.Record> key, NullWritable value, Context context) throws IOException, InterruptedException {

            try {
                // Debug output: the incoming key, its datum, and the datum's schema.
                System.out.println("Specific Record - " + key);
                System.out.println("Datum :: " + key.datum());
                System.out.println("Schema :: " + key.datum().getSchema());
                List<Field> fields = key.datum().getSchema().getFields();

                // Rebuild the record schema, keeping name and namespace.
                // NOTE(review): every output field is declared as STRING here,
                // so this matches "same schema" only for all-string records.
                Schema s = Schema.createRecord(key.datum().getSchema().getName(), null, key.datum().getSchema().getNamespace(), false);
                List<Field> outFields = new ArrayList<Field>();
                for (Field f : fields) {
                    System.out.println("Field Name - " + f.name());
                    Schema.Field f1 = new Schema.Field(f.name(), Schema.create(Schema.Type.STRING), null, null);
                    outFields.add(f1);
                }
                s.setFields(outFields);

                System.out.println("Out Schema - " + s);
                // Copy every field value into a record built on the new schema.
                GenericData.Record record = new GenericData.Record(s);
                for (Field f : fields) {
                    record.put(f.name(), key.datum().get(f.name()));
                }
                System.out.println("Record - " + record);
                // The fix: wrap the populated record directly in the AvroKey.
                AvroKey<GenericData.Record> outkey = new AvroKey<GenericData.Record>(record);
                System.out.println("Generic Record (Avro Key) - " + outkey.datum());
                context.write(outkey, NullWritable.get());
            } catch (Exception e) {
                // Preserve the full cause chain instead of wrapping only
                // e.getMessage() (which loses the stack trace and NPEs when
                // the message is null). The duplicate System.out prints were
                // redundant with the chained exception's own stack trace.
                throw new IOException("Failed to map Avro record", e);
            }
        }
    }

    关于java - 带Avro的Mapreduce-通用解析,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37013531/

    26 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com