
python - Slowly-changing lookup cache from BigQuery - Dataflow Python Streaming SDK

Reposted. Author: 行者123. Last updated: 2023-12-02 09:38:13

I'm trying to follow the slowly-changing lookup cache design pattern for streaming pipelines ( https://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-1 ) using the Python SDK for Apache Beam on Dataflow.

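Our reference table for the lookup cache lives in BigQuery. We can read it and pass it as a side input to a ParDo, but it never refreshes, no matter how we set up the trigger/window.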

    class FilterAlertDoFn(beam.DoFn):
        def process(self, element, alertlist):

            print(len(alertlist))
            print(alertlist)

            ...  # function logic

    alert_input = (p | beam.io.Read(beam.io.BigQuerySource(query=ALERT_QUERY))
                   | 'alert_side_input' >> beam.WindowInto(
                       beam.window.GlobalWindows(),
                       trigger=trigger.Repeatedly(trigger.AfterWatermark(
                           late=trigger.AfterCount(1)
                       )),
                       accumulation_mode=trigger.AccumulationMode.ACCUMULATING
                   )
                   | beam.Map(lambda elem: elem['SOMEKEY'])
                   )

    ...


    main_input | 'alerts' >> beam.ParDo(FilterAlertDoFn(), beam.pvalue.AsList(alert_input))

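According to the I/O page here ( https://beam.apache.org/documentation/io/built-in/ ), the Python SDK supports streaming only for the BigQuery sink. Does that mean BigQuery reads are a bounded source and therefore can't be refreshed with this approach?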

Trying to set a non-global window on the source results in an empty PCollection in the side input.

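Update: When I try to implement the strategy suggested in Pablo's answer, the ParDo that uses the side input never runs.

There is one input source that feeds two outputs, one of which uses the side input. The branch without the side input still reaches its destination, while the branch with the side input never enters FilterAlertDoFn().

If I replace the side input with a dummy value, the pipeline does enter the function. Could it be waiting for a suitable window that never exists?

With the same FilterAlertDoFn() as above, my side_input and the call now look like this: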

    from google.cloud import bigquery


    def refresh_side_input(_):
        query = 'select col from table'
        client = bigquery.Client(project='gcp-project')
        query_job = client.query(query)

        return query_job.result()


    trigger_input = (p | 'alert_ref_trigger' >> beam.io.ReadFromPubSub(
        subscription=known_args.trigger_subscription))


    bigquery_side_input = beam.pvalue.AsSingleton((
        trigger_input
        | beam.WindowInto(beam.window.GlobalWindows(),
                          trigger=trigger.Repeatedly(trigger.AfterCount(1)),
                          accumulation_mode=trigger.AccumulationMode.DISCARDING)
        | beam.Map(refresh_side_input)
    ))

    ...

    # Passing this as side input doesn't work
    main_input | 'alerts' >> beam.ParDo(FilterAlertDoFn(), bigquery_side_input)

    # Passing dummy variable as side input does work
    main_input | 'alerts' >> beam.ParDo(FilterAlertDoFn(), [1])

I have tried several versions of refresh_side_input(); when I inspect the return value inside the function, they all report the expected result.

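Update 2:

I made some small modifications to Pablo's code and I get the same behavior: the DoFn never executes.

In the example below, I see 'in_load_conversion_data' whenever I publish to some_other_topic, but I never see 'in_DoFn' when I publish to some_topic.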

    import apache_beam as beam
    import apache_beam.transforms.window as window

    from apache_beam.transforms import trigger
    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.options.pipeline_options import SetupOptions
    from apache_beam.options.pipeline_options import StandardOptions


    def load_my_conversion_data():
        return {'EURUSD': 1.1, 'USDMXN': 4.4}


    def load_conversion_data(_):
        # I will suppose that these are currency conversions. E.g.
        # {'EURUSD': 1.1, 'USDMXN': 20}
        print('in_load_conversion_data')
        return load_my_conversion_data()


    class ConvertTo(beam.DoFn):
        def __init__(self, target_currency):
            self.target_currency = target_currency

        def process(self, elm, rates):
            print('in_DoFn')
            elm = elm.attributes
            if elm['currency'] == self.target_currency:
                yield elm
            elif '%s%s' % (elm['currency'], self.target_currency) in rates:
                rate = rates['%s%s' % (elm['currency'], self.target_currency)]
                # dict.update() returns None, so copy first, then update.
                result = dict(elm)
                # Pub/Sub attribute values are strings, hence float().
                result.update({'currency': self.target_currency,
                               'value': float(elm['value']) * rate})
                yield result
            else:
                return  # We drop that value


    pipeline_options = PipelineOptions()
    pipeline_options.view_as(StandardOptions).streaming = True

    some_topic = 'projects/some_project/topics/some_topic'
    some_other_topic = 'projects/some_project/topics/some_other_topic'

    with beam.Pipeline(options=pipeline_options) as p:

        table_pcv = beam.pvalue.AsSingleton((
            p
            | 'some_other_topic' >> beam.io.ReadFromPubSub(topic=some_other_topic, with_attributes=True)
            | 'some_other_window' >> beam.WindowInto(window.GlobalWindows(),
                                                     trigger=trigger.Repeatedly(trigger.AfterCount(1)),
                                                     accumulation_mode=trigger.AccumulationMode.DISCARDING)
            | beam.Map(load_conversion_data)))

        # with_attributes=True is needed because ConvertTo reads elm.attributes.
        _ = (p | 'some_topic' >> beam.io.ReadFromPubSub(topic=some_topic, with_attributes=True)
             | 'some_window' >> beam.WindowInto(window.FixedWindows(1))
             | beam.ParDo(ConvertTo('USD'), rates=table_pcv))

Best answer

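As you pointed out, the Java SDK lets you use more streaming utilities, such as timers and state. These utilities help in implementing this kind of pipeline.

The Python SDK lacks some of these utilities, timers in particular. So we need a hack: reloading the side input is triggered by inserting a message into some_other_topic in PubSub.

This also means that you have to perform the BigQuery lookup manually. You can use the apache_beam.io.gcp.bigquery_tools.BigQueryWrapper class to perform the lookup directly in BigQuery.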
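As a rough illustration of the manual lookup, here is a minimal sketch of a helper that runs a query and materializes the rows into a plain dict before they become the side input. It is written against the google.cloud.bigquery client interface used in the question (client.query(sql).result()) rather than BigQueryWrapper. The helper name load_lookup_table, the column names pair and rate, and the FakeClient/FakeJob stand-ins are all assumptions made for this example so that it runs without GCP credentials.

```python
def load_lookup_table(client, query, key_field, value_field):
    """Run `query` and return the rows as a plain {key: value} dict.

    `client` is expected to behave like google.cloud.bigquery.Client:
    client.query(sql).result() must return an iterable of rows that
    support row[field_name] access.
    """
    rows = client.query(query).result()
    # Materialize eagerly so the side input holds ordinary Python data
    # instead of a lazy iterator tied to the BigQuery client.
    return {row[key_field]: row[value_field] for row in rows}


# Hypothetical stand-ins so the sketch runs without GCP credentials.
class FakeJob(object):
    def __init__(self, rows):
        self._rows = rows

    def result(self):
        return iter(self._rows)


class FakeClient(object):
    def __init__(self, rows):
        self._rows = rows

    def query(self, sql):
        return FakeJob(self._rows)


fake = FakeClient([{'pair': 'EURUSD', 'rate': 1.1},
                   {'pair': 'USDMXN', 'rate': 20}])
rates = load_lookup_table(fake, 'select pair, rate from table', 'pair', 'rate')
```

In a real pipeline the fake client would presumably be replaced by bigquery.Client(project=...) created inside the function passed to beam.Map, so that the client is constructed on the worker instead of being pickled with the pipeline.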

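Here is an example of a pipeline that refreshes some currency conversion data. I haven't tested it, but I'm 90% sure it will work with only minor adjustments. Let me know if it helps.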

    import apache_beam as beam
    from apache_beam import window
    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.options.pipeline_options import StandardOptions
    from apache_beam.transforms import trigger

    pipeline_options = PipelineOptions()
    # Pub/Sub is an unbounded source, so the pipeline must run in streaming mode.
    pipeline_options.view_as(StandardOptions).streaming = True
    p = beam.Pipeline(options=pipeline_options)

    def load_conversion_data(_):
        # I will suppose that these are currency conversions. E.g.
        # {'EURUSD': 1.1, 'USDMXN': 20, …}
        return external_service.load_my_conversion_data()

    # Every message on some_other_topic fires the trigger and reloads the table.
    table_pcv = beam.pvalue.AsSingleton((
        p
        | beam.io.ReadFromPubSub(topic=some_other_topic)
        | beam.WindowInto(window.GlobalWindows(),
                          trigger=trigger.Repeatedly(trigger.AfterCount(1)),
                          accumulation_mode=trigger.AccumulationMode.DISCARDING)
        | beam.Map(load_conversion_data)))


    class ConvertTo(beam.DoFn):
        def __init__(self, target_currency):
            self.target_currency = target_currency

        def process(self, elm, rates):
            if elm['currency'] == self.target_currency:
                yield elm
            elif '%s%s' % (elm['currency'], self.target_currency) in rates:
                rate = rates['%s%s' % (elm['currency'], self.target_currency)]
                # dict.update() returns None, so copy first, then update.
                result = dict(elm)
                result.update({'currency': self.target_currency,
                               'value': elm['value'] * rate})
                yield result
            else:
                return  # We drop that value


    _ = (p
         | beam.io.ReadFromPubSub(topic=some_topic)
         | beam.WindowInto(window.FixedWindows(1))
         | beam.ParDo(ConvertTo('USD'), rates=table_pcv))
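
The conversion logic inside ConvertTo.process can be checked without any runner by pulling it out into a plain function. The sketch below is one way to do that. The function name convert and the sample elements are assumptions made for this example, and elements are taken to be dicts with 'currency' and 'value' keys, as in the answer.

```python
def convert(elm, rates, target_currency):
    """Return elm converted to target_currency, or None if no rate is known."""
    if elm['currency'] == target_currency:
        return elm
    pair = '%s%s' % (elm['currency'], target_currency)
    if pair not in rates:
        return None  # the caller drops the element
    # dict.update() returns None, so copy first and then update the copy.
    result = dict(elm)
    result.update({'currency': target_currency,
                   'value': elm['value'] * rates[pair]})
    return result


rates = {'EURUSD': 1.1, 'USDMXN': 20}
same = convert({'currency': 'MXN', 'value': 7}, rates, 'MXN')
converted = convert({'currency': 'USD', 'value': 5}, rates, 'MXN')
dropped = convert({'currency': 'GBP', 'value': 5}, rates, 'MXN')
```

A DoFn could then call convert() from process() and yield the result only when it is not None, which keeps the side-input wiring separate from the arithmetic.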

Regarding python - Slowly-changing lookup cache from BigQuery - Dataflow Python Streaming SDK, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/55055026/
