
hbase - How to read from HBase using Spark


The code below reads from HBase, converts the rows into a JSON structure, and turns that into a schemaRDD. The problem is that I am using a List to store the JSON strings and then passing it to a JavaRDD, so for roughly 100 GB of data the master would have to hold everything in memory. What is the right way to load the data from HBase, perform the manipulation, and then convert it into a JavaRDD?

package hbase_reader;


import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.api.java.JavaSQLContext;
import org.apache.spark.sql.api.java.JavaSchemaRDD;
import org.apache.commons.cli.ParseException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.spark.SparkConf;

import scala.Function1;
import scala.Tuple2;
import scala.runtime.AbstractFunction1;

import com.google.common.collect.Lists;

public class hbase_reader {

    public static void main(String[] args) throws IOException, ParseException {

        List<String> jars = Lists.newArrayList("");

        SparkConf spconf = new SparkConf();
        spconf.setMaster("local[2]");
        spconf.setAppName("HBase");
        //spconf.setSparkHome("/opt/human/opt/spark-0.9.0-hdp1");
        spconf.setJars(jars.toArray(new String[jars.size()]));
        JavaSparkContext sc = new JavaSparkContext(spconf);
        //spconf.set("spark.executor.memory", "1g");

        JavaSQLContext jsql = new JavaSQLContext(sc);

        HBaseConfiguration conf = new HBaseConfiguration();
        String tableName = "HBase.CounData1_Raw_Min1";
        HTable table = new HTable(conf, tableName);
        try {
            ResultScanner scanner = table.getScanner(new Scan());
            List<String> jsonList = new ArrayList<String>();

            String json = null;

            for (Result rowResult : scanner) {
                json = "";
                String rowKey = Bytes.toString(rowResult.getRow());
                for (byte[] s1 : rowResult.getMap().keySet()) {
                    String s1_str = Bytes.toString(s1);

                    String jsonSame = "";
                    for (byte[] s2 : rowResult.getMap().get(s1).keySet()) {
                        String s2_str = Bytes.toString(s2);
                        for (long s3 : rowResult.getMap().get(s1).get(s2).keySet()) {
                            String s3_str = new String(rowResult.getMap().get(s1).get(s2).get(s3));
                            jsonSame += "\"" + s2_str + "\":" + s3_str + ",";
                        }
                    }
                    jsonSame = jsonSame.substring(0, jsonSame.length() - 1);
                    json += "\"" + s1_str + "\"" + ":{" + jsonSame + "}" + ",";
                }
                json = json.substring(0, json.length() - 1);
                json = "{\"RowKey\":\"" + rowKey + "\"," + json + "}";
                jsonList.add(json);
            }

            JavaRDD<String> jsonRDD = sc.parallelize(jsonList);

            JavaSchemaRDD schemaRDD = jsql.jsonRDD(jsonRDD);

            System.out.println(schemaRDD.take(2));

        } finally {
            table.close();
        }

    }

}

Best Answer

A basic example of reading HBase data using Spark (Scala); you can also write this in Java:

import org.apache.hadoop.hbase.client.{HBaseAdmin, Result}
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.io.ImmutableBytesWritable

import org.apache.spark._

object HBaseRead {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("HBaseRead").setMaster("local[2]")
    val sc = new SparkContext(sparkConf)
    val conf = HBaseConfiguration.create()
    val tableName = "table1"

    System.setProperty("user.name", "hdfs")
    System.setProperty("HADOOP_USER_NAME", "hdfs")
    conf.set("hbase.master", "localhost:60000")
    conf.setInt("timeout", 120000)
    conf.set("hbase.zookeeper.quorum", "localhost")
    conf.set("zookeeper.znode.parent", "/hbase-unsecure")
    conf.set(TableInputFormat.INPUT_TABLE, tableName)

    val admin = new HBaseAdmin(conf)
    if (!admin.isTableAvailable(tableName)) {
      val tableDesc = new HTableDescriptor(tableName)
      admin.createTable(tableDesc)
    }

    val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
    println("Number of Records found : " + hBaseRDD.count())
    sc.stop()
  }
}
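
Since the question itself is in Java, here is a minimal Java sketch of the same newAPIHadoopRDD read (not from the original answer; the table name and ZooKeeper quorum are placeholder values you would replace with your own):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class HBaseReadJava {
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("HBaseReadJava").setMaster("local[2]");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);

        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "localhost");          // placeholder
        conf.set(TableInputFormat.INPUT_TABLE, "table1");         // placeholder table name

        // The scan runs on the executors, region by region, so the driver
        // never has to hold the whole table in memory (unlike the List approach).
        JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD =
                sc.newAPIHadoopRDD(conf, TableInputFormat.class,
                        ImmutableBytesWritable.class, Result.class);

        System.out.println("Number of Records found : " + hBaseRDD.count());
        sc.stop();
    }
}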

Update - 2016

As of Spark 1.0.x+, you can now also use the Spark-HBase connector:

Maven dependency to include:

<dependency>
    <groupId>it.nerdammer.bigdata</groupId>
    <artifactId>spark-hbase-connector_2.10</artifactId>
    <!-- Version can be changed as per your Spark version; I am using Spark 1.6.x -->
    <version>1.0.3</version>
</dependency>

And a sample code for the same is below:
import org.apache.spark._
import it.nerdammer.spark.hbase._

object HBaseRead extends App {
  val sparkConf = new SparkConf().setAppName("Spark-HBase").setMaster("local[4]")
  sparkConf.set("spark.hbase.host", "<YourHostnameOnly>") // e.g. 192.168.1.1, localhost, or your hostname
  val sc = new SparkContext(sparkConf)

  // For example, if you have an HBase table 'Document' with column family 'SMPL'
  // and qualifiers 'DocID' and 'Title', then:
  val docRdd = sc.hbaseTable[(Option[String], Option[String])]("Document")
    .select("DocID", "Title").inColumnFamily("SMPL")

  println("Number of Records found : " + docRdd.count())
}

Update - 2017

As of Spark 1.6.x+, you can now also use the SHC connector (for Hortonworks or HDP users):

Maven dependency to include:

<dependency>
    <groupId>com.hortonworks</groupId>
    <artifactId>shc</artifactId>
    <!-- Version depends on your Spark version and is supported up to Spark 2.x -->
    <version>1.0.0-2.0-s_2.11</version>
</dependency>

The main advantage of using this connector is its flexibility in the schema definition: it does not need hard-coded parameters the way nerdammer/spark-hbase-connector does. Also remember that it supports Spark 2.x, so this connector is quite flexible and provides end-to-end support through its issues and PRs.

Find the repository below for the latest README and examples:

Hortonworks Spark HBase Connector
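
As a rough illustration only (not from the original answer; check the README above for the exact API of your SHC version), reading an HBase table through SHC looks roughly like this in Java, assuming Spark 2.x. The catalog JSON, the "catalog" option key, and the data source name follow the SHC documentation, and the table and columns reuse the hypothetical 'Document' example from earlier:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SHCReadSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("SHCRead").master("local[4]").getOrCreate();

        // Catalog describing how the HBase row key and columns map onto
        // DataFrame columns (table "Document", column family "SMPL" as above).
        String catalog = "{"
                + "\"table\":{\"namespace\":\"default\",\"name\":\"Document\"},"
                + "\"rowkey\":\"key\","
                + "\"columns\":{"
                + "\"DocID\":{\"cf\":\"rowkey\",\"col\":\"key\",\"type\":\"string\"},"
                + "\"Title\":{\"cf\":\"SMPL\",\"col\":\"Title\",\"type\":\"string\"}"
                + "}}";

        // "catalog" is the option key SHC documents (HBaseTableCatalog.tableCatalog);
        // verify it against the README for your SHC release.
        Dataset<Row> df = spark.read()
                .option("catalog", catalog)
                .format("org.apache.spark.sql.execution.datasources.hbase")
                .load();

        System.out.println("Number of Records found : " + df.count());
        spark.stop();
    }
}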

You can also convert the RDDs from the examples above to DataFrames and run SQL over them, or map these Datasets or DataFrames to user-defined Java POJOs or Scala case classes. It works nicely.
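
For instance, here is a rough sketch (not from the original answer; Spark 2.x assumed, and "Doc" is a hypothetical POJO) of mapping the raw (ImmutableBytesWritable, Result) pairs from the first example onto a Java bean and querying it with SQL:

import java.io.Serializable;

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class HBaseToDataFrame {
    // Hypothetical POJO the HBase rows are mapped onto.
    public static class Doc implements Serializable {
        private String rowKey;
        private String title;
        public String getRowKey() { return rowKey; }
        public void setRowKey(String rowKey) { this.rowKey = rowKey; }
        public String getTitle() { return title; }
        public void setTitle(String title) { this.title = title; }
    }

    public static Dataset<Row> toDataFrame(SparkSession spark,
                                           JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD) {
        // Extract only the needed fields on the executors, then build a DataFrame.
        // Column family "SMPL" and qualifier "Title" follow the earlier example.
        JavaRDD<Doc> docs = hBaseRDD.map(tuple -> {
            Result r = tuple._2();
            Doc d = new Doc();
            d.setRowKey(Bytes.toString(r.getRow()));
            d.setTitle(Bytes.toString(r.getValue(Bytes.toBytes("SMPL"), Bytes.toBytes("Title"))));
            return d;
        });

        Dataset<Row> df = spark.createDataFrame(docs, Doc.class);
        df.createOrReplaceTempView("documents");
        return spark.sql("SELECT rowKey, title FROM documents WHERE title IS NOT NULL");
    }
}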

If you need anything else, please comment below.

Regarding "hbase - How to read from HBase using Spark", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/25040709/
