python - Can an Apache Beam/Dataflow job have a non-parallel step?

Reposted. Author: 太空宇宙. Updated: 2023-11-03 20:29:57

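Suppose I have a Python Dataflow job in GCP that does the following two things:

  • Fetches some data from BigQuery

  • Calls an external API to get a specific value, and filters the BigQuery data based on that value

I got this working, but for the second step the only way I found to implement it was as a class that extends DoFn, which is then invoked in parallel: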

    class CallExternalServiceAndFilter(beam.DoFn):
        def to_runner_api_parameter(self, unused_context):
            pass

        def process(self, element, **kwargs):
            # here I have to make the http call and figure out whether to yield the element or not,
            # however this happens for each element of the set, as expected.
            if element['property'] < response_body_parsed['some_other_property']:
                logging.info("Yielding element")
                yield element
            else:
                logging.info("Not yielding element")


    with beam.Pipeline(options=PipelineOptions(), argv=argv) as p:
        rows = p | 'Read data' >> beam.io.Read(beam.io.BigQuerySource(
            dataset='test',
            project=PROJECT,
            query='Select * from test.table'
        ))

        rows = rows | 'Calling external service and filtering items' >> beam.ParDo(CallExternalServiceAndFilter())

        # ...

Is there any way to make the API call once and then use the result in the parallel filtering step?

Best answer

Use the __init__ function.

    class CallExternalServiceAndFilter(beam.DoFn):
        def __init__(self):
            super().__init__()
            # Called once, when the DoFn object is created.
            self.response_body_parsed = call_api()

        def to_runner_api_parameter(self, unused_context):
            pass

        def process(self, element, **kwargs):
            # No HTTP call here: compare each element against the value
            # that was fetched once in __init__.
            if element['property'] < self.response_body_parsed['some_other_property']:
                logging.info("Yielding element")
                yield element
            else:
                logging.info("Not yielding element")

Or, better yet, call your API up front (on the local machine that builds the pipeline) and assign the value in __init__.

    # Fetched once, on the machine that builds the pipeline.
    response_body_parsed = call_api()


    class CallExternalServiceAndFilter(beam.DoFn):
        def __init__(self):
            super().__init__()
            self.response_body_parsed = response_body_parsed

        def to_runner_api_parameter(self, unused_context):
            pass

        def process(self, element, **kwargs):
            # No HTTP call here: compare each element against the value
            # that was fetched before the pipeline was built.
            if element['property'] < self.response_body_parsed['some_other_property']:
                logging.info("Yielding element")
                yield element
            else:
                logging.info("Not yielding element")

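You said that using setup would still result in multiple calls. Is that still the case with __init__ (if you make the API call inside the DoFn rather than beforehand)? The difference between __init__ and setup is still not clear to me.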
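
On the __init__ versus setup question: in the Beam Python SDK, __init__ runs when the DoFn object is created in the program that builds the pipeline. The object is then serialized, and each worker receives a deserialized copy on which setup is called. The sketch below imitates that life cycle with plain pickle and no Beam at all. FakeDoFn, simulate, and the calls list are hypothetical names used only for illustration. It suggests why an API call placed in __init__ happens once, while one placed in setup repeats for every copy.

```python
import pickle

calls = []  # records which life-cycle hooks ran, in order


class FakeDoFn:
    """Hypothetical stand-in for a beam.DoFn; this is not a Beam class."""

    def __init__(self):
        # Runs once, where the object is constructed.
        calls.append("__init__")
        self.threshold = 10

    def setup(self):
        # Runs on every deserialized copy.
        calls.append("setup")


def simulate(num_workers):
    calls.clear()
    fn = FakeDoFn()             # pipeline construction: __init__ runs here
    payload = pickle.dumps(fn)  # the object is serialized for shipping
    for _ in range(num_workers):
        worker_copy = pickle.loads(payload)  # unpickling does not call __init__
        worker_copy.setup()                  # each copy gets its own setup call
    return list(calls)


if __name__ == "__main__":
    print(simulate(3))  # ['__init__', 'setup', 'setup', 'setup']
```

Attributes assigned in __init__, such as threshold here, travel with the pickled object, so every copy sees the value computed at construction time.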

Regarding "python - Can an Apache Beam/Dataflow job have a non-parallel step?", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/57568242/
