gpt4 book ai didi

python-2.7 - 您如何在 Python 中测试 Beam 管道(Google Dataflow)?

转载 作者:行者123 更新时间:2023-12-04 20:30:29 24 4
gpt4 key购买 nike

我正在了解我们应该如何使用 Google DataFlow(基于 Apache Beam)Python SDK 来测试我们的管道。

https://beam.apache.org/documentation/pipelines/test-your-pipeline/
https://cloud.google.com/dataflow/pipelines/creating-a-pipeline-beam

以上链接仅适用于 Java。我很困惑为什么 Google 会指向 Java Apache 测试。

我希望能够在两个 p 集合上查看 CoGroupByKey 连接的结果。我来自 Python 背景,几乎没有使用 Beam/Dataflow 的经验。

真的可以使用任何帮助。我知道这在某种程度上是开放式的......基本上我需要能够在我的管道中查看结果,它阻止我看到我的 CoGroupByKey Join 的结果。

下面的代码

    #dwsku, product are PCollections coming from BigQuery. Nested Values as 
#well in Product, but not dwsku
d1 = {'dwsku': dwsku, 'product': product}
results = d1 | beam.CoGroupByKey()
print results

打印的内容:
    PCollection[CoGroupByKey/Map(_merge_tagged_vals_under_key).None]

最佳答案

如果你想在你的机器上本地测试它,你应该从使用 DirectRunner 开始。然后您将能够调试它 - 通过打印日志或通过在调试器中停止执行。

为了在本地查看整个 PCollection,您可以执行以下操作:

d1 = {'dwsku': dwsku, 'product': product}
results = d1 | beam.CoGroupByKey()

def my_debug_function(pcollection_as_list):
# add a breakpoint in this function or just print
print pcollection_as_list

debug = (results | beam.combiners.ToList() | beam.Map(my_debug_function))

这里有几件事要记住:
  • ToList()转换可能会分配大量内存
  • 使用时 DirectRunner你应该使用 .wait_until_finish()管道的方法,以便您的脚本不会在管道完成执行之前结束
  • 如果您的管道从 BigQuery 下载数据,您应该输入 LIMIT在本地运行时查询
  • 关于python-2.7 - 您如何在 Python 中测试 Beam 管道(Google Dataflow)?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47418694/

    24 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com