
java - Loading 1GB of data into HBase takes 1 hour


I want to load a 1GB CSV file (10 million records) into HBase. I wrote a MapReduce program for this. My code works fine, but it takes 1 hour to complete, and the last reducer alone takes more than half an hour. Can anyone help me?

My code is below:

Driver.java

    package com.cloudera.examples.hbase.bulkimport;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    /**
     * HBase bulk import example
     * Data preparation MapReduce job driver
     *
     *   1. args[0]: HDFS input path
     *   2. args[1]: HDFS output path
     *   3. args[2]: HBase table name
     */
    public class Driver {

      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        /*
         * NBA Final 2010 game 1 tip-off time (seconds from epoch)
         * Thu, 03 Jun 2010 18:00:00 PDT
         */
        // conf.setInt("epoch.seconds.tipoff", 1275613200);
        conf.set("hbase.table.name", args[2]);

        // Load hbase-site.xml
        HBaseConfiguration.addHbaseResources(conf);

        Job job = new Job(conf, "HBase Bulk Import Example");
        job.setJarByClass(HBaseKVMapper.class);

        job.setMapperClass(HBaseKVMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(KeyValue.class);
        job.setInputFormatClass(TextInputFormat.class);

        HTable hTable = new HTable(conf, args[2]);

        // Auto configure partitioner and reducer
        HFileOutputFormat.configureIncrementalLoad(job, hTable);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);

        // Load generated HFiles into table
        // LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
        // loader.doBulkLoad(new Path(args[1]), hTable);
      }
    }
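Note that HFileOutputFormat.configureIncrementalLoad only makes the job write HFiles under args[1]; nothing lands in the table until those files are bulk-loaded, which is the step the driver leaves commented out at the end. As a minimal sketch, assuming the same old HBase client API the driver already uses, the load can be completed like this:

    package com.cloudera.examples.hbase.bulkimport;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

    /** Sketch: completes the bulk load that Driver leaves commented out. */
    public class CompleteBulkLoad {
      public static void main(String[] args) throws Exception {
        // args[0]: HDFS directory holding the HFiles the MR job wrote
        // args[1]: target HBase table name
        Configuration conf = HBaseConfiguration.create();
        HTable hTable = new HTable(conf, args[1]);
        // doBulkLoad moves the HFiles into the table's regions; it is an
        // HDFS rename rather than a copy, so this step itself is fast.
        LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
        loader.doBulkLoad(new Path(args[0]), hTable);
        hTable.close();
      }
    }

Equivalently, the stock completebulkload tool (hadoop jar hbase-&lt;version&gt;.jar completebulkload &lt;hfile-dir&gt; &lt;table-name&gt;) does the same thing from the command line.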

HColumnEnum.java

    package com.cloudera.examples.hbase.bulkimport;

    /**
     * HBase table columns for the 'srv' column family
     */
    public enum HColumnEnum {
      SRV_COL_employeeid ("employeeid".getBytes()),
      SRV_COL_eventdesc ("eventdesc".getBytes()),
      SRV_COL_eventdate ("eventdate".getBytes()),
      SRV_COL_objectname ("objectname".getBytes()),
      SRV_COL_objectfolder ("objectfolder".getBytes()),
      SRV_COL_ipaddress ("ipaddress".getBytes());

      private final byte[] columnName;

      HColumnEnum (byte[] column) {
        this.columnName = column;
      }

      public byte[] getColumnName() {
        return this.columnName;
      }
    }

HBaseKVMapper.java



package com.cloudera.examples.hbase.bulkimport;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import au.com.bytecode.opencsv.CSVParser;

/**
* HBase bulk import example
* <p>
* Parses CSV records and outputs
* <ImmutableBytesWritable, KeyValue> pairs.
* <p>
* The ImmutableBytesWritable key is used by the TotalOrderPartitioner to map it
* into the correct HBase table region.
* <p>
* The KeyValue value holds the HBase mutation information (column family,
* column, and value)
*/
public class HBaseKVMapper extends
Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {

final static byte[] SRV_COL_FAM = "srv".getBytes();
final static int NUM_FIELDS = 6;

CSVParser csvParser = new CSVParser();
int tipOffSeconds = 0;
String tableName = "";

// DateTimeFormatter p = DateTimeFormat.forPattern("MMM dd, yyyy HH:mm:ss")
// .withLocale(Locale.US).withZone(DateTimeZone.forID("PST8PDT"));

ImmutableBytesWritable hKey = new ImmutableBytesWritable();
KeyValue kv;

/** {@inheritDoc} */
@Override
protected void setup(Context context) throws IOException,
InterruptedException {
Configuration c = context.getConfiguration();

// tipOffSeconds = c.getInt("epoch.seconds.tipoff", 0);
tableName = c.get("hbase.table.name");
}

/** {@inheritDoc} */
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {

/*if (value.find("Service,Term,") > -1) {
// Skip header
return;
}*/

String[] fields = null;

try {
fields = value.toString().split(",");
//csvParser.parseLine(value.toString());
} catch (Exception ex) {
context.getCounter("HBaseKVMapper", "PARSE_ERRORS").increment(1);
return;
}

if (fields.length != NUM_FIELDS) {
context.getCounter("HBaseKVMapper", "INVALID_FIELD_LEN").increment(1);
return;
}

// Get game offset in seconds from tip-off
/* DateTime dt = null;

try {
dt = p.parseDateTime(fields[9]);
} catch (Exception ex) {
context.getCounter("HBaseKVMapper", "INVALID_DATE").increment(1);
return;
}

int gameOffset = (int) ((dt.getMillis() / 1000) - tipOffSeconds);
String offsetForKey = String.format("%04d", gameOffset);

String username = fields[2];
if (username.equals("")) {
username = fields[3];
}*/

// Row key: all six fields joined with '|'
hKey.set(String.format("%s|%s|%s|%s|%s|%s", fields[0], fields[1],
fields[2], fields[3], fields[4], fields[5]).getBytes());

// Service columns
if (!fields[0].equals("")) {
kv = new KeyValue(hKey.get(), SRV_COL_FAM,
HColumnEnum.SRV_COL_employeeid.getColumnName(), fields[0].getBytes());
context.write(hKey, kv);
}

if (!fields[1].equals("")) {
kv = new KeyValue(hKey.get(), SRV_COL_FAM,
HColumnEnum.SRV_COL_eventdesc.getColumnName(), fields[1].getBytes());
context.write(hKey, kv);
}

if (!fields[2].equals("")) {
kv = new KeyValue(hKey.get(), SRV_COL_FAM,
HColumnEnum.SRV_COL_eventdate.getColumnName(), fields[2].getBytes());
context.write(hKey, kv);
}

if (!fields[3].equals("")) {
kv = new KeyValue(hKey.get(), SRV_COL_FAM,
HColumnEnum.SRV_COL_objectname.getColumnName(), fields[3].getBytes());
context.write(hKey, kv);
}

if (!fields[4].equals("")) {
kv = new KeyValue(hKey.get(), SRV_COL_FAM,
HColumnEnum.SRV_COL_objectfolder.getColumnName(), fields[4].getBytes());
context.write(hKey, kv);
}

if (!fields[5].equals("")) {
kv = new KeyValue(hKey.get(), SRV_COL_FAM,
HColumnEnum.SRV_COL_ipaddress.getColumnName(), fields[5].getBytes());
context.write(hKey, kv);
}


context.getCounter("HBaseKVMapper", "NUM_MSGS").increment(1);

/*
* Output number of messages per quarter and before/after game. This should
* correspond to the number of messages per region in HBase
*/
/* if (gameOffset < 0) {
context.getCounter("QStats", "BEFORE_GAME").increment(1);
} else if (gameOffset < 900) {
context.getCounter("QStats", "Q1").increment(1);
} else if (gameOffset < 1800) {
context.getCounter("QStats", "Q2").increment(1);
} else if (gameOffset < 2700) {
context.getCounter("QStats", "Q3").increment(1);
} else if (gameOffset < 3600) {
context.getCounter("QStats", "Q4").increment(1);
} else {
context.getCounter("QStats", "AFTER_GAME").increment(1);
}*/
}
}

Please help me improve the performance, or let me know if you have an alternate solution with sample code.

My mapred-site.xml

 <?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<!-- Put site-specific property overrides in this file. -->

<configuration>

<property>
<name>mapred.job.tracker</name>
<value>namenode:54311</value>
</property>

<property>
<name>mapred.reduce.parallel.copies</name>
<value>20</value>
</property>

<property>
<name>tasktracker.http.threads</name>
<value>50</value>
</property>

<property>
<name>mapred.job.shuffle.input.buffer.percent</name>
<value>0.70</value>
</property>

<property>
<name>mapred.tasktracker.map.tasks.maximum</name>
<value>4</value>
</property>

<property>
<name>mapred.tasktracker.reduce.tasks.maximum</name>
<value>4</value>
</property>

<property>
<name>mapred.map.tasks</name>
<value>4</value>
</property>

<property>
<name>reduce.map.tasks</name>
<value>4</value>
</property>

<property>
<name>mapred.job.shuffle.merge.percent</name>
<value>0.65</value>
</property>

<property>
<name>mapred.task.timeout</name>
<value>1200000</value>
</property>

<property>
<name>mapred.child.java.opts</name>
<value>-Xms1024M -Xmx2048M</value>
</property>



<property>
<name>mapred.job.reuse.jvm.num.tasks</name>
<value>-1</value>
</property>

<property>
<name>mapred.compress.map.output</name>
<value>true</value>
</property>

<property>
<name>mapred.map.output.compression.codec</name>
<value>com.hadoop.compression.lzo.LzoCodec</value>
</property>

<property>
<name>io.sort.mb</name>
<value>800</value>
</property>


<property>
<name>mapred.child.ulimit</name>
<value>unlimited</value>
</property>

<property>
<name>io.sort.factor</name>
<value>100</value>
<description>More streams merged at once while sorting files.</description>
</property>


<property>
<name>mapreduce.admin.map.child.java.opts</name>
<value>-Djava.net.preferIPv4Stack=true</value>
</property>
<property>
<name>mapreduce.admin.reduce.child.java.opts</name>
<value>-Djava.net.preferIPv4Stack=true</value>
</property>


<property>
<name>mapred.min.split.size</name>
<value>0</value>
</property>

<property>
<name>mapred.job.map.memory.mb</name>
<value>-1</value>
</property>

<property>
<name>mapred.jobtracker.maxtasks.per.job</name>
<value>-1</value>
</property>


</configuration>

hbase-site.xml

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>hbase.rootdir</name>
<value>hdfs://namenode:54310/hbase</value>
<description>The directory shared by RegionServers.
</description>
</property>

<property>
<name>hbase.master</name>
<value>slave:60000</value>
<description>The host and port that the HBase master runs at.
A value of 'local' runs the master and a regionserver
in a single process.
</description>
</property>

<property>
<name>hbase.cluster.distributed</name>
<value>true</value>
<description>The mode the cluster will be in. Possible values are
false: standalone and pseudo-distributed setups with managed Zookeeper
true: fully-distributed with unmanaged Zookeeper Quorum (see hbase-env.sh)
</description>
</property>

<property>
<name>hbase.zookeeper.quorum</name>
<value>slave</value>
<description>Comma separated list of servers in the ZooKeeper Quorum.
For example, "host1.mydomain.com,host2.mydomain.com,host3.mydomain.com".
By default this is set to localhost for local and pseudo-distributed modes
of operation. For a fully-distributed setup, this should be set to a full
list of ZooKeeper quorum servers. If HBASE_MANAGES_ZK is set in hbase-env.sh
this is the list of servers which we will start/stop ZooKeeper on.
</description>
</property>

<property>
<name>hbase.zookeeper.property.clientPort</name>
<value>2181</value>
</property>

<property>
<name>hbase.zookeeper.property.dataDir</name>
<value>/home/hduser/work/zoo_data</value>
<description>Property from ZooKeeper's config zoo.cfg.
The directory where the snapshot is stored.
</description>
</property>

</configuration>

Please help me so that I can improve the performance.

Best Answer

First of all: why do we need a MapReduce program to load such a small amount of data (1GB) into HBase?

In my experience, I have processed 5GB of JSON using Jackson streaming (I did not want to hold all the JSON in memory) and persisted it into HBase within 8 minutes using a batching technique.

I used HBase batch puts, with a list of 100,000 Put objects per batch.

Below is the code snippet through which I achieved this; the same thing can be done while parsing other formats too.

You probably need to call this method in two places:

1) once for every full batch of 100,000 records, and

2) once at the end, to handle the remaining records that total fewer than 100,000 (see the usage sketch after the snippet).

    public void addRecord(final ArrayList<Put> puts, final String tableName) throws Exception {
      HTable table = null;
      try {
        // HBaseConnection.getHBaseConfiguration() and getTable() are the
        // answerer's own helpers for the cluster config and table lookup.
        table = new HTable(HBaseConnection.getHBaseConfiguration(), getTable(tableName));
        table.put(puts); // one batched RPC instead of one RPC per record
        LOG.info("INSERT record[s] " + puts.size() + " to table " + tableName + " OK.");
      } catch (final Throwable e) {
        e.printStackTrace();
      } finally {
        LOG.info("Processed ---> " + puts.size());
        puts.clear(); // empty the list so the caller can reuse it for the next batch
        if (table != null) {
          table.close(); // release the per-batch table handle
        }
      }
    }
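For context, here is a minimal sketch of how addRecord might be driven for the CSV case in the question, flushing once per full batch and once more for the remainder. The loadCsv method name and the 100,000 batch size are illustrative assumptions; the six-field row layout follows the mapper in the question, and the sketch is assumed to live in the same class as addRecord.

    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.util.ArrayList;

    import org.apache.hadoop.hbase.client.Put;

    // Hypothetical driver loop: streams the CSV line by line, so at most
    // one batch of Puts is held in memory at a time.
    public void loadCsv(final String csvPath, final String tableName) throws Exception {
      final ArrayList<Put> puts = new ArrayList<Put>();
      final BufferedReader reader = new BufferedReader(new FileReader(csvPath));
      try {
        String line;
        while ((line = reader.readLine()) != null) {
          final String[] f = line.split(",");
          if (f.length != 6) {
            continue; // skip malformed rows, mirroring the mapper's check
          }
          // Same row key layout as the mapper: all six fields joined with '|'
          final Put put = new Put(String.format("%s|%s|%s|%s|%s|%s",
              f[0], f[1], f[2], f[3], f[4], f[5]).getBytes());
          put.add("srv".getBytes(), "employeeid".getBytes(), f[0].getBytes());
          put.add("srv".getBytes(), "eventdesc".getBytes(), f[1].getBytes());
          put.add("srv".getBytes(), "eventdate".getBytes(), f[2].getBytes());
          put.add("srv".getBytes(), "objectname".getBytes(), f[3].getBytes());
          put.add("srv".getBytes(), "objectfolder".getBytes(), f[4].getBytes());
          put.add("srv".getBytes(), "ipaddress".getBytes(), f[5].getBytes());
          puts.add(put);
          if (puts.size() == 100000) {
            addRecord(puts, tableName); // (1) full batch; addRecord clears the list
          }
        }
        if (!puts.isEmpty()) {
          addRecord(puts, tableName); // (2) remainder, fewer than 100,000 records
        }
      } finally {
        reader.close();
      }
    }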

Regarding java - Loading 1GB of data into HBase takes 1 hour, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/23421818/
