gpt4 book ai didi

python-3.x - 如何在 Python 中将表行 PCollections 转换为键、值 PCollections?

转载 作者:行者123 更新时间:2023-12-04 23:38:06 26 4
gpt4 key购买 nike

没有关于如何将 pCollections 转换为输入到 .CoGroupByKey() 所需的 pCollections 的文档

语境
基本上我有两个大的 pCollections,我需要能够找到两者之间的差异,对于类型 II ETL 更改(如果它不存在于 pColl1 中,则添加到 pColl2 中找到的嵌套字段),以便我能够从 BigQuery 保留这些记录的历史记录。

管道架构:

  • 将 BQ 表读入 2 个 pCollections:dwsku 和 product。
  • 对两个集合应用 CoGroupByKey() 返回 --> 结果
  • 解析结果以查找 dwsku 中的所有更改并将其嵌套到产品中。

  • 任何帮助都会被推荐。我在 SO 上找到了一个 java 链接,它执行我需要完成的相同操作(但 Python SDK 上没有任何内容)。

    Convert from PCollection<TableRow> to PCollection<KV<K,V>>

    是否有 Apache Beam 的文档/支持,尤其是 Python SDK?

    最佳答案

    为了得到CoGroupByKey()工作,你需要有 PCollectionstuples ,其中第一个元素是 key 第二个 - 数据 .

    在你的情况下,你说你有 BigQuerySource , 在当前版本的 Apache Beam 中输出 PCollection of dictionaries ( code ),其中每个条目代表表中被读取的一行。如上所述,您需要将此 PCollections 映射到元组。使用 ParDo 很容易做到这一点:

    class MapBigQueryRow(beam.DoFn):
    def process(self, element, key_column):
    key = element.get(key_column)
    yield key, element


    data1 = (p
    | "Read #1 BigQuery table" >> beam.io.Read(beam.io.BigQuerySource(query="your query #1"))
    | "Map #1 to KV" >> beam.ParDo(MapBigQueryRow(), key_column="KEY_COLUMN_IN_TABLE_1"))

    data2 = (p
    | "Read #2 BigQuery table" >> beam.io.Read(beam.io.BigQuerySource(query="your query #2"))
    | "Map #2 to KV" >> beam.ParDo(MapBigQueryRow(), key_column="KEY_COLUMN_IN_TABLE_2"))

    co_grouped = ({"data1": data1, "data2": data2} | beam.CoGroupByKey())

    # do your processing with co_grouped here

    顺便说一句,可以找到适用于 Apache Beam 的 Python SDK 文档 here .

    关于python-3.x - 如何在 Python 中将表行 PCollections 转换为键、值 PCollections?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47582246/

    26 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com