gpt4 book ai didi

scala - 如何在 Spark 中执行初始化?

转载 作者:行者123 更新时间:2023-12-04 14:27:16 38 4
gpt4 key购买 nike

我想在 spark 中对我的数据执行 geoip 查找。为此,我使用 MaxMind 的 geoIP 数据库。

我想要做的是在每个分区上初始化一个 geoip 数据库对象,然后用它来查找与 IP 地址相关的城市。

spark 是否对每个节点都有初始化阶段,还是应该检查实例变量是否未定义,如果是,则在继续之前对其进行初始化?例如。类似于(这是python,但我想要一个scala解决方案):

class IPLookup(object):
database = None

def getCity(self, ip):
if not database:
self.database = self.initialise(geoipPath)
...

当然,这样做需要 spark 将整个对象序列化,这是文档警告的。

最佳答案

在 Spark 中,每个分区操作可以使用:

def mapPartitions[U](f: (Iterator[T]) ⇒ Iterator[U], preservesPartitioning: Boolean = false)

这个映射器将执行函数 f每个分区对元素迭代器进行一次。这个想法是设置资源(如数据库连接)的成本将被迭代器中的许多元素使用这些资源所抵消。

例子:
val logsRDD = ???
logsRDD.mapPartitions{iter =>
val geoIp = new GeoIPLookupDB(...)
// this is local map over the iterator - do not confuse with rdd.map
iter.map(elem => (geoIp.resolve(elem.ip),elem))
}

关于scala - 如何在 Spark 中执行初始化?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27066790/

38 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com