
hadoop - HBase map-side join - one of the tables is not being read? Writing correct results from HBase back to HBase

Reposted · Author: 可可西里 · Updated: 2023-11-01 16:43:05

I am trying to do a map-side join on two tables stored in HBase. My intention is to keep the small table's records in a HashMap, compare them against the big table, and, whenever there is a match, write the joined record back into a table in HBase. I wrote similar join code using a Mapper and a Reducer and it ran fine, with both tables being scanned in the mapper class. But since a reduce-side join is not efficient at all, I want to join the tables on the mapper side only. In the code below, the "commented if block" is there only to show that it always returns false and that the first table (the small one) is never read. Any hints or help are appreciated. I am using the HDP sandbox.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
//import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.util.Tool;
import java.io.IOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableSplit;

public class JoinDriver extends Configured implements Tool {

    static int row_index = 0;

    public static class JoinJobMapper extends TableMapper<ImmutableBytesWritable, Put> {
        private static byte[] big_table_bytarr = Bytes.toBytes("big_table");
        private static byte[] small_table_bytarr = Bytes.toBytes("small_table");

        HashMap<String, String> myHashMap = new HashMap<String, String>();

        byte[] c1_value;
        byte[] c2_value;

        String big_table;
        String small_table;

        String big_table_c1;
        String big_table_c2;

        String small_table_c1;
        String small_table_c2;

        Text mapperKeyS;
        Text mapperValueS;
        Text mapperKeyB;
        Text mapperValueB;

        public void map(ImmutableBytesWritable rowKey, Result columns, Context context) {
            // Determine which table the current split belongs to.
            TableSplit currentSplit = (TableSplit) context.getInputSplit();
            byte[] tableName = currentSplit.getTableName();

            try {
                Put put = new Put(Bytes.toBytes(++row_index));

                // put small table records into the hashmap - myHashMap
                if (Arrays.equals(tableName, small_table_bytarr)) {

                    c1_value = columns.getValue(Bytes.toBytes("s_cf"), Bytes.toBytes("s_cf_c1"));
                    c2_value = columns.getValue(Bytes.toBytes("s_cf"), Bytes.toBytes("s_cf_c2"));
                    small_table_c1 = new String(c1_value);
                    small_table_c2 = new String(c2_value);

                    mapperKeyS = new Text(small_table_c1);
                    mapperValueS = new Text(small_table_c2);

                    myHashMap.put(small_table_c1, small_table_c2);

                // big table records: emit the joined row
                } else if (Arrays.equals(tableName, big_table_bytarr)) {
                    c1_value = columns.getValue(Bytes.toBytes("b_cf"), Bytes.toBytes("b_cf_c1"));
                    c2_value = columns.getValue(Bytes.toBytes("b_cf"), Bytes.toBytes("b_cf_c2"));
                    big_table_c1 = new String(c1_value);
                    big_table_c2 = new String(c2_value);

                    mapperKeyB = new Text(big_table_c1);
                    mapperValueB = new Text(big_table_c2);

                    // if (set.containsKey(big_table_c1)){

                    put.addColumn(Bytes.toBytes("join"), Bytes.toBytes("join_c1"), Bytes.toBytes(big_table_c1));
                    context.write(new ImmutableBytesWritable(mapperKeyB.getBytes()), put);
                    put.addColumn(Bytes.toBytes("join"), Bytes.toBytes("join_c2"), Bytes.toBytes(big_table_c2));
                    context.write(new ImmutableBytesWritable(mapperKeyB.getBytes()), put);
                    put.addColumn(Bytes.toBytes("join"), Bytes.toBytes("join_c3"), Bytes.toBytes(myHashMap.get(big_table_c1)));
                    context.write(new ImmutableBytesWritable(mapperKeyB.getBytes()), put);

                    // }

                }

            } catch (Exception e) {
                // TODO : exception handling logic
                e.printStackTrace();
            }
        }

    }

    public int run(String[] args) throws Exception {

        // One Scan per input table; the table name is attached as a scan attribute
        // so the mapper can tell the splits apart.
        List<Scan> scans = new ArrayList<Scan>();

        Scan scan1 = new Scan();
        scan1.setAttribute("scan.attributes.table.name", Bytes.toBytes("small_table"));
        System.out.println(scan1.getAttribute("scan.attributes.table.name"));
        scans.add(scan1);

        Scan scan2 = new Scan();
        scan2.setAttribute("scan.attributes.table.name", Bytes.toBytes("big_table"));
        System.out.println(scan2.getAttribute("scan.attributes.table.name"));
        scans.add(scan2);

        Configuration conf = new Configuration();
        Job job = new Job(conf);
        job.setJar("MSJJ.jar");
        job.setJarByClass(JoinDriver.class);

        // Map-only job: both scans feed the same mapper, output goes to joined_table.
        TableMapReduceUtil.initTableMapperJob(scans, JoinJobMapper.class, ImmutableBytesWritable.class, Put.class, job);
        TableMapReduceUtil.initTableReducerJob("joined_table", null, job);
        job.setNumReduceTasks(0);

        job.waitForCompletion(true);

        return 0;
    }

    public static void main(String[] args) throws Exception {
        JoinDriver runJob = new JoinDriver();
        runJob.run(args);
    }

}

Best answer

Reading your problem statement, I believe you have a somewhat mistaken idea about how multi-table HBase input works. I suggest you load the small table into a HashMap in the setup method of your mapper class, and then run a map-only job over the big table; inside the map method you can look up the corresponding value from the HashMap you loaded earlier. Let me know how it goes.
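Below is a minimal sketch of that suggestion, assuming the table and column names from the question (small_table / s_cf, big_table / b_cf, output table joined_table with a join column family) and the HBase 1.x client API that ships with the HDP sandbox; the class names and the driver are illustrative, not the answerer's actual code.

import java.io.IOException;
import java.util.HashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;

public class MapSideJoinDriver {

    public static class MapSideJoinMapper extends TableMapper<ImmutableBytesWritable, Put> {

        private final HashMap<String, String> smallTable = new HashMap<String, String>();

        // Load the whole small table once per mapper, before any map() call.
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            Connection connection = ConnectionFactory.createConnection(context.getConfiguration());
            Table table = connection.getTable(TableName.valueOf("small_table"));
            ResultScanner scanner = table.getScanner(new Scan());
            try {
                for (Result row : scanner) {
                    String c1 = Bytes.toString(row.getValue(Bytes.toBytes("s_cf"), Bytes.toBytes("s_cf_c1")));
                    String c2 = Bytes.toString(row.getValue(Bytes.toBytes("s_cf"), Bytes.toBytes("s_cf_c2")));
                    smallTable.put(c1, c2);
                }
            } finally {
                scanner.close();
                table.close();
                connection.close();
            }
        }

        // map() only sees big_table rows; the join is a plain HashMap lookup.
        @Override
        public void map(ImmutableBytesWritable rowKey, Result columns, Context context)
                throws IOException, InterruptedException {
            String c1 = Bytes.toString(columns.getValue(Bytes.toBytes("b_cf"), Bytes.toBytes("b_cf_c1")));
            String c2 = Bytes.toString(columns.getValue(Bytes.toBytes("b_cf"), Bytes.toBytes("b_cf_c2")));
            String joined = smallTable.get(c1);
            if (joined == null) {
                return; // no matching small_table row, skip
            }
            Put put = new Put(rowKey.copyBytes());
            put.addColumn(Bytes.toBytes("join"), Bytes.toBytes("join_c1"), Bytes.toBytes(c1));
            put.addColumn(Bytes.toBytes("join"), Bytes.toBytes("join_c2"), Bytes.toBytes(c2));
            put.addColumn(Bytes.toBytes("join"), Bytes.toBytes("join_c3"), Bytes.toBytes(joined));
            context.write(new ImmutableBytesWritable(rowKey.copyBytes()), put);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Job job = Job.getInstance(conf, "hbase-map-side-join");
        job.setJarByClass(MapSideJoinDriver.class);

        // Only big_table goes through the MapReduce input; small_table is read in setup().
        TableMapReduceUtil.initTableMapperJob("big_table", new Scan(), MapSideJoinMapper.class,
                ImmutableBytesWritable.class, Put.class, job);
        TableMapReduceUtil.initTableReducerJob("joined_table", null, job);
        job.setNumReduceTasks(0);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Because the whole small table is loaded into memory once in setup(), this pattern is only reasonable while the small table comfortably fits in each mapper's heap; for anything larger, a reduce-side join or a lookup file shipped via the distributed cache would be the usual fallback.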

Regarding "hadoop - HBase map-side join - one of the tables is not being read? Writing correct results from HBase back to HBase", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/38987924/
