gpt4 book ai didi

python - 使用 Apache Beam Python `WriteToFiles` 转换为每个窗口只写入一个文件

转载 作者:行者123 更新时间:2023-12-02 02:37:18 24 4
gpt4 key购买 nike

需要一些帮助。我有一些琐碎的任务从 Pub/Sub 读取并写入 GCS 中的批处理文件,但在 fileio.WriteToFiles 方面遇到了一些麻烦

with beam.Pipeline(options=pipeline_options) as p:
input = (p | 'ReadData' >> beam.io.ReadFromPubSub(topic=known_args.input_topic).with_output_types(bytes)
| "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
| 'Parse' >> beam.Map(parse_json)
| ' data w' >> beam.WindowInto(
FixedWindows(60),
accumulation_mode=AccumulationMode.DISCARDING
))

event_data = (input
| 'filter events' >> beam.Filter(lambda x: x['t'] == 'event')
| 'encode et' >> beam.Map(lambda x: json.dumps(x))
| 'write events to file' >> fileio.WriteToFiles(
path='gs://extention/ga_analytics/events/', shards=0))

我的窗口触发后需要一个文件,但文件数量等于来自 Pubsub 的消息数量,有人可以帮助我吗? current output files但我只需要一个文件。

最佳答案

我最近遇到了这个问题并深入研究了源代码:

fileio.WriteToFiles 将尝试将包中的每个元素作为单独的文件输出。仅当元素数量超过写入器数量时,WireToFiles 才会回退到写入分片文件。

要强制为窗口中的所有元素创建单个文件,请将 max_writers_per_bundle 设置为 0

WriteToFiles(shards=1, max_writers_per_bundle=0)

关于python - 使用 Apache Beam Python `WriteToFiles` 转换为每个窗口只写入一个文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64117158/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com