gpt4 book ai didi

scala - 通过Spark写入HBase : Task not serializable

转载 作者:行者123 更新时间:2023-12-02 07:37:58 27 4
gpt4 key购买 nike

我正在尝试使用 Spark 1.0 在 HBase (0.96.0-hadoop2) 中写入一些简单的数据,但我不断遇到序列化问题。相关代码如下:

import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.SparkContext
import java.util.Properties
import java.io.FileInputStream
import org.apache.hadoop.hbase.client.Put

object PutRawDataIntoHbase{
def main(args: Array[String]): Unit = {
var propFileName = "hbaseConfig.properties"
if(args.size > 0){
propFileName = args(0)
}

/** Load properties here **/
val theData = sc.textFile(prop.getProperty("hbase.input.filename"))
.map(l => l.split("\t"))
.map(a => Array("%010d".format(a(9).toInt)+ "-" + a(0) , a(1)))

val tableName = prop.getProperty("hbase.table.name")
val hbaseConf = HBaseConfiguration.create()
hbaseConf.set("hbase.rootdir", prop.getProperty("hbase.rootdir"))
hbaseConf.addResource(prop.getProperty("hbase.site.xml"))
val myTable = new HTable(hbaseConf, tableName)
theData.foreach(a=>{
var p = new Put(Bytes.toBytes(a(0)))
p.add(Bytes.toBytes(hbaseColFamily), Bytes.toBytes("col"), Bytes.toBytes(a(1)))
myTable.put(p)
})
}
}

运行代码结果:

Failed to run foreach at putDataIntoHBase.scala:79
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException:org.apache.hadoop.hbase.client.HTable

用map替换foreach不会崩溃,但我也不写。任何帮助将不胜感激。

最佳答案

HBaseConfiguration 类表示与 HBase 服务器的连接池。显然,它无法被序列化并发送到工作节点。由于 HTable 使用此池与 HBase 服务器通信,因此它也无法序列化。

基本上,可以通过三种方法来处理这个问题:

在每个工作节点上打开连接。

注意foreachPartition方法的使用:

val tableName = prop.getProperty("hbase.table.name")
<......>
theData.foreachPartition { iter =>
val hbaseConf = HBaseConfiguration.create()
<... configure HBase ...>
val myTable = new HTable(hbaseConf, tableName)
iter.foreach { a =>
var p = new Put(Bytes.toBytes(a(0)))
p.add(Bytes.toBytes(hbaseColFamily), Bytes.toBytes("col"), Bytes.toBytes(a(1)))
myTable.put(p)
}
}

请注意,每个工作节点都必须有权访问 HBase 服务器,并且必须预先安装或通过 ADD_JARS 提供所需的 jar。

另请注意,由于如果为每个分区打开连接池,因此最好将分区数量大致减少到工作节点的数量(使用合并功能)。也可以在每个工作节点上共享一个 HTable 实例,但这并不是那么简单。

将所有数据序列化到单个盒子并写入HBase

可以用一台计算机写入 RDD 中的所有数据,即使数据不适合内存。详细信息在此答案中进行了解释:Spark: Best practice for retrieving big data from RDD to local machine

当然,它会比分布式写入慢,但它很简单,不会带来痛苦的序列化问题,如果数据大小合理,可能是最好的方法。

使用 HadoopOutputFormat

可以为 HBase 创建自定义 HadoopOutputFormat 或使用现有格式。我不确定是否有适合您需求的东西,但 Google 应该可以提供帮助。

P.S. 顺便说一下,map 调用不会崩溃,因为它不会被求值:RDD 不会被求值,直到你用 side- 调用一个函数。影响。例如,如果您调用 theData.map(....).persist,它也会崩溃。

关于scala - 通过Spark写入HBase : Task not serializable,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25250774/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com