作者热门文章
- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我想在 spark 中对我的数据执行 geoip 查找。为此,我使用 MaxMind 的 geoIP 数据库。
我想要做的是在每个分区上初始化一个 geoip 数据库对象,然后用它来查找与 IP 地址相关的城市。
spark 是否对每个节点都有初始化阶段,还是应该检查实例变量是否未定义,如果是,则在继续之前对其进行初始化?例如。类似于(这是python,但我想要一个scala解决方案):
class IPLookup(object):
database = None
def getCity(self, ip):
if not database:
self.database = self.initialise(geoipPath)
...
最佳答案
在 Spark 中,每个分区操作可以使用:
def mapPartitions[U](f: (Iterator[T]) ⇒ Iterator[U], preservesPartitioning: Boolean = false)
f
每个分区对元素迭代器进行一次。这个想法是设置资源(如数据库连接)的成本将被迭代器中的许多元素使用这些资源所抵消。
val logsRDD = ???
logsRDD.mapPartitions{iter =>
val geoIp = new GeoIPLookupDB(...)
// this is local map over the iterator - do not confuse with rdd.map
iter.map(elem => (geoIp.resolve(elem.ip),elem))
}
关于scala - 如何在 Spark 中执行初始化?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27066790/
我是一名优秀的程序员,十分优秀!