
java - How to read multiple types of Avro data in a single MapReduce job


I have two different types of Avro data that share some common fields. I want to read those common fields in the mapper, and I want to do it with a single job running on the cluster.

Below are the sample Avro schemas.

Schema 1:

{"type":"record","name":"Test","namespace":"com.abc.schema.SchemaOne","doc":"Avro storing with schema using MR.","fields":[{"name":"EE","type":"string","default":null}, {"name":"AA","type":["null","long"],"default":null}, {"name":"BB","type":["null","string"],"default":null}, {"name":"CC","type":["null","string"],"default":null}]}



Schema 2:

{"type":"record","name":"Test","namespace":"com.abc.schema.SchemaTwo","doc":"Avro storing with schema using MR.","fields":[{"name":"EE","type":"string","default":null}, {"name":"AA","type":["null","long"],"default":null}, {"name":"CC","type":["null","string"],"default":null}, {"name":"DD","type":["null","string"],"default":null}]}



Driver class:
package com.mango.schema.aggrDaily;

import java.util.Date;
import org.apache.avro.Schema;
import org.apache.avro.mapred.AvroJob;
import org.apache.avro.mapred.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class AvroDriver extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {

        JobConf conf = new JobConf(super.getConf(), getClass());
        conf.setJobName("DF");

        args[0] = "hdfs://localhost:9999/home/hadoop/work/alok/aggrDaily/data/avro512MB/part-m-00000.avro";
        args[1] = "/home/hadoop/work/alok/tmp"; // temp location
        args[2] = "hdfs://localhost:9999/home/hadoop/work/alok/tmp/10";

        FileInputFormat.addInputPaths(conf, args[0]);
        FileOutputFormat.setOutputPath(conf, new Path(args[2]));

        AvroJob.setInputReflect(conf);
        AvroJob.setMapperClass(conf, AvroMapper.class);

        AvroJob.setOutputSchema(conf,
                Pair.getPairSchema(Schema.create(Schema.Type.STRING),
                        Schema.create(Schema.Type.INT)));

        RunningJob job = JobClient.runJob(conf);

        return 0;
    }

    public static void main(String[] args) throws Exception {
        long startTime = new Date().getTime();
        System.out.println("Start Time :::::" + startTime);
        Configuration conf = new Configuration();
        int exitCode = ToolRunner.run(conf, new AvroDriver(), args);
        long endTime = new Date().getTime();
        System.out.println("End Time :::::" + endTime);
        System.out.println("Total Time Taken:::"
                + new Double((endTime - startTime) * 0.001) + "Sec.");
        System.exit(exitCode);
    }
}

Mapper class:
package com.mango.schema.aggrDaily;

import java.io.IOException;
import org.apache.avro.generic.GenericData;
import org.apache.avro.mapred.AvroCollector;
import org.apache.avro.mapred.Pair;
import org.apache.hadoop.mapred.Reporter;

// The superclass is fully qualified: the simple name "AvroMapper" would
// otherwise resolve to this class itself and fail to compile with a
// cyclic-inheritance error.
public class AvroMapper extends
        org.apache.avro.mapred.AvroMapper<GenericData, Pair<CharSequence, Integer>> {

    @Override
    public void map(GenericData record,
            AvroCollector<Pair<CharSequence, Integer>> collector,
            Reporter reporter) throws IOException {
        System.out.println("record :: " + record);
    }
}

By setting the input schema, I am able to read the Avro data with this code:

AvroJob.setInputSchema(conf, new AggrDaily().getSchema());

Since an Avro data file carries its schema embedded in it, I don't want to pass a specific schema to the job explicitly. I achieved this in Pig, but now I want to achieve the same thing in MapReduce.
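
For example, the embedded schema can be read back from an Avro container file without supplying any reader schema at all. A minimal standalone sketch (not part of the job; it assumes the file path is passed as the first argument):

import java.io.File;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

public class ShowEmbeddedSchema {
    public static void main(String[] args) throws Exception {
        // No reader schema is given; the reader falls back to the writer
        // schema stored in the file header.
        GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>();
        try (DataFileReader<GenericRecord> fileReader =
                new DataFileReader<>(new File(args[0]), reader)) {
            System.out.println(fileReader.getSchema().toString(true));
        }
    }
}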

Can anyone help me achieve this with MR code, or let me know where I am going wrong?

Best Answer

Through the org.apache.hadoop.mapreduce.lib.input.MultipleInputs class, we can read multiple Avro datasets with a single MR job; a sketch of the approach follows.
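
A minimal sketch of that approach (not the original poster's code): it assumes the newer mapreduce API, AvroKeyInputFormat from the avro-mapred artifact, and two placeholder input paths plus an output path passed as arguments. One mapper class is reused for both inputs because it only touches the shared fields:

import java.io.IOException;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapreduce.AvroKeyInputFormat;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MultiAvroDriver {

    // No reader schema is set on the job, so each input file is decoded
    // with its own embedded (writer) schema; the mapper only reads fields
    // common to both schemas.
    public static class CommonFieldMapper extends
            Mapper<AvroKey<GenericRecord>, NullWritable, Text, IntWritable> {
        @Override
        protected void map(AvroKey<GenericRecord> key, NullWritable value,
                Context context) throws IOException, InterruptedException {
            GenericRecord record = key.datum();
            // "EE" is one of the fields shared by both schemas.
            context.write(new Text(String.valueOf(record.get("EE"))),
                    new IntWritable(1));
        }
    }

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance();
        job.setJarByClass(MultiAvroDriver.class);

        // Register each input path with its input format and mapper; the
        // same mapper serves both because it relies only on common fields.
        MultipleInputs.addInputPath(job, new Path(args[0]),
                AvroKeyInputFormat.class, CommonFieldMapper.class);
        MultipleInputs.addInputPath(job, new Path(args[1]),
                AvroKeyInputFormat.class, CommonFieldMapper.class);

        FileOutputFormat.setOutputPath(job, new Path(args[2]));
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}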

Regarding java - How to read multiple types of Avro data in a single MapReduce job, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/23246968/
