gpt4 book ai didi

python - Beam/Google Cloud 数据流 ReadFromPubsub 缺失数据

转载 作者:行者123 更新时间:2023-12-01 06:47:51 24 4
gpt4 key购买 nike

我有 2 个数据流流管道(pubsub 到 bigquery),代码如下:

class transform_class(beam.DoFn):

def process(self, element, publish_time=beam.DoFn.TimestampParam, *args, **kwargs):
logging.info(element)
yield element

class identify_and_transform_tables(beam.DoFn):
#Adding Publish Timestamp
#Since I'm reading from a topic that consist data from multiple tables,
#function here is to identify the tables and split them apart


def run(pipeline_args=None):
# `save_main_session` is set to true because some DoFn's rely on
# globally imported modules.
pipeline_options = PipelineOptions(
pipeline_args, streaming=True, save_main_session=True)

with beam.Pipeline(options=pipeline_options) as pipeline:
lines = (pipeline
| 'Read PubSub Messages' >> beam.io.ReadFromPubSub(topic='topic name',with_attributes = True)
| 'Transforming Messages' >> beam.ParDo(transform_class())
| 'Identify Tables' >> beam.ParDo(identify_and_transform_tables()).with_outputs('table_name'))

table_name = lines.table_name
table_name = (table_name
| 'Write table_name to BQ' >> beam.io.WriteToBigQuery(
table='table_name',
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)

result = pipeline.run()

这两个管道都读取同一 pubsub 主题。在协调时,我发现一些数据丢失了,并且两个管道的丢失数据不同。例如,

第 56-62 行在管道 1 中缺失,但在管道 2 中存在
管道 2 中缺少第 90-95 行,但管道 1 中存在第 90-95 行

因此,这意味着数据存在于 pubsub 主题中。
正如您在代码中看到的,第一个功能是将 pubsub 消息直接记录到 stackdriver 中。除了 bigquery 之外,我还仔细检查了 stackdriver 日志中是否有缺失的数据。

我发现的另一件事是,这些丢失的数据是在一段时间内发生的。例子,第 56-62 行的时间戳为“2019-12-03 05:52:18.754150 UTC”并且接近该时间戳(精确到毫秒)

因此,我唯一的结论是数据流 readfrompubsub 有时会丢失数据?
非常感谢任何帮助。

最佳答案

我不确定这种情况下发生了什么,但这是防止数据丢失的重要规则:

  • 不要从主题中读取内容,如 beam.io.ReadFromPubSub(topic='topic name')
  • 从订阅中读取数据,如 beam.io.ReadFromPubSub(subscription='subscription name')

这是因为在重新启动的情况下,将在第一种情况下创建一个新的订阅 - 并且该订阅可能只包含创建后收到的数据。如果您事先创建订阅,数据将保留在其中,直到被读取(或过期)。

关于python - Beam/Google Cloud 数据流 ReadFromPubsub 缺失数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59150110/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com