gpt4 book ai didi

hadoop - 使用 S3AFileSystem 的 Flink 不会从 S3 读取子文件夹

转载 作者:可可西里 更新时间:2023-11-01 14:43:37 25 4
gpt4 key购买 nike

我们正在使用具有建议的 S3AFileSystem 配置的 Flink 1.2.0。当源是 S3 存储桶中的单个文件夹时,简单的流式处理作业会按预期工作。

作业运行时没有错误——但产生输出——当它的源是一个本身包含子文件夹的文件夹时。

为清楚起见,下面是 S3 存储桶的模型。运行指向 s3a://bucket/folder/2017/04/25/01/ 的作业会正确读取所有三个对象以及存储桶中出现的任何后续对象。将作业指向 s3a://bucket/folder/2017/(或任何其他中间文件夹)会导致作业在不产生任何内容的情况下运行。

在绝望中,我们尝试了 [in|ex] 包含尾随 / 的排列。

.
`-- folder
`-- 2017
`-- 04
|-- 25
| |-- 00
| | |-- a.txt
| | `-- b.txt
| `-- 01
| |-- c.txt
| |-- d.txt
| `-- e.txt
`-- 26

工作代码:

def main(args: Array[String]) {

val parameters = ParameterTool.fromArgs(args)
val bucket = parameters.get("bucket")
val folder = parameters.get("folder")

val path = s"s3a://$bucket/$folder"

val env = StreamExecutionEnvironment.getExecutionEnvironment

val lines: DataStream[String] = env.readFile(
inputFormat = new TextInputFormat(new Path(path)),
filePath = path,
watchType = FileProcessingMode.PROCESS_CONTINUOUSLY,
interval = Time.seconds(10).toMilliseconds)

lines.print()
env.execute("Flink Streaming Scala API Skeleton")
}

core-site.xml 根据文档配置:

<configuration>
<property>
<name>fs.s3a.impl</name>
<value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
</property>
<property>
<name>fs.s3a.buffer.dir</name>
<value>/tmp</value>
</property>
</configuration>

我们已经包含了此处列出的 S3AFileSystem 的所有 jar:https://ci.apache.org/projects/flink/flink-docs-release-1.2/setup/aws.html#flink-for-hadoop-27

我们被难住了。这似乎应该有效; Internet 上有大量面包屑导航表明这确实 有效。 [例如 http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Reading-files-from-an-S3-folder-td10281.html]

帮帮我,松鼠们……你们是我唯一的希望!

最佳答案

Steve Loughran 的帮助下回答我自己的问题...以上。

在 Flink 中,当处理 file-based data source to process continuously 时, FileInputFormat enumerate nested files默认情况下。

无论源是 S3 还是其他任何东西都是如此。

你必须这样设置:

def main(args: Array[String]) {

val parameters = ParameterTool.fromArgs(args)
val bucket = parameters.get("bucket")
val folder = parameters.get("folder")

val path = s"s3a://$bucket/$folder"

val env = StreamExecutionEnvironment.getExecutionEnvironment

val textInputFormat = new TextInputFormat(new Path(path))

//this is important!
textInputFormat.setNestedFileEnumeration(true)

val lines: DataStream[String] = env.readFile(
inputFormat = textInputFormat,
filePath = path,
watchType = FileProcessingMode.PROCESS_CONTINUOUSLY,
interval = Time.seconds(10).toMilliseconds)

lines.print()
env.execute("Flink Streaming Scala API Skeleton")

关于hadoop - 使用 S3AFileSystem 的 Flink 不会从 S3 读取子文件夹,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43648866/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com