gpt4 book ai didi

java - 如何从spark中的hbase表中获取所有数据

转载 作者:塔克拉玛干 更新时间:2023-11-03 04:36:05 26 4
gpt4 key购买 nike

我在 hbase 中有一个名为 UserAction 的大表,它具有三个列族(歌曲、专辑、歌手)。我需要从“歌曲”列族中获取所有数据作为 JavaRDD 对象。我尝试了这段代码,但效率不高。有更好的解决方案吗?

    static SparkConf sparkConf = new SparkConf().setAppName("test").setMaster(
"local[4]");
static JavaSparkContext jsc = new JavaSparkContext(sparkConf);

static void getRatings() {

Configuration conf = HBaseConfiguration.create();
conf.set(TableInputFormat.INPUT_TABLE, "UserAction");
conf.set(TableInputFormat.SCAN_COLUMN_FAMILY, "song");

JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD = jsc
.newAPIHadoopRDD(
conf,
TableInputFormat.class,
org.apache.hadoop.hbase.io.ImmutableBytesWritable.class,
org.apache.hadoop.hbase.client.Result.class);

JavaRDD<Rating> count = hBaseRDD
.map(new Function<Tuple2<ImmutableBytesWritable, Result>, JavaRDD<Rating>>() {

@Override
public JavaRDD<Rating> call(
Tuple2<ImmutableBytesWritable, Result> t)
throws Exception {
Result r = t._2;
int user = Integer.parseInt(Bytes.toString(r.getRow()));
ArrayList<Rating> ra = new ArrayList<>();

for (Cell c : r.rawCells()) {

int product = Integer.parseInt(Bytes
.toString(CellUtil.cloneQualifier(c)));
double rating = Double.parseDouble(Bytes
.toString(CellUtil.cloneValue(c)));

ra.add(new Rating(user, product, rating));
}

return jsc.parallelize(ra);
}
})
.reduce(new Function2<JavaRDD<Rating>, JavaRDD<Rating>, JavaRDD<Rating>>() {
@Override
public JavaRDD<Rating> call(JavaRDD<Rating> r1,
JavaRDD<Rating> r2) throws Exception {
return r1.union(r2);
}
});
jsc.stop();
}

歌曲栏目族方案设计为:

RowKey = userID, columnQualifier = songID and value = rating.

最佳答案

更新:好的,我现在明白你的问题了,出于某种疯狂的原因,你将数组转换为 RDDs return jsc.parallelize(ra);。你为什么要那样做??为什么要创建 RDD 的 RDD?为什么不将它们保留为数组?当您执行 reduce 时,您可以连接数组。 RDD 是一种抗性分布式数据集——拥有 of 个分布式数据集的分布式数据集在逻辑上没有意义。我很惊讶你的工作甚至可以运行并且不会崩溃!无论如何,这就是你的工作如此缓慢的原因。

无论如何,在 Scala 中,在您的 map 之后,您只需执行一个 flatMap(identity) 即可将所有列表连接在一起。

我真的不明白你为什么需要减少,也许这就是你效率低下的地方。这是我读取 HBase 表的代码(它是通用的——即适用于任何方案)。需要确定的一件事是确保在读取 HBase 表时确保分区数量合适(通常您需要很多)。

type HBaseRow = java.util.NavigableMap[Array[Byte],
java.util.NavigableMap[Array[Byte], java.util.NavigableMap[java.lang.Long, Array[Byte]]]]
// Map(CF -> Map(column qualifier -> Map(timestamp -> value)))
type CFTimeseriesRow = Map[Array[Byte], Map[Array[Byte], Map[Long, Array[Byte]]]]

def navMapToMap(navMap: HBaseRow): CFTimeseriesRow =
navMap.asScala.toMap.map(cf =>
(cf._1, cf._2.asScala.toMap.map(col =>
(col._1, col._2.asScala.toMap.map(elem => (elem._1.toLong, elem._2))))))

def readTableAll(table: String): RDD[(Array[Byte], CFTimeseriesRow)] = {
val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, table)
sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result])
.map(kv => (kv._1.get(), navMapToMap(kv._2.getMap)))
}

如您所见,我的代码中不需要 reduce。这些方法很容易解释。我可以深入研究您的代码,但我没有耐心阅读 Java,因为它非常冗长。

我有一些专门用于从行中获取最新元素(而不是整个历史记录)的代码。如果您想查看,请告诉我。

最后,建议您考虑使用 Cassandra 而不是 HBase,因为 datastax 正在与 databricks 合作。

关于java - 如何从spark中的hbase表中获取所有数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24534410/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com