gpt4 book ai didi

hbase - Spark序列化错误

转载 作者:行者123 更新时间:2023-12-01 17:32:41 24 4
gpt4 key购买 nike

我正在尝试学习spark + scala。我想从 HBase 读取数据,但没有 MapReduce。我创建了一个简单的 HBase 表 - “test”,并在其中执行了 3 次操作。我想通过spark读取它(没有使用mapreduce的HBaseTest)。我尝试在 shell 上运行以下命令

val numbers = Array(
new Get(Bytes.toBytes("row1")),
new Get(Bytes.toBytes("row2")),
new Get(Bytes.toBytes("row3")))
val conf = new HBaseConfiguration()
val table = new HTable(conf, "test")
sc.parallelize(numbers, numbers.length).map(table.get).count()

我不断收到错误 -org.apache.spark.SparkException:作业中止:任务不可序列化:java.io.NotSerializedException:org.apache.hadoop.hbase.HBaseConfiguration

有人可以帮我吗,我如何创建一个使用可序列化配置的 Htable

谢谢

最佳答案

您的问题是 table 不可序列化(而是它的成员 conf),并且您尝试通过在 map 中使用它来序列化它。他们尝试读取 HBase 的方式不太正确,看起来你尝试了一些特定的 Get,然后尝试并行执行它们。即使您确实做到了这一点,这实际上也不会随着您执行随机读取而扩展。您想要做的是使用 Spark 执行表扫描,下面是一个可以帮助您完成此操作的代码片段:

val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, tableName)

sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result])

这将为您提供一个 RDD,其中包含构成行的 NaviagableMap。下面是如何将 NaviagbleMap 更改为普通的 Scala 字符串映射:

...
.map(kv => (kv._1.get(), navMapToMap(kv._2.getMap)))
.map(kv => (Bytes.toString(kv._1), rowToStrMap(kv._2)))

def navMapToMap(navMap: HBaseRow): CFTimeseriesRow =
navMap.asScala.toMap.map(cf =>
(cf._1, cf._2.asScala.toMap.map(col =>
(col._1, col._2.asScala.toMap.map(elem => (elem._1.toLong, elem._2))))))

def rowToStrMap(navMap: CFTimeseriesRow): CFTimeseriesRowStr =
navMap.map(cf =>
(Bytes.toString(cf._1), cf._2.map(col =>
(Bytes.toString(col._1), col._2.map(elem => (elem._1, Bytes.toString(elem._2)))))))

最后一点,如果您确实想尝试并行执行随机读取,我相信您可以将 HBase 表初始化放在 map 中。

关于hbase - Spark序列化错误,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/23619775/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com