gpt4 book ai didi

google-cloud-dataflow - 如何使用 Apache Beam Python SDK 在给定 key 的两个源上执行 "diff"?

转载 作者:行者123 更新时间:2023-12-04 03:10:19 27 4
gpt4 key购买 nike

我笼统地提出了这个问题,因为也许这是一个笼统的答案。但是一个具体的例子是比较 2 个具有相同架构但数据可能不同的 BigQuery 表。我想要一个差异,即相对于复合键添加、删除、修改的内容,例如前 2 列。

Table A
C1 C2 C3
-----------
a a 1
a b 1
a c 1

Table B
C1 C2 C3 # Notes if comparing B to A
-------------------------------------
a a 1 # No Change to the key a + a
a b 2 # Key a + b Changed from 1 to 2
# Deleted key a + c with value 1
a d 1 # Added key a + d

我基本上希望能够制作/报告比较说明。
或者从 Beam 的角度来看,我可能只想输出最多 4 个标记的 PCollections:未更改、已更改、已添加、已删除。我该怎么做,PCollections 会是什么样子?

最佳答案

您在这里要做的基本上是连接两个表并比较结果,对吗?你可以看看my answer to this question , 以查看连接两个表的两种方式(侧输入或 CoGroupByKey)。

我还将使用 CoGroupByKey 为您的问题编写解决方案.我用 Python 编写代码是因为我更熟悉 Python SDK,但您会用 Java 实现类似的逻辑:

def make_kv_pair(x):
""" Output the record with the x[0]+x[1] key added."""
return ((x[0], x[1]), x)

table_a = (p | 'ReadTableA' >> beam.Read(beam.io.BigQuerySource(....))
| 'SetKeysA' >> beam.Map(make_kv_pair)
table_b = (p | 'ReadTableB' >> beam.Read(beam.io.BigQuerySource(....))
| 'SetKeysB' >> beam.Map(make_kv_pair))

joined_tables = ({'table_a': table_a, 'table_b': table_b}
| beam.CoGroupByKey())


output_types = ['changed', 'added', 'deleted', 'unchanged']
class FilterDoFn(beam.DoFn):
def process((key, values)):
table_a_value = list(values['table_a'])
table_b_value = list(values['table_b'])
if table_a_value == table_b_value:
yield pvalue.TaggedOutput('unchanged', key)
elif len(table_a_value) < len(table_b_value):
yield pvalue.TaggedOutput('added', key)
elif len(table_a_value) > len(table_b_value):
yield pvalue.TaggedOutput('removed', key)
elif table_a_value != table_b_value:
yield pvalue.TaggedOutput('changed', key)

key_collections = (joined_tables
| beam.ParDo(FilterDoFn()).with_outputs(*output_types))

# Now you can handle each output
key_collections.unchanged | WriteToText(...)
key_collections.changed | WriteToText(...)
key_collections.added | WriteToText(...)
key_collections.removed | WriteToText(...)

关于google-cloud-dataflow - 如何使用 Apache Beam Python SDK 在给定 key 的两个源上执行 "diff"?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45873830/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com