gpt4 book ai didi

scala - 使用 Spark Streaming 读取 fileStream

转载 作者:可可西里 更新时间:2023-11-01 15:11:25 29 4
gpt4 key购买 nike

我在 HDFS 上有一个目录,其中每 10 分钟 复制一个文件(现有文件被覆盖)。我想使用 Spark 流 (1.6.0) 读取文件的内容,并将其用作引用数据以将其加入其他流。

我将“记住窗口spark.streaming.fileStream.minRememberDuration设置为“600s”并设置newFilesOnlyfalse,因为当我启动应用程序时,我不想从已经存在的 HDFS 中获取初始数据。

val ssc = new StreamingContext(sparkConf, Seconds(2))
def defaultFilter(path: Path): Boolean = !path.getName().startsWith(".")
val lines: DStream[String] =
ssc.fileStream[LongWritable, Text, TextInputFormat](loc, defaultFilter(_), false).map(_._2.toString)
lines.foreachRDD { x => x.foreach(println) }

我的想法是把这个DStream的内容持久化到内存中,委托(delegate)维护的任务这个“批量查找缓存”到 Spark。我希望在每次更改 HDFS 目录后自动获得新数据,我可以将其加入另一个流。

我不明白的地方:

  • 当我启动应用程序时,数据已加载,但如果我在本地触摸文件并覆盖我看不到的 HDFS 上的文件它的内容打印出来了
  • 如何缓存和重新加载这些数据?
  • 当我缓存它时,它将在工作节点上可用,或者这(连同连接)会在驱动程序中发生吗?

我是否还应该将 StreamingContext 时间间隔设置为 10 分钟,因为我只会每 10 分钟更改一次?

最佳答案

只是一些原始的想法。

when I start the application the data is loaded but then if I touch the file locally and overwrite the one on HDFS I won't see its content printed out anymore

要让 Spark Streaming 处理数据,必须以原子方式创建文件,例如通过将文件移动到 Spark 正在监视的目录中。文件重命名操作通常是原子的。您能否对此进行测试以验证它是否正常工作?

how to cache and reload this data? When I cache it will this be available on the worker nodes or this (along with the join) will happen in the driver?

直接的解决方案可能是在 foreachRDD() 方法中注册临时表。当新数据在流式传输期间出现时,可以重新创建适当的表。请记住,foreachRDD() 方法内部的逻辑应该是幂等的。

知道表名后,您可以轻松地创建一个单独的查询管道,该管道将连接来自这个预缓存临时表的数据。只需确保将 StreamingContext 设置为记住足够数量的流数据,以便查询可以运行。

Should I also set the StreamingContext time interval to 10 minutes as I will only have changes every 10 minutes?

在理想情况下,节奏应该匹配。为了安全起见,您也可以在 foreachRDD() 方法中接收到新数据时检查时间戳。

关于scala - 使用 Spark Streaming 读取 fileStream,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37492535/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com