gpt4 book ai didi

hadoop - Spark 流 : HDFS

转载 作者:可可西里 更新时间:2023-11-01 14:24:51 25 4
gpt4 key购买 nike

  1. 我无法让我的 Spark 作业从 HDFS 流式传输“旧”文件。

如果我的 Spark 作业由于某种原因(例如演示、部署)而关闭,但写入/移动到 HDFS 目录是连续的,我可能会在启动 Spark Streaming 作业后跳过这些文件。

    val hdfsDStream = ssc.textFileStream("hdfs://sandbox.hortonworks.com/user/root/logs")

hdfsDStream.foreachRDD(
rdd => logInfo("Number of records in this batch: " + rdd.count())
)

输出-->这批记录数:0

  1. 有没有办法让 Spark Streaming 将“读取”的文件移动到不同的文件夹?或者我们必须手动编程?因此它将避免读取已经“读取”的文件。

  2. Spark Streaming 是否与在 CRON 中运行 spark 作业 (sc.textFile) 相同?

最佳答案

正如 Dean 提到的,textFileStream 使用仅使用新文件的默认设置。

  def textFileStream(directory: String): DStream[String] = {
fileStream[LongWritable, Text, TextInputFormat](directory).map(_._2.toString)
}

所以,它所做的就是调用 fileStream 的这个变体

def fileStream[
K: ClassTag,
V: ClassTag,
F <: NewInputFormat[K, V]: ClassTag
] (directory: String): InputDStream[(K, V)] = {
new FileInputDStream[K, V, F](this, directory)
}

并且,查看 FileInputDStream 类,我们将看到它确实可以查找现有文件,但默认为仅新文件:

newFilesOnly: Boolean = true,

因此,回到 StreamingContext 代码,我们可以看到可以通过直接调用 fileStream 方法来使用重载:

def fileStream[
K: ClassTag,
V: ClassTag,
F <: NewInputFormat[K, V]: ClassTag]
(directory: String, filter: Path => Boolean, newFilesOnly: Boolean):InputDStream[(K, V)] = {
new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly)
}

所以,TL;DR;是

ssc.fileStream[LongWritable, Text, TextInputFormat]
(directory, FileInputDStream.defaultFilter, false).map(_._2.toString)

关于hadoop - Spark 流 : HDFS,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29022379/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com