gpt4 book ai didi

scala - 在我停止作业之前,Spark Structured Streaming writestream 不会写入文件

转载 作者:行者123 更新时间:2023-12-04 15:48:08 25 4
gpt4 key购买 nike

我在一个经典用例中使用 Spark Structured Streaming:我想从 kafka 主题中读取数据并将流以 parquet 格式写入 HDFS。

这是我的代码:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.{ArrayType, DataTypes, StructType}

object TestKafkaReader extends App{
val spark = SparkSession
.builder
.appName("Spark-Kafka-Integration")
.master("local")
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
import spark.implicits._

val kafkaDf = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers","KAFKA_BROKER_IP:PORT")
//.option("subscribe", "test")
.option("subscribe", "test")
.option("startingOffsets", "earliest")
.load()

val moviesJsonDf = kafkaDf.selectExpr("CAST(value AS STRING)")

// movie struct
val struct = new StructType()
.add("title", DataTypes.StringType)
.add("year", DataTypes.IntegerType)
.add("cast", ArrayType(DataTypes.StringType))
.add("genres", ArrayType(DataTypes.StringType))

val moviesNestedDf = moviesJsonDf.select(from_json($"value", struct).as("movie"))
// json flatten
val movieFlattenedDf = moviesNestedDf.selectExpr("movie.title", "movie.year", "movie.cast","movie.genres")


// convert to parquet and save to hdfs
val query = movieFlattenedDf
.writeStream
.outputMode("append")
.format("parquet")
.queryName("movies")
.option("checkpointLocation", "src/main/resources/chkpoint_dir")
.start("src/main/resources/output")
.awaitTermination()
}

上下文:

  • 我直接从 intellij 运行它(使用本地 spark安装)
  • 我设法毫无问题地从 kafka 读取并写入控制台(使用控制台模式)
  • 目前我想写文件在本地机器上(但我确实在 HDFS 集群上尝试过,问题是相同)

我的问题:

在工作期间,它没有在文件夹中写入任何内容,我必须手动停止工作才能最终看到文件。

我认为可能与 .awaitTermination() 有关有关信息,我尝试删除此选项,但没有删除此选项时出现错误,作业根本无法运行。

也许我没有设置正确的选项,但在多次阅读文档并在 Google 上搜索后,我没有找到任何东西。

你能帮我解决这个问题吗?

谢谢

编辑:

  • 我正在使用 spark 2.4.0
  • 我尝试了 64/128mb 格式 => 在我停止工作之前没有任何文件改变

最佳答案

是的问题解决

我的问题是,我的数据太少,而 spark 正在等待更多数据来写入 parquet 文件。

为了完成这项工作,我使用了@AlexandrosBiratsis 的评论(更改 block 大小)

再次归功于@AlexandrosBiratsis非常感谢

关于scala - 在我停止作业之前,Spark Structured Streaming writestream 不会写入文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54921528/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com