gpt4 book ai didi

scala - Spark : DB connection per Spark RDD partition and do mapPartition

转载 作者:行者123 更新时间:2023-12-03 03:33:20 25 4
gpt4 key购买 nike

我想在我的 Spark rdd 上做一个mapPartitions,

    val newRd = myRdd.mapPartitions(
partition => {

val connection = new DbConnection /*creates a db connection per partition*/

val newPartition = partition.map(
record => {
readMatchingFromDB(record, connection)
})
connection.close()
newPartition
})

但是,这给了我一个连接已关闭的异常,正如预期的那样,因为在控件到达 .map() 之前,我的 connection 已关闭。我想为每个 RDD 分区创建一个连接,并正确关闭它。我怎样才能实现这个目标?

谢谢!

最佳答案

正如讨论中提到的 here - 该问题源于迭代器分区上的映射操作的惰性。这种惰性意味着对于每个分区,都会创建并关闭一个连接,并且只有稍后(当对 RDD 进行操作时)才会调用 readMatchingFromDB。

要解决此问题,您应该在关闭连接之前强制对迭代器进行急切遍历,例如通过将其转换为列表(然后再转换回来):

val newRd = myRdd.mapPartitions(partition => {
val connection = new DbConnection /*creates a db connection per partition*/

val newPartition = partition.map(record => {
readMatchingFromDB(record, connection)
}).toList // consumes the iterator, thus calls readMatchingFromDB

connection.close()
newPartition.iterator // create a new iterator
})

关于scala - Spark : DB connection per Spark RDD partition and do mapPartition,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37881042/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com