gpt4 book ai didi

scala - 如何通过使用Spark结构化流连续监视目录

转载 作者:行者123 更新时间:2023-12-04 10:47:16 28 4
gpt4 key购买 nike

我希望spark只要文件出现在该目录中,就可以持续监视目录并使用spark.readStream读取CSV文件。

请不要包括Spark Streaming的解决方案。我正在寻找一种通过使用Spark结构化流媒体来实现此目标的方法。

最佳答案

这是此用例的完整解决方案:

如果以独立模式运行。您可以通过以下方式增加驱动程序内存:

bin/spark-shell --driver-memory 4G

无需设置执行程序内存,因为在独立模式下,执行程序在驱动程序中运行。

作为完成@ T.Gaweda的解决方案,请找到以下解决方案:
val userSchema = new StructType().add("name", "string").add("age", "integer")
val csvDF = spark
.readStream
.option("sep", ";")
.schema(userSchema) // Specify schema of the csv files
.csv("/path/to/directory") // Equivalent to format("csv").load("/path/to/directory")

csvDf.writeStream.format("console").option("truncate","false").start()

现在,spark将连续监视指定的目录,并且一旦您在目录中添加任何csv文件,您的DataFrame操作“csvDF”就会在该文件上执行。

注意:如果要触发 Spark ,则必须首先设置以下配置:
spark.sqlContext.setConf("spark.sql.streaming.schemaInferenc‌​e","true")

Spark 是您的 Spark session 。

关于scala - 如何通过使用Spark结构化流连续监视目录,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46198243/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com