gpt4 book ai didi

python - 数据流中带有时间戳的处理字段

转载 作者:行者123 更新时间:2023-12-01 08:09:24 24 4
gpt4 key购买 nike

我收到来自 Google Cloud Pub/Sub 的消息,格式如下:

{u'date': u'2019-03-26T09:57:52Z', 'field1': value1, u'field2': u'value2', u'field3': u'value3', u'field4': u'value4',...}

我希望当这些消息在带有窗口的管道中处理时:

| 'Window' >> beam.WindowInto(window.FixedWindows(1 * 10))

“日期”字段将被处理为窗口的引用时间戳。

我需要自定义 WindowFn 还是应该如何做?

最佳答案

您需要像这样指定自定义时间戳:

def custom_timestamp(message):
# assuming that message is already parsed JSON (dict)
import datetime as dt
import apache_beam as beam
ts = dt.datetime.strptime(message["date"], "%Y-%m-%dT%H:%M:%SZ")
return beam.window.TimestampedValue(message, ts.timestamp())

然后:

| 'CustomTimestamp' >> beam.Map(custom_timestamp)
| 'Window' >> beam.WindowInto(window.FixedWindows(1 * 10))

您可以在这里找到完整的详细信息:https://beam.apache.org/documentation/programming-guide/#adding-timestamps-to-a-pcollections-elements

但是,您必须注意,适用于 Apache Beam 的 Streaming Python SDK 有很多缺失的部分,并且有些功能无法按您的预期工作。我想要实现与您完全相同的情况,并且在添加自定义时间戳后,DataFlow Runner 由于所谓的“droppedDueToLateness”而删除了我的消息。我仍然不确定是否可以设置系统水印来使用 PubSub 和 Python 处理历史数据。

关于python - 数据流中带有时间戳的处理字段,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55354564/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com