gpt4 book ai didi

python - 如何在 Apache Beam (Python) 中通过键在静态查找表上以流模式加入 PCollection

转载 作者:行者123 更新时间:2023-11-28 18:56:27 29 4
gpt4 key购买 nike

我正在将(无限制的)数据从 Google Cloud Pubsub 以字典的形式流式传输到 PCollection 中。随着流式数据的到来,我想通过在静态(有界)查找表上按键加入它来丰富它。这张表足够小,可以保存在内存中。

我目前有一个使用 DirectRunner 运行的有效解决方案,但是当我尝试在 DataflowRunner 上运行它时,出现错误。

我使用 beam.io.ReadFromText 函数从 csv 中读取了有界查找表,并将值解析为字典。然后我创建了一个 ParDo 函数,它将我的无界 PCollection 和查找字典作为辅助输入。在 ParDo 中,它使用生成器在查找表的正确行上“加入”,并将丰富输入元素。

这里是一些主要部分..


# Get bounded lookup table
lookup_dict = (pcoll | 'Read PS Table' >> beam.io.ReadFromText(...)
| 'Split CSV to Dict' >> beam.ParDo(SplitCSVtoDict()))

# Use lookup table as side input in ParDo func to enrich unbounded pcoll
# I found that it only worked on my local machine when decorating it with AsList
enriched = pcoll | 'join pcoll on lkup' >> beam.ParDo(JoinLkupData(), data=beam.pvalue.AsList(lookup_dict)

class JoinLkupData(beam.DoFn):
def process(self, element, lookup_data):
# I used a generator here
lkup = next((row for row in lookup_data if row[<JOIN_FIELD>]) == element[<JOIN_FIELD>]), None)

if lkup:
# If there is a join, add new fields to the pcoll
element['field1'] = lkup['field1']
element['field2'] = lkup['field2']
yield element

使用 DirectRunner 在本地运行时我能够获得正确的结果,但是在 DataFlow Runner 上运行时,我收到此错误:

apache_beam.runners.dataflow.dataflow_runner.DataflowRuntimeException:数据流管道失败。状态:失败,错误:
工作流失败。原因:预期自定义源具有非零数量的拆分。

这篇文章:“Error while splitting pcollections on Dataflow runner”让我想到这个错误的原因与多个工作人员在拆分工作时无法访问同一个查找表有关。

最佳答案

以后,如果可以的话,请分享 Beam 的版本和堆栈跟踪。

在这种情况下,错误消息不是很好是一个已知问题。在撰写本文时,用于 Python 流式传输的 Dataflow 仅限于用于读写的 Pubsub 和用于写入的 BigQuery。在管道中使用文本源会导致此错误。

关于python - 如何在 Apache Beam (Python) 中通过键在静态查找表上以流模式加入 PCollection,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57797235/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com