gpt4 book ai didi

apache-spark - Spark Streaming textFileStream 不支持通配符

转载 作者:行者123 更新时间:2023-12-03 07:21:42 24 4
gpt4 key购买 nike

我设置了一个简单的测试来从 S3 流式传输文本文件,并在我尝试类似的操作时让它工作

val input = ssc.textFileStream("s3n://mybucket/2015/04/03/")

在存储桶中,我会将日志文件放入其中,一切都会正常工作。

但是如果它们是子文件夹,它不会找到放入子文件夹的任何文件(是的,我知道 hdfs 实际上并不使用文件夹结构)

val input = ssc.textFileStream("s3n://mybucket/2015/04/")

因此,我尝试简单地使用通配符,就像我之前使用标准 Spark 应用程序所做的那样

val input = ssc.textFileStream("s3n://mybucket/2015/04/*")

但是当我尝试这个时,它会抛出一个错误

java.io.FileNotFoundException: File s3n://mybucket/2015/04/* does not exist.
at org.apache.hadoop.fs.s3native.NativeS3FileSystem.listStatus(NativeS3FileSystem.java:506)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1483)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1523)
at org.apache.spark.streaming.dstream.FileInputDStream.findNewFiles(FileInputDStream.scala:176)
at org.apache.spark.streaming.dstream.FileInputDStream.compute(FileInputDStream.scala:134)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
at scala.Option.orElse(Option.scala:257)
.....

我知道您可以在读取标准 Spark 应用程序的 fileInput 时使用通配符,但似乎在进行流输入时,它不会这样做,也不会自动处理子文件夹中的文件。我在这里缺少什么吗?

最终我需要的是一个 24/7 运行的流作业,该作业将监视按日期放置日志的 S3 存储桶

所以类似

s3n://mybucket/<YEAR>/<MONTH>/<DAY>/<LogfileName>

有什么方法可以将其交给最顶层的文件夹,并自动读取任何文件夹中显示的文件(因为显然日期每天都会增加)?

编辑

因此,在深入研究 http://spark.apache.org/docs/latest/streaming-programming-guide.html#basic-sources 的文档后它指出不支持嵌套目录。

谁能解释一下为什么会出现这种情况?

此外,由于我的文件将根据其日期进行嵌套,那么在我的流应用程序中解决此问题的好方法是什么?这有点复杂,因为日志需要几分钟才能写入 S3,因此当天写入的最后一个文件可能会写入前一天的文件夹中,即使我们距离新的一天还有几分钟。

最佳答案

可以通过扩展 FileInputDStream 创建一些“丑陋但有效的解决方案”。编写 sc.textFileStream(d) 相当于

new FileInputDStream[LongWritable, Text, TextInputFormat](streamingContext, d).map(_._2.toString)

您可以创建将扩展 FileInputDStream 的 CustomFileInputDStream。自定义类将从 FileInputDStream 类复制计算方法,并根据您的需要调整 findNewFiles 方法。

更改 findNewFiles 方法:

 private def findNewFiles(currentTime: Long): Array[String] = {
try {
lastNewFileFindingTime = clock.getTimeMillis()

// Calculate ignore threshold
val modTimeIgnoreThreshold = math.max(
initialModTimeIgnoreThreshold, // initial threshold based on newFilesOnly setting
currentTime - durationToRemember.milliseconds // trailing end of the remember window
)
logDebug(s"Getting new files for time $currentTime, " +
s"ignoring files older than $modTimeIgnoreThreshold")
val filter = new PathFilter {
def accept(path: Path): Boolean = isNewFile(path, currentTime, modTimeIgnoreThreshold)
}
val newFiles = fs.listStatus(directoryPath, filter).map(_.getPath.toString)
val timeTaken = clock.getTimeMillis() - lastNewFileFindingTime
logInfo("Finding new files took " + timeTaken + " ms")
logDebug("# cached file times = " + fileToModTime.size)
if (timeTaken > slideDuration.milliseconds) {
logWarning(
"Time taken to find new files exceeds the batch size. " +
"Consider increasing the batch size or reducing the number of " +
"files in the monitored directory."
)
}
newFiles
} catch {
case e: Exception =>
logWarning("Error finding new files", e)
reset()
Array.empty
}

}

至:

  private def findNewFiles(currentTime: Long): Array[String] = {
try {
lastNewFileFindingTime = clock.getTimeMillis()

// Calculate ignore threshold
val modTimeIgnoreThreshold = math.max(
initialModTimeIgnoreThreshold, // initial threshold based on newFilesOnly setting
currentTime - durationToRemember.milliseconds // trailing end of the remember window
)
logDebug(s"Getting new files for time $currentTime, " +
s"ignoring files older than $modTimeIgnoreThreshold")
val filter = new PathFilter {
def accept(path: Path): Boolean = isNewFile(path, currentTime, modTimeIgnoreThreshold)
}
val directories = fs.listStatus(directoryPath).filter(_.isDirectory)
val newFiles = ArrayBuffer[FileStatus]()

directories.foreach(directory => newFiles.append(fs.listStatus(directory.getPath, filter) : _*))

val timeTaken = clock.getTimeMillis() - lastNewFileFindingTime
logInfo("Finding new files took " + timeTaken + " ms")
logDebug("# cached file times = " + fileToModTime.size)
if (timeTaken > slideDuration.milliseconds) {
logWarning(
"Time taken to find new files exceeds the batch size. " +
"Consider increasing the batch size or reducing the number of " +
"files in the monitored directory."
)
}
newFiles.map(_.getPath.toString).toArray
} catch {
case e: Exception =>
logWarning("Error finding new files", e)
reset()
Array.empty
}
}

将检查所有一级子文件夹中的文件,您可以将其调整为使用批处理时间戳来访问相关的“子目录”。

我创建了我提到的 CustomFileInputDStream 并通过调用激活它:

new CustomFileInputDStream[LongWritable, Text, TextInputFormat](streamingContext, d).map(_._2.toString)

它似乎符合我们的预期。

当我编写这样的解决方案时,我必须添加一些考虑点:

  • 您正在破坏 Spark 封装并创建一个自定义类,随着时间的推移,您必须仅支持该类。

  • 我相信这样的解决方案是最后的手段。如果您的用例可以通过不同的方式实现,通常最好避免这样的解决方案。

  • 如果您在 S3 上有很多“子目录”并且要检查每个子目录,那么您会付出代价。

  • 了解 Databricks 是否仅仅因为可能的性能损失而不支持嵌套文件将会非常有趣,也许还有我没有考虑过的更深层次的原因。

关于apache-spark - Spark Streaming textFileStream 不支持通配符,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29426246/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com