gpt4 book ai didi

python - WriteToText 仅写入临时文件

转载 作者:太空宇宙 更新时间:2023-11-03 20:39:13 25 4
gpt4 key购买 nike

我是 Apache Beam 的新手,并尝试用 Python 编写我的第一个管道,以将 Google Pub/Sub 订阅中的数据输出到平面文件以供以后使用;理想情况下,我想每半小时将这些批处理成一个文件。我有以下代码作为管道中的最终转换:-

| 'write output' >> WriteToText('TestNewPipeline.txt')

但是,创建的所有文件都位于前缀为“beam-temp-TestNewPipeline.txt-[somehash]”的目录中,并批量分成 10 个组,这不是我所期望的。

我尝试过使用窗口函数,但它似乎没有太大效果,所以要么我完全误解了这个概念,要么做了完全错误的事情。

窗口的代码是:-

 | 'Window' >> beam.WindowInto(beam.window.FixedWindows(5))

我认为这会导致文本文件的输出被写入静态的五秒窗口中,但事实并非如此。

完整代码如下:-

options = PipelineOptions()
options.view_as(StandardOptions).streaming=True

def format_message(message, timestamp=beam.DoFn.TimestampParam):
formatted_message = {
'data': message.data,
'attributes': str(message.attributes),
'timestamp': float(timestamp)
}

return formatted_message

with beam.Pipeline(options=options) as p:
(p
| 'Read From Pub Sub' >> ReadFromPubSub(subscription='projects/[my proj]/subscriptions/[my subscription]',with_attributes=True)
| 'Window' >> beam.WindowInto(beam.window.FixedWindows(5))
| 'Map Message' >> beam.Map(format_message)
| 'write output' >> WriteToText('TestNewPipeline.txt')
)
result = p.run()

正如预期的那样,该进程无限期地运行并成功从订阅中读取消息;然而它只将它们写入到beam-temp 文件中。有谁能帮我指出我错在哪里吗?

更新:

根据 Jason 的评论,我对管道进行了更多修改:-

class AddKeyToDict(beam.DoFn):
def process(self, element):
return [(element['rownumber'], element)]

with beam.Pipeline(options=options) as p:
(p
| 'Read From Pub Sub' >> ReadFromPubSub(subscription=known_args.input_subscription)# can't make attributes work as yet! ,with_attributes=True)
# failed attempt 1| 'Map Message' >> beam.Map(format_message)
# failed attempt 2| 'Parse JSON' >> beam.Map(format_message_element)
| 'Parse to Json' >> beam.Map(lambda x: json.loads(x))
| 'Add key' >> beam.ParDo(AddKeyToDict())
| 'Window' >> beam.WindowInto(beam.window.FixedWindows(5), trigger=AfterProcessingTime(15), accumulation_mode=AccumulationMode.DISCARDING)
| 'Group' >> beam.GroupByKey()
| 'write output' >> WriteToText(known_args.output_file)
)

我还无法从 PubSub 中提取 message_id 或发布时间,因此我仅使用消息中生成的行号。此时,我仍然只创建了临时文件,而没有积累任何内容到最终文件中?开始怀疑 Python 实现是否仍然有点缺乏,我将不得不选择 Java......

最佳答案

来自Apache Beam's documentation on Windowing Constraints :

If you set a windowing function using the Window transform, each element is assigned to a window, but the windows are not considered until GroupByKey or Combine aggregates across a window and key.

由于此示例中似乎没有键的概念,您可以尝试使用Combine吗?

关于python - WriteToText 仅写入临时文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56960622/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com