gpt4 book ai didi

scala - 丰富 SparkContext 而不会引发序列化问题

转载 作者:行者123 更新时间:2023-12-03 05:46:26 28 4
gpt4 key购买 nike

我正在尝试使用 Spark 处理来自 HBase 表的数据。 This blog post给出了如何使用 NewHadoopAPI 从任何 Hadoop InputFormat 读取数据的示例。

我做了什么

由于我需要多次执行此操作,因此我尝试使用隐式来丰富 SparkContext,以便我可以从 HBase 中给定的一组列中获取 RDD。我编写了以下助手:

trait HBaseReadSupport {
implicit def toHBaseSC(sc: SparkContext) = new HBaseSC(sc)

implicit def bytes2string(bytes: Array[Byte]) = new String(bytes)
}


final class HBaseSC(sc: SparkContext) extends Serializable {
def extract[A](data: Map[String, List[String]], result: Result, interpret: Array[Byte] => A) =
data map { case (cf, columns) =>
val content = columns map { column =>
val cell = result.getColumnLatestCell(cf.getBytes, column.getBytes)

column -> interpret(CellUtil.cloneValue(cell))
} toMap

cf -> content
}

def makeConf(table: String) = {
val conf = HBaseConfiguration.create()

conf.setBoolean("hbase.cluster.distributed", true)
conf.setInt("hbase.client.scanner.caching", 10000)
conf.set(TableInputFormat.INPUT_TABLE, table)

conf
}

def hbase[A](table: String, data: Map[String, List[String]])
(interpret: Array[Byte] => A) =

sc.newAPIHadoopRDD(makeConf(table), classOf[TableInputFormat],
classOf[ImmutableBytesWritable], classOf[Result]) map { case (key, row) =>
Bytes.toString(key.get) -> extract(data, row, interpret)
}

}

它可以像这样使用

val rdd = sc.hbase[String](table, Map(
"cf" -> List("col1", "col2")
))

在这种情况下,我们得到一个 RDD (String, Map[String, Map[String, String]]),其中第一个组件是 rowkey,第二个组件是一个映射,其键为列族和值是映射,其键是列,其内容是单元格值。

失败的地方

不幸的是,我的工作似乎引用了 sc,它本身在设计上是不可序列化的。当我运行该作业时我得到的是

Exception in thread "main" org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: org.apache.spark.SparkContext
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)

我可以删除辅助类并在工作中使用相同的内联逻辑,一切都运行良好。但我想要得到一些可以重复使用的东西,而不是一遍又一遍地编写相同的样板。

顺便说一句,这个问题并不特定于隐式,即使使用 sc 函数也会出现同样的问题。

作为比较,以下读取 TSV 文件的帮助程序(我知道它已损坏,因为它不支持引用等,没关系)似乎工作正常:

trait TsvReadSupport {
implicit def toTsvRDD(sc: SparkContext) = new TsvRDD(sc)
}

final class TsvRDD(val sc: SparkContext) extends Serializable {
def tsv(path: String, fields: Seq[String], separator: Char = '\t') = sc.textFile(path) map { line =>
val contents = line.split(separator).toList

(fields, contents).zipped.toMap
}
}

How can I encapsulate the logic to read rows from HBase without unintentionally capturing the SparkContext?

最佳答案

只需将@transient注释添加到sc变量:

final class HBaseSC(@transient val sc: SparkContext) extends Serializable {
...
}

并确保 sc 不在 extract 函数中使用,因为它对工作人员不可用。

如果需要从分布式计算中访问 Spark 上下文,可以使用 rdd.context 函数:

val rdd = sc.newAPIHadoopRDD(...)
rdd map {
case (k, v) =>
val ctx = rdd.context
....
}

关于scala - 丰富 SparkContext 而不会引发序列化问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/23737804/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com