gpt4 book ai didi

apache-spark - 如何在 foreachBatch 中使用临时表?

转载 作者:行者123 更新时间:2023-12-02 19:47:58 28 4
gpt4 key购买 nike

我们正在构建一个流媒体平台,在该平台上,批处理 SQL 是必不可少的。

val query = streamingDataSet.writeStream.option("checkpointLocation", checkPointLocation).foreachBatch { (df, batchId) => {

df.createOrReplaceTempView("events")

val df1 = ExecutionContext.getSparkSession.sql("select * from events")

df1.limit(5).show()
// More complex processing on dataframes

}}.trigger(trigger).outputMode(outputMode).start()

query.awaitTermination()

抛出的错误是:

org.apache.spark.sql.streaming.StreamingQueryException: Table or view not found: events
Caused by: org.apache.spark.sql.catalyst.analysis.NoSuchTableException: Table or view 'events' not found in database 'default';

流媒体源是带有水印的 Kafka,无需使用 Spark-SQL,我们就能够执行数据帧转换。 Spark 版本是 2.4.0,Scala 是 2.11.7。 Trigger 是 ProcessingTime 每 1 分钟一次,OutputMode 是 Append。

是否有任何其他方法可以促进在 foreachBatch 中使用 spark-sql?它会与 Spark 的升级版本一起使用吗 - 在这种情况下我们升级到版本吗?请帮忙。谢谢。

最佳答案

tl;drExecutionContext.getSparkSession 替换为 df.sparkSession


StreamingQueryException 的原因是流式查询试图访问 SparkSession 中的 events 临时表,对此一无所知,即 ExecutionContext.getSparkSession

唯一注册了此events 临时表的SparkSession 正是SparkSession df 数据帧被创建内,即 df.sparkSession

关于apache-spark - 如何在 foreachBatch 中使用临时表?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58914951/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com