gpt4 book ai didi

java - 在 Spark Java 中定义要广播的对象的位置

转载 作者:行者123 更新时间:2023-12-02 02:48:32 24 4
gpt4 key购买 nike

我有一个数据库对象,用于从所有 Spark 执行器插入数据。当我将此对象定义为static时,它在这些执行器中具有null值。所以我在驱动程序中声明它,广播它,然后在每个执行程序中获取它的值。当我运行该应用程序时,抛出以下异常:

Exception in thread "main" java.io.NotSerializableException: database.Database

注释:

  • 执行器类是可序列化的
  • 广播对象在该类中被定义为 transient
  • 我删除了 transient ,但它不起作用

最佳答案

我这样解释你的问题:

I want to insert data from my RDD from all Spark executors. I tried to create one DB connection on the Driver and pass it somehow as a Broadcast to the executors, but Spark keeps throwing NotSerializableException. How can I achieve my goal?

简短的回答是:

You should create a new connection on every executor node separately.
You should not pass database connection handlers, file handlers and the likes to other processes and especially remote machines.

这里的问题是到底在哪里创建数据库连接,因为执行器数量较多,很容易超出数据库的连接池大小。

您实际上可以做的是使用 foreachPartition ,就像这里:

  // numPartitions == number of simultaneous DB connections you can afford
yourRdd.repartition(numPartitions)
.foreachPartition {
iter =>
val connection = createConnection()
while (iter.hasNext) {
connection.execute("INSERT ...")
}
connection.commit()
}

这里 .foreachPartition 中的代码将在每个执行器机器上执行,并且连接对象不会通过网络发送,不会出现序列化异常并且数据将被插入。

this 的答案中也提到了使用 foreachPartition 的相同推理。问题。

关于java - 在 Spark Java 中定义要广播的对象的位置,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44199403/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com