gpt4 book ai didi

scala - Spark Structured Streaming writeStream 输出一个全局csv

转载 作者:行者123 更新时间:2023-12-03 23:54:03 24 4
gpt4 key购买 nike

我目前正在使用 Spark Structured Streaming 制作原始日志数据聚合器。

Inputstream 由一个包含文本文件的目录组成:

// == Input == //

val logsDF = spark.readStream
.format("text")
.option("maxFilesPerTrigger", 1)
.load("input/*")

然后解析日志...
// == Parsing == //

val logsDF2 = ...

...和汇总
// == Aggregation == //

val windowedCounts = logsDF2
.withWatermark("window_start", "15 minutes")
.groupBy(
col("window"),
col("node")
).count()

当我使用“控制台”接收器时,一切正常:结果在控制台中按批次更新:
// == Output == //

val query = windowedCounts.writeStream
.format("console")
.outputMode("complete")
.start()
.awaitTermination()

现在我想将结果保存在一个唯一的文件中(json、parquet、csv ..)
// == Output == //

val query = windowedCounts.writeStream
.format("csv")
.option("checkpointLocation", "checkpoint/")
.start("output/")
.awaitTermination()

但它给我输出 400 个空的 csv ......我怎样才能像在控制台中那样得到我的结果?

非常感谢 !

最佳答案

很久以前,但我自己遇到了这个问题,并认为我会解决它。确实,我认为您的代码很好,直到您尝试将数据放入 csv 文件中。尝试将 writeStream csv 代码更改为:

// == Output == //
val query = windowedCounts.writeStream
.format("csv")
.trigger(processingTime="10 seconds")
.option("checkpointLocation", "checkpoint/")
.option("path", "output_path/")
.outputMode("append")
.start()
.awaitTermination()
线路:
.trigger(processingTime="10 seconds")
应该解决您的 400 个文件,因为它每 10 秒只写入一个新文件。这两行:
.option("path", "output_path/")
.outputMode("append")
当您附加最新值并将文件输出到特定输出目录时,应该可以解决空文件问题。

关于scala - Spark Structured Streaming writeStream 输出一个全局csv,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52277716/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com