gpt4 book ai didi

google-cloud-storage - 在 Apache Beam/Dataflow Python 流中写入文本文件

转载 作者:行者123 更新时间:2023-12-03 14:02:01 31 4
gpt4 key购买 nike

我有一个非常基本的 Python Dataflow 作业,它从 Pub/Sub 读取一些数据,应用 FixedWindow 并写入 Google Cloud Storage。

transformed = ...
transformed | beam.io.WriteToText(known_args.output)

输出写入--output 中特定的位置,但仅写入临时阶段,即
gs://MY_BUCKET/MY_DIR/beam-temp-2a5c0e1eec1c11e8b98342010a800004/...some_UUID...

该文件永远不会使用分片模板放入正确命名的位置。

在本地和 DataFlow 运行器上测试。

在进一步测试时,我注意到 streaming_wordcount 示例有相同的问题,但是标准 wordcount 示例写得很好。也许问题在于窗口,或从 pubsub 阅读?

WriteToText 似乎与 PubSub 的流媒体源不兼容。可能有解决方法,或者 Java 版本可能兼容,但我选择完全使用不同的解决方案。

最佳答案

WriteToText Python SDK 中的转换不支持流式传输。

相反,您可以考虑 apache_beam.io.fileio 中的变换。 .在这种情况下,您可以编写如下内容(假设为 10 分钟的窗口):

my_pcollection = (p | ReadFromPubSub(....)
| WindowInto(FixedWindows(10*60))
| fileio.WriteToFiles(path=known_args.output))

这足以为每个窗口写出单独的文件,并随着流的推进继续这样做。

你会看到这样的文件(假设输出是 gs://mybucket/ )。当窗口被触发时,文件将被打印:
gs://mybucket/output-1970-01-01T00_00_00-1970-01-01T00_10_00-0000-00002
gs://mybucket/output-1970-01-01T00_00_00-1970-01-01T00_10_00-0001-00002
gs://mybucket/output-1970-01-01T00_10_00-1970-01-01T00_20_00-0000-00002
gs://mybucket/output-1970-01-01T00_10_00-1970-01-01T00_20_00-0001-00002
...

默认情况下,这些文件有 $prefix-$start-$end-$pane-$shard-of-$numShards$suffix$compressionSuffix名称 - 其中前缀是 output默认情况下,但您可以传递更复杂的文件命名函数。

如果你想自定义文件的写入方式(例如文件的命名、数据的格式或类似的东西),你可以查看 WriteToFiles 中的额外参数。 .

你可以看到一个例子 here在 Beam 测试中使用的转换,具有更复杂的参数 - 但听起来默认行为对您来说应该足够了。

关于google-cloud-storage - 在 Apache Beam/Dataflow Python 流中写入文本文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53379585/

31 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com