gpt4 book ai didi

google-cloud-dataflow - 流水线运行后移动文件

转载 作者:行者123 更新时间:2023-12-04 02:17:29 25 4
gpt4 key购买 nike

在数据流管道完成运行后,是否可以在 GCS 中移动文件?如果是这样,如何?应该是最后一个.apply吧?我无法想象会是这样。

这里的情况是我们从客户导入大量 .csv。我们需要无限期地保留这些 CSV,因此我们要么需要“将 CSV 标记为已处理”,要么将它们移出 TextIO 用于查找 csv 的初始文件夹。我目前唯一能想到的可能是在 BigQuery 中存储文件名(我不确定我是怎么得到这个的,我是一个 DF 新手),然后从执行中排除已经存储的文件管道不知何故?但必须有更好的方法。

这可能吗?我应该检查什么?

感谢您的帮助!

最佳答案

您可以尝试使用 BlockingDataflowPipelineRunner 并在 p.run() 之后在您的主程序中运行任意逻辑(它将等待管道完成)。

参见 Specifying Execution Parameters ,特别是“阻止执行”部分。

但是,总的来说,您似乎真的想要一个持续运行的管道来监视包含 CSV 文件的目录并在新文件出现时导入新文件,而不是将同一个文件导入两次。这对于流式管道来说是一个很好的案例:您可以编写一个自定义 UnboundedSource (另请参阅 Custom Sources and Sinks ),它会监视目录并返回其中的文件名(即 T 可能是 StringGcsPath):

p.apply(Read.from(new DirectoryWatcherSource(directory)))
.apply(ParDo.of(new ReadCSVFileByName()))
.apply(the rest of your pipeline)

DirectoryWatcherSource 是您的 UnboundedSourceReadCSVFileByName 也是您需要编写的转换,它获取文件路径并读取它作为 CSV 文件,返回其中的记录(不幸的是,现在您不能在管道中间使用 TextIO.Read 之类的转换,只能在开始时使用 - 我们正在努力解决这个问题)。

这可能有点棘手,正如我所说,我们正在开发一些功能以使其更简单,我们正在考虑创建这样的内置源,但目前可能仍然如此比“弹球工作”更容易。请尝试一下,如果有任何不清楚的地方,请通过 dataflow-feedback@google.com 告诉我们!

与此同时,您还可以在 Cloud Bigtable 中存储有关您已处理或未处理的文件的信息。 - 它比 BigQuery 更适合这种情况,因为它更适合随机写入和查找,而 BigQuery 更适合对整个数据集进行大批量写入和查询。

关于google-cloud-dataflow - 流水线运行后移动文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33054786/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com