gpt4 book ai didi

java - 当 TextIO 完成写入文件时通知

转载 作者:行者123 更新时间:2023-11-30 07:53:16 26 4
gpt4 key购买 nike

我正在处理一个使用 Google Cloud 中的 DataFlow 的两个管道的场景:

管道 A 以流模式运行,基于每小时窗口和一些分片,在 Google 存储中不断创建文件,如下所示:

data.apply(TextIO.write().to(resource.getCurrentDirectory())
.withFilenamePolicy(new PerWindowFiles(prefix))
.withWindowedWrites()
.withNumShards(42));

管道 B 以批处理模式工作,定期加载这些文件以进行进一步处理,例如每小时一次。

问题来了:管道 B 可以从 GS 中安全地加载哪些文件?

  • 所有这些 -> 可能不是一个好主意,以防 A 没有写完其中一些,我们会得到损坏的文件。

  • 基于时间(比如只加载至少 2 小时前的文件)-> 如果 A 迟到也会导致问题

  • A 中创建“完成”标志的某种方式,它告诉 B 哪些文件已完成。

  • 以某种方式在窗口的最终 Pane 处理完成时收到通知 -> 尚未找到执行此操作的方法。

我想要第三种方法,但找不到一种方法来确定 TextIO 何时实际完成写入文件而不等待管道完成。

TextIO 的作者|不返回另一个 PCollection。一种方法是覆盖 FileBasedSink.WriteOperationfinalize 方法。它是在 TextIO 内部某处创建的,需要复制整个类并最终构建一个自定义 Sink。在我看来,这有点矫枉过正。

任何人都有更简单的解决方案的想法或体验过如何实现这一点?

最佳答案

TextIO.write() 会将数据写入临时文件,然后自动将每个成功写入的临时文件重命名为其最终位置。您可以安全地在管道 B 中使用与您的“前缀”匹配的文件,因为临时文件将以与前缀不匹配的方式命名(我们在决定如何命名临时文件时明确考虑了您的用例),因此所有文件管道 B 看到的将是完整的。

或者,我们是 about to add (link to pull request) TextIO.read() 的一个版本,它以流模式连续摄取新文件;准备就绪后,您可以在管道 B 中使用它。另请参阅 http://s.apache.org/textio-sdf以及链接的 JIRA。

关于java - 当 TextIO 完成写入文件时通知,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44967898/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com