gpt4 book ai didi

java - NFS(Netapp 服务器)-> Flink -> s3

转载 作者:行者123 更新时间:2023-12-02 09:32:33 24 4
gpt4 key购买 nike

我是 flink (java) 的新手,并尝试将作为文件路径安装的 netapp 文件服务器上的 xml 文件移动到安装了 flink 的服务器上。

如何实时进行批处理或流处理以获取到达文件夹的文件并使用 s3 接收它。

我在 flink-starter 中找不到任何从本地文件系统读取文件的示例,flink 至少是这个用例的正确选择吗?如果是这样,我在哪里可以找到资源来监听文件夹和管理检查点/保存点?

最佳答案

如果您的目标只是将文件复制到 s3,则有更简单且更合适的工具。也许sync适合。

假设使用 Flink 有意义(例如,因为您想要对数据执行一些有状态转换),则需要所有任务管理器(工作人员)都可以使用以下方式访问要处理的文件相同的 URI。为此,您可以使用 file://URI。

您可以执行类似的操作来监视目录并在新文件出现时摄取它们:

StreamExecutionEnvironment env =    
StreamExecutionEnvironment.getExecutionEnvironment();

// monitor directory, checking for new files
// every 100 milliseconds

TextInputFormat format = new TextInputFormat(
new org.apache.flink.core.fs.Path("file:///tmp/dir/"));

DataStream<String> inputStream = env.readFile(
format,
"file:///tmp/dir/",
FileProcessingMode.PROCESS_CONTINUOUSLY,
100,
FilePathFilter.createDefaultFilter());

请注意 documentation 中的此警告:

If the watchType is set to FileProcessingMode.PROCESS_CONTINUOUSLY, when a file is modified, its contents are re-processed entirely. This can break the “exactly-once” semantics, as appending data at the end of a file will lead to all its contents being re-processed.

这意味着您应该自动将准备好提取的文件移动到正在监视的文件夹中。

您可以使用Streaming File Sink写入S3。 Flink 的写入操作(例如 writeUsingOutputFormat())不参与检查点,因此在这种情况下这不是一个好的选择。

关于java - NFS(Netapp 服务器)-> Flink -> s3,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57833940/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com