gpt4 book ai didi

scala - Spark 流 : NullPointerException inside foreachPartition

转载 作者:行者123 更新时间:2023-12-04 12:51:43 24 4
gpt4 key购买 nike

我有一个 Spark Streaming 作业,它从 Kafka 读取数据,并在再次写入 Postrges 之前与 Postgres 中的现有表进行一些比较。这是它的样子:

val message = KafkaUtils.createStream(...).map(_._2)

message.foreachRDD( rdd => {

if (!rdd.isEmpty){
val kafkaDF = sqlContext.read.json(rdd)
println("First")

kafkaDF.foreachPartition(
i =>{
val jdbcDF = sqlContext.read.format("jdbc").options(
Map("url" -> "jdbc:postgresql://...",
"dbtable" -> "table", "user" -> "user", "password" -> "pwd" )).load()

createConnection()
i.foreach(
row =>{
println("Second")
connection.sendToTable()
}
)
closeConnection()
}
)

此代码在行 val jbdcDF = ...

处给我 NullPointerException

我做错了什么?此外,我的日志 "First" 有效,但 "Second" 未显示在日志中的任何位置。我用 kafkaDF.collect().foreach(...) 尝试了整个代码,它运行良好,但性能很差。我希望将其替换为 foreachPartition

谢谢

最佳答案

尚不清楚 createConnectioncloseConnectionconnection.sendToTable 中是否存在任何问题,但根本问题是试图嵌套操作/转变。 Spark 不支持它,Spark Streaming 也不异常(exception)。

这意味着嵌套的 DataFrame 初始化 (val jdbcDF = sqlContext.read.format ...) 根本无法工作,应该被删除。如果您将其用作引用,则应在与 kafkaDF 相同的级别创建它,并使用标准转换(unionAlljoin、.. .).

如果出于某种原因这不是一个可接受的解决方案,您可以在 forEachPartition 中创建纯 JDBC 连接并在 PostgreSQL 表上进行操作(我想这就是您在 sendToTable)。

关于scala - Spark 流 : NullPointerException inside foreachPartition,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35149997/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com