gpt4 book ai didi

scala - Spark : broadcasting jackson ObjectMapper

转载 作者:行者123 更新时间:2023-12-04 05:51:46 26 4
gpt4 key购买 nike

我有一个 spark 应用程序,它从文件中读取行并尝试使用 jackson 对它们进行反序列化。为了让这段代码正常工作,我需要在 Map 操作中定义 ObjectMapper(否则我会得到 NullPointerException)。

我有以下正在运行的代码:

val alertsData = sc.textFile(rawlines).map(alertStr => {
val mapper = new ObjectMapper()
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
mapper.registerModule(DefaultScalaModule)
broadcastVar.value.readValue(alertStr, classOf[Alert])
})

但是,如果我在 map 之外定义映射器并广播它,它会失败并出现 NullPointerException。

此代码失败:

val mapper = new ObjectMapper()
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
mapper.registerModule(DefaultScalaModule)
val broadcastVar = sc.broadcast(mapper)

val alertsData = sc.textFile(rawlines).map(alertStr => {
broadcastVar.value.readValue(alertStr, classOf[Alert])
})

我在这里错过了什么?

谢谢,艾丽莎

最佳答案

事实证明你可以广播映射器。有问题的部分是 mapper.registerModule(DefaultScalaModule) 需要在每台从属(执行器)机器上执行,而不仅仅是在驱动程序上。

所以这段代码可以工作:

val mapper = new ObjectMapper()
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
val broadcastVar = sc.broadcast(mapper)

val alertsData = sc.textFile(rawlines).map(alertStr => {
broadcastVar.value.registerModule(DefaultScalaModule)
broadcastVar.value.readValue(alertStr, classOf[Alert])
})

我进一步优化了代码,每个分区只运行一次 registerModule(而不是 RDD 中的每个元素)。

val mapper = new ObjectMapper()
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)

val broadcastVar = sc.broadcast(mapper)
val alertsRawData = sc.textFile(rawlines)

val alertsData = alertsRawData.mapPartitions({ iter: Iterator[String] => broadcastVar.value.registerModule(DefaultScalaModule)
for (i <- iter) yield broadcastVar.value.readValue(i, classOf[Alert]) })

艾丽莎

关于scala - Spark : broadcasting jackson ObjectMapper,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32495891/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com