gpt4 book ai didi

Python Apache Beam 管道状态 API 调用

转载 作者:行者123 更新时间:2023-11-28 16:20:24 24 4
gpt4 key购买 nike

我们目前有一个 Python Apache Beam 管道可以在本地运行。我们现在正在让管道在 Google Cloud Dataflow 上运行并实现完全自动化,但发现 Dataflow/Apache Beam 的管道监控存在局限性。

目前,Cloud Dataflow有两种监控管道状态的方法,一种是通过其 UI 界面,另一种是通过命令行中的 gcloud。这两种解决方案都不适用于我们可以考虑无损文件处理的全自动解决方案。

查看 Apache Beam 的 github,他们有一个文件,internal/apiclient.py这表明有一个用于获取作业状态的函数 get_job

我们发现 get_job 使用的一个实例在 runners/dataflow_runner.py 中.

最终目标是使用此 API 获取我们自动触发运行的一个或多个作业的状态,以确保它们最终都通过管道成功处理。

任何人都可以向我们解释一下在我们运行管道 (p.run()) 之后如何使用这个 API 吗?我们不明白 response = runner.dataflow_client.get_job(job_id) 中的 runner 来自哪里。

如果有人可以更深入地了解我们如何在设置/运行我们的管道时访问此 API 调用,那就太好了!

最佳答案

我最终只是摆弄了代码并找到了获取工作详细信息的方法。我们的下一步是看看是否有办法获得所有工作的列表。

# start the pipeline process
pipeline = p.run()
# get the job_id for the current pipeline and store it somewhere
job_id = pipeline.job_id()
# setup a job_version variable (either batch or streaming)
job_version = dataflow_runner.DataflowPipelineRunner.BATCH_ENVIRONMENT_MAJOR_VERSION
# setup "runner" which is just a dictionary, I call it local
local = {}
# create a dataflow_client
local['dataflow_client'] = apiclient.DataflowApplicationClient(pipeline_options, job_version)
# get the job details from the dataflow_client
print local['dataflow_client'].get_job(job_id)

关于Python Apache Beam 管道状态 API 调用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40731842/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com