gpt4 book ai didi

hadoop - Spark Streaming:将Dstream批处理加入单个输出文件夹

转载 作者:行者123 更新时间:2023-12-02 21:38:51 24 4
gpt4 key购买 nike

我正在使用Spark Streaming通过将StreamingContext创建为val ssc = new StreamingContext("local[3]", "TwitterFeed",Minutes(1))从Twitter获取推文。

并创建Twitter流为:val tweetStream = TwitterUtils.createStream(ssc, Some(new OAuthAuthorization(Util.config)),filters)
然后将其另存为文本文件tweets.repartition(1).saveAsTextFiles("/tmp/spark_testing/")
问题是,这些推文根据批处理时间保存为文件夹,但是我需要将每个批处理的所有数据保存在同一文件夹中。

有什么解决方法吗?

谢谢

最佳答案

我们可以使用Spark SQL的新DataFrame保存API来做到这一点,该API可以追加到现有输出中。默认情况下,saveAsTextFile将无法保存到包含现有数据的目录中(请参阅https://spark.apache.org/docs/latest/sql-programming-guide.html#save-modes)。 https://spark.apache.org/docs/latest/streaming-programming-guide.html#dataframe-and-sql-operations涵盖如何设置Spark SQL上下文以与Spark Streaming一起使用。

假设您使用SQLContextSingleton从指南中复制了该部分,则结果代码如下所示:

data.foreachRDD{rdd =>
val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
// Convert your data to a DataFrame, depends on the structure of your data
val df = ....
df.save("org.apache.spark.sql.json", SaveMode.Append, Map("path" -> path.toString))
}

(请注意,上面的示例使用JSON保存结果,但是您也可以使用其他输出格式)。

关于hadoop - Spark Streaming:将Dstream批处理加入单个输出文件夹,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30237877/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com