gpt4 book ai didi

java - 以编程方式将数据批量加载到 HBase 的最快方法是什么?

转载 作者:太空狗 更新时间:2023-10-29 22:40:30 24 4
gpt4 key购买 nike

我有一个可能包含数百万行需要自定义解析的纯文本文件,我想尽快将它加载到 HBase 表中(使用 Hadoop 或 HBase Java 客户端)。

我当前的解决方案基于没有 Reduce 部分的 MapReduce 作业。我使用 FileInputFormat 读取文本文件,以便将每一行传递到我的 Mapper 类的 map 方法。此时,该行被解析为一个 Put 对象,该对象被写入 context。然后,TableOutputFormat 获取 Put 对象并将其插入到表中。

此解决方案产生的平均插入率为每秒 1,000 行,低于我的预期。 我的 HBase 设置在单个服务器上处于伪分布式模式。

一件有趣的事情是,在插入 1,000,000 行期间,生成了 25 个 Mappers(任务),但它们连续运行(一个接一个);这正常吗?

这是我当前解决方案的代码:

public static class CustomMap extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

protected void map(LongWritable key, Text value, Context context) throws IOException {
Map<String, String> parsedLine = parseLine(value.toString());

Put row = new Put(Bytes.toBytes(parsedLine.get(keys[1])));
for (String currentKey : parsedLine.keySet()) {
row.add(Bytes.toBytes(currentKey),Bytes.toBytes(currentKey),Bytes.toBytes(parsedLine.get(currentKey)));
}

try {
context.write(new ImmutableBytesWritable(Bytes.toBytes(parsedLine.get(keys[1]))), row);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}

public int run(String[] args) throws Exception {
if (args.length != 2) {
return -1;
}

conf.set("hbase.mapred.outputtable", args[1]);

// I got these conf parameters from a presentation about Bulk Load
conf.set("hbase.hstore.blockingStoreFiles", "25");
conf.set("hbase.hregion.memstore.block.multiplier", "8");
conf.set("hbase.regionserver.handler.count", "30");
conf.set("hbase.regions.percheckin", "30");
conf.set("hbase.regionserver.globalMemcache.upperLimit", "0.3");
conf.set("hbase.regionserver.globalMemcache.lowerLimit", "0.15");

Job job = new Job(conf);
job.setJarByClass(BulkLoadMapReduce.class);
job.setJobName(NAME);
TextInputFormat.setInputPaths(job, new Path(args[0]));
job.setInputFormatClass(TextInputFormat.class);
job.setMapperClass(CustomMap.class);
job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(Put.class);
job.setNumReduceTasks(0);
job.setOutputFormatClass(TableOutputFormat.class);

job.waitForCompletion(true);
return 0;
}

public static void main(String[] args) throws Exception {
Long startTime = Calendar.getInstance().getTimeInMillis();
System.out.println("Start time : " + startTime);

int errCode = ToolRunner.run(HBaseConfiguration.create(), new BulkLoadMapReduce(), args);

Long endTime = Calendar.getInstance().getTimeInMillis();
System.out.println("End time : " + endTime);
System.out.println("Duration milliseconds: " + (endTime-startTime));

System.exit(errCode);
}

最佳答案

我经历了一个可能与您试图找到一种有效方法将数据从 MR 加载到 HBase 的过程非常相似的过程。我发现可以使用 HFileOutputFormat 作为 MR 的 OutputFormatClass。

下面是我的代码的基础,我必须生成 job 和写出数据的 Mapper map 函数。这很快。我们不再使用它了,所以我手头没有数字,但它在不到一分钟内就有大约 250 万条记录。

这是我编写的(精简的)函数,用于为我的 MapReduce 进程生成将数据放入 HBase 的作业

private Job createCubeJob(...) {
//Build and Configure Job
Job job = new Job(conf);
job.setJobName(jobName);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(Put.class);
job.setMapperClass(HiveToHBaseMapper.class);//Custom Mapper
job.setJarByClass(CubeBuilderDriver.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(HFileOutputFormat.class);

TextInputFormat.setInputPaths(job, hiveOutputDir);
HFileOutputFormat.setOutputPath(job, cubeOutputPath);

Configuration hConf = HBaseConfiguration.create(conf);
hConf.set("hbase.zookeeper.quorum", hbaseZookeeperQuorum);
hConf.set("hbase.zookeeper.property.clientPort", hbaseZookeeperClientPort);

HTable hTable = new HTable(hConf, tableName);

HFileOutputFormat.configureIncrementalLoad(job, hTable);
return job;
}

这是我在 HiveToHBaseMapper 类中的 map 函数(略有编辑)。

public void map(WritableComparable key, Writable val, Context context)
throws IOException, InterruptedException {
try{
Configuration config = context.getConfiguration();
String[] strs = val.toString().split(Constants.HIVE_RECORD_COLUMN_SEPARATOR);
String family = config.get(Constants.CUBEBUILDER_CONFIGURATION_FAMILY);
String column = strs[COLUMN_INDEX];
String Value = strs[VALUE_INDEX];
String sKey = generateKey(strs, config);
byte[] bKey = Bytes.toBytes(sKey);
Put put = new Put(bKey);
put.add(Bytes.toBytes(family), Bytes.toBytes(column), (value <= 0)
? Bytes.toBytes(Double.MIN_VALUE)
: Bytes.toBytes(value));

ImmutableBytesWritable ibKey = new ImmutableBytesWritable(bKey);
context.write(ibKey, put);

context.getCounter(CubeBuilderContextCounters.CompletedMapExecutions).increment(1);
}
catch(Exception e){
context.getCounter(CubeBuilderContextCounters.FailedMapExecutions).increment(1);
}

}

我很确定这不会成为您的复制和粘贴解决方案。显然,我在这里使用的数据不需要任何自定义处理(这是在此之前的 MR 作业中完成的)。我想提供的主要内容是 HFileOutputFormat。其余的只是我如何使用它的一个例子。 :)
我希望它能让你走上一条通往好的解决方案的坚实道路。 :

关于java - 以编程方式将数据批量加载到 HBase 的最快方法是什么?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/8750764/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com