- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
下面的代码将从 hbase 读取,然后将其转换为 json 结构并转换为 schemaRDD ,但问题是我是 using List
存储 json 字符串然后传递给 javaRDD,对于大约 100 GB 的数据,master 将在内存中加载数据。从 hbase 加载数据然后执行操作,然后转换为 JavaRDD 的正确方法是什么。
package hbase_reader;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.api.java.JavaSQLContext;
import org.apache.spark.sql.api.java.JavaSchemaRDD;
import org.apache.commons.cli.ParseException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.spark.SparkConf;
import scala.Function1;
import scala.Tuple2;
import scala.runtime.AbstractFunction1;
import com.google.common.collect.Lists;
public class hbase_reader {
public static void main(String[] args) throws IOException, ParseException {
List<String> jars = Lists.newArrayList("");
SparkConf spconf = new SparkConf();
spconf.setMaster("local[2]");
spconf.setAppName("HBase");
//spconf.setSparkHome("/opt/human/opt/spark-0.9.0-hdp1");
spconf.setJars(jars.toArray(new String[jars.size()]));
JavaSparkContext sc = new JavaSparkContext(spconf);
//spconf.set("spark.executor.memory", "1g");
JavaSQLContext jsql = new JavaSQLContext(sc);
HBaseConfiguration conf = new HBaseConfiguration();
String tableName = "HBase.CounData1_Raw_Min1";
HTable table = new HTable(conf,tableName);
try {
ResultScanner scanner = table.getScanner(new Scan());
List<String> jsonList = new ArrayList<String>();
String json = null;
for(Result rowResult:scanner) {
json = "";
String rowKey = Bytes.toString(rowResult.getRow());
for(byte[] s1:rowResult.getMap().keySet()) {
String s1_str = Bytes.toString(s1);
String jsonSame = "";
for(byte[] s2:rowResult.getMap().get(s1).keySet()) {
String s2_str = Bytes.toString(s2);
for(long s3:rowResult.getMap().get(s1).get(s2).keySet()) {
String s3_str = new String(rowResult.getMap().get(s1).get(s2).get(s3));
jsonSame += "\""+s2_str+"\":"+s3_str+",";
}
}
jsonSame = jsonSame.substring(0,jsonSame.length()-1);
json += "\""+s1_str+"\""+":{"+jsonSame+"}"+",";
}
json = json.substring(0,json.length()-1);
json = "{\"RowKey\":\""+rowKey+"\","+json+"}";
jsonList.add(json);
}
JavaRDD<String> jsonRDD = sc.parallelize(jsonList);
JavaSchemaRDD schemaRDD = jsql.jsonRDD(jsonRDD);
System.out.println(schemaRDD.take(2));
} finally {
table.close();
}
}
}
最佳答案
使用 Spark (Scala) 读取 HBase 数据的基本示例,您也可以在 Java 中编写:
import org.apache.hadoop.hbase.client.{HBaseAdmin, Result}
import org.apache.hadoop.hbase.{ HBaseConfiguration, HTableDescriptor }
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.spark._
object HBaseRead {
def main(args: Array[String]) {
val sparkConf = new SparkConf().setAppName("HBaseRead").setMaster("local[2]")
val sc = new SparkContext(sparkConf)
val conf = HBaseConfiguration.create()
val tableName = "table1"
System.setProperty("user.name", "hdfs")
System.setProperty("HADOOP_USER_NAME", "hdfs")
conf.set("hbase.master", "localhost:60000")
conf.setInt("timeout", 120000)
conf.set("hbase.zookeeper.quorum", "localhost")
conf.set("zookeeper.znode.parent", "/hbase-unsecure")
conf.set(TableInputFormat.INPUT_TABLE, tableName)
val admin = new HBaseAdmin(conf)
if (!admin.isTableAvailable(tableName)) {
val tableDesc = new HTableDescriptor(tableName)
admin.createTable(tableDesc)
}
val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
println("Number of Records found : " + hBaseRDD.count())
sc.stop()
}
}
<dependency>
<groupId>it.nerdammer.bigdata</groupId>
<artifactId>spark-hbase-connector_2.10</artifactId>
<version>1.0.3</version> // Version can be changed as per your Spark version, I am using Spark 1.6.x
</dependency>
import org.apache.spark._
import it.nerdammer.spark.hbase._
object HBaseRead extends App {
val sparkConf = new SparkConf().setAppName("Spark-HBase").setMaster("local[4]")
sparkConf.set("spark.hbase.host", "<YourHostnameOnly>") //e.g. 192.168.1.1 or localhost or your hostanme
val sc = new SparkContext(sparkConf)
// For Example If you have an HBase Table as 'Document' with ColumnFamily 'SMPL' and qualifier as 'DocID, Title' then:
val docRdd = sc.hbaseTable[(Option[String], Option[String])]("Document")
.select("DocID", "Title").inColumnFamily("SMPL")
println("Number of Records found : " + docRdd .count())
}
<dependency>
<groupId>com.hortonworks</groupId>
<artifactId>shc</artifactId>
<version>1.0.0-2.0-s_2.11</version> // Version depends on the Spark version and is supported upto Spark 2.x
</dependency>
关于hbase - 如何使用spark从hbase读取,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25040709/
发出时Delete对于 hbase,我知道它不会立即删除数据。但是什么时候删除数据,我的意思是,物理上? 最佳答案 当您向 HBase 写入内容时,它会存储在内存存储 (RAM) 中,然后再写入磁盘。
同一行的列族属于同一个 RegionServer。 那么,这里的问题是一个 RegionServer 会在不同的机器上存储不同的列族吗? 最佳答案 不一定,但在某些时候它会。这是基本 HBase 架构
如果我想插入表格: row | fam:qualifier | timestamp | value 1 | foo:bar | 12345 | 2 1 | foo:bar | 12346 | 3 1
有时我想退出我在 HBase shell 中运行的命令,例如扫描操作通常需要太多时间。 所以我想停止运行这个命令,但我不想退出 HBase shell。 我常用的停止运行命令的方式,我使用了Ctrl+
有没有办法设置 Hbase 以便我们可以在同一个集群中创建多个数据库? 最佳答案 只是为了刷新主题:http://hbase.apache.org/book.html#namespace 5.3.1.
怎么看version的 hbase我在用? 你能下命令吗? 最佳答案 hbase version命令行界面中的命令给出了 version的 hbase正在使用中。 以下是来自 Cloudera 的两个
高级问题: HBase 是否对所有分布(因此不是实现的工件)通用的每行施加了最大大小,无论是在 方面吗?字节存储 或在 方面细胞数 ? 如果是这样: 限制是什么? 极限存在的原因是什么? 限制在哪里记
假设,假设我在数据仓库设置中有一个星型模式。 有一个非常非常长的事实表(想想几十亿到几万亿行)和几个低基数维度表(想想 100 个维度表)。每个事实表外键 指向一个维度表的主键是位图索引的。每个维度表
版本:Hadoop: 2.0.0-cdh4.3.1 HBase: 0.94.6-cdh4.3.1 我正在运行 cloudera quick start vm,这是我的小型远程 HBase Java 客
我正在尝试以完全分布式模式配置 HBase。 (使用 Ubuntu 12.04,Apache Hadoop 2.2(以伪模式运行,HBase 版本 0.98) 以下是我的 bashrc 设置: exp
我想知道如何正确配置 hbase.zookeeper.quorum 以将 zookeeper 实例指向集群模式。 最佳答案 hbase.zookeeper.quorum 属性是运行 ZooKeeper
我想知道如何正确配置 hbase.zookeeper.quorum 以将 zookeeper 实例指向集群模式。 最佳答案 hbase.zookeeper.quorum 属性是运行 ZooKeeper
我正在尝试对位于 Hbase 中的两个表进行映射连接。我的目的是在hashmap中保留小表的记录并与大表进行比较,一旦匹配,再次将记录写入hbase中的表中。我使用 Mapper 和 Reducer
我正在尝试编写一个程序来连接到 HBase。但是当我执行以下命令时HBaseConfiguration.create();我收到以下错误:. "hbase-default.xml 文件似乎是旧版本的
基于HBase documentation ,再次遵循 Google BigTable 论文的引用,据说这些行是按行键的字典顺序存储的。 很明显,当我们在 rowkey 中有一个字符串或者如果我们将一
我有一个 hbase 表,其中的行键如 row1、row2、row3 .... 和 rowN,我想要的是获取行键从 row100 到 row200 的行,如何编写查询子句或将 hbase 表设计为让查
我正在尝试创建命名空间,但出现类似下面给出的错误 hbase(main):031:0> create namespace 'Aniruddha'
我发现为以下要求建模 HBase 表有困难。 我有一个表“商店”,它存储了商店的详细信息(必胜客)。 我有一个表格“订单”,其中包含交易摘要(总交易金额等...)。 我有另一个表“Order_Item
谁能告诉我如果在不首先禁用表的情况下使用“alter”命令可能影响表结构的可能影响? 据我所知,禁用表意味着关闭与表的所有连接。如果我在不禁用表的情况下使用 alter,可能会发生什么异常情况? 我正
我无法将表从 HBase 导出到 HDFS。下面是错误跟踪。它是相当大的尺寸。还有其他方法可以导出吗? 我使用以下命令导出。我增加了 rpc 超时,但工作仍然失败。 sudo -u hdfs hbas
我是一名优秀的程序员,十分优秀!