java - Hadoop - Writing to HBase directly from the Mapper

I have a Hadoop job whose output should be written to HBase. I don't really need a reducer; the kind of row I want to insert is determined in the Mapper.

How can I use TableOutputFormat to achieve this? All the examples I have seen assume that the reducer is the one creating the Put, and that TableMapper is only for reading from an existing HBase table.

In my case the input is on HDFS and the output is a Put to a specific table, and I couldn't find anything in TableMapReduceUtil that helps with this either.

Is there any example that could help me with this?

By the way, I'm using the new Hadoop API.
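
To make it concrete, this is roughly the shape of mapper I want to end up with (just a sketch; the class name, column family and qualifier are made-up placeholders). What I can't figure out is how to wire the job so that these Puts go straight to the table:

```java
import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Mapper;

public class MyPutMapper
    extends Mapper<LongWritable, Text, ImmutableBytesWritable, Writable> {

  @Override
  public void map(LongWritable offset, Text line, Context context)
      throws IOException, InterruptedException {
    // Row key and target column are decided here, in the mapper, per input line.
    byte[] row = Bytes.toBytes(line.toString());
    Put put = new Put(row);
    put.add(Bytes.toBytes("cf"), Bytes.toBytes("raw"), Bytes.toBytes(line.toString()));
    context.write(new ImmutableBytesWritable(row), put);
  }
}
```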

Best Answer

Here is an example that reads from a file and puts every line into HBase. The example comes from "HBase: The Definitive Guide", and you can find it in the book's repository. To get it, just clone the repo onto your machine:

git clone git://github.com/larsgeorge/hbase-book.git

The book also contains explanations of all the code. If anything is unclear, feel free to ask.

```java
import java.io.IOException;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.PosixParser;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class ImportFromFile {
  public static final String NAME = "ImportFromFile";
  public enum Counters { LINES }

  // Map-only job: the mapper itself creates the Put instances and emits them,
  // keyed by the row key, directly to TableOutputFormat.
  static class ImportMapper
      extends Mapper<LongWritable, Text, ImmutableBytesWritable, Writable> {
    private byte[] family = null;
    private byte[] qualifier = null;

    @Override
    protected void setup(Context context)
        throws IOException, InterruptedException {
      // The target column ("family:qualifier") is passed in via the job configuration.
      String column = context.getConfiguration().get("conf.column");
      byte[][] colkey = KeyValue.parseColumn(Bytes.toBytes(column));
      family = colkey[0];
      if (colkey.length > 1) {
        qualifier = colkey[1];
      }
    }

    @Override
    public void map(LongWritable offset, Text line, Context context)
        throws IOException {
      try {
        String lineString = line.toString();
        // Use the MD5 hash of the line as the row key.
        byte[] rowkey = DigestUtils.md5(lineString);
        Put put = new Put(rowkey);
        put.add(family, qualifier, Bytes.toBytes(lineString));
        context.write(new ImmutableBytesWritable(rowkey), put);
        context.getCounter(Counters.LINES).increment(1);
      } catch (Exception e) {
        e.printStackTrace();
      }
    }
  }

  // Parse the command-line options with Commons CLI.
  private static CommandLine parseArgs(String[] args) throws ParseException {
    Options options = new Options();
    Option o = new Option("t", "table", true,
        "table to import into (must exist)");
    o.setArgName("table-name");
    o.setRequired(true);
    options.addOption(o);
    o = new Option("c", "column", true,
        "column to store row data into (must exist)");
    o.setArgName("family:qualifier");
    o.setRequired(true);
    options.addOption(o);
    o = new Option("i", "input", true,
        "the directory or file to read from");
    o.setArgName("path-in-HDFS");
    o.setRequired(true);
    options.addOption(o);
    options.addOption("d", "debug", false, "switch on DEBUG log level");
    CommandLineParser parser = new PosixParser();
    CommandLine cmd = null;
    try {
      cmd = parser.parse(options, args);
    } catch (Exception e) {
      System.err.println("ERROR: " + e.getMessage() + "\n");
      HelpFormatter formatter = new HelpFormatter();
      formatter.printHelp(NAME + " ", options, true);
      System.exit(-1);
    }
    return cmd;
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    String[] otherArgs =
        new GenericOptionsParser(conf, args).getRemainingArgs();
    CommandLine cmd = parseArgs(otherArgs);
    String table = cmd.getOptionValue("t");
    String input = cmd.getOptionValue("i");
    String column = cmd.getOptionValue("c");
    conf.set("conf.column", column);
    Job job = new Job(conf, "Import from file " + input + " into table " + table);

    job.setJarByClass(ImportFromFile.class);
    job.setMapperClass(ImportMapper.class);
    // Send the mapper output straight to HBase instead of to a reducer.
    job.setOutputFormatClass(TableOutputFormat.class);
    job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, table);
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(Writable.class);
    // Zero reducers makes this a map-only job.
    job.setNumReduceTasks(0);
    FileInputFormat.addInputPath(job, new Path(input));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
```
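
Two notes on top of the listing. First, once the jar is built, running it looks something like hadoop jar your-job.jar ImportFromFile -t testtable -c data:json -i test-data.txt, where the jar name, table, column, and input path are example values and the table with its column family must already exist.

Second, on the TableMapReduceUtil point from the question: TableMapReduceUtil.initTableReducerJob also accepts null as the reducer class, in which case it only wires up TableOutputFormat and the output table for you. Below is a minimal sketch of how the end of main() could look with it, reusing conf, table, input and ImportMapper from the listing above (my variant, not code from the book):

```java
// Same map-only import job, but letting TableMapReduceUtil do the output wiring.
// Assumes the imports above plus org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.
Job job = new Job(conf, "Import from file " + input + " into table " + table);
job.setJarByClass(ImportFromFile.class);
job.setMapperClass(ImportMapper.class);
FileInputFormat.addInputPath(job, new Path(input));
// Passing null as the reducer class only configures TableOutputFormat,
// the output table, and the output key/value classes.
TableMapReduceUtil.initTableReducerJob(table, null, job);
job.setNumReduceTasks(0); // map-only: the mapper's Puts go straight to HBase
System.exit(job.waitForCompletion(true) ? 0 : 1);
```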

Regarding java - Hadoop - Writing to HBase directly from the Mapper, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/11061854/
