
python - How to flatten multiple PCollections in Python Apache Beam

Reposted. Author: 行者123. Last updated: 2023-11-28 17:59:55

How should the following logic from https://beam.apache.org/documentation/pipelines/design-your-pipeline/ be implemented:

// merge the two PCollections with Flatten
PCollectionList<String> collectionList = PCollectionList.of(aCollection).and(bCollection);
PCollection<String> mergedCollectionWithFlatten = collectionList
    .apply(Flatten.<String>pCollections());

// continue with the new merged PCollection
mergedCollectionWithFlatten.apply(...);

That is, how can multiple PCollections be combined into a single PCollection in the Apache Beam Python API?

Best answer

You can use the Flatten transform in Python as well. For example:

data1 = ['one', 'two', 'three']
data2 = ['four','five']

input1 = p | 'Create PCollection1' >> beam.Create(data1)
input2 = p | 'Create PCollection2' >> beam.Create(data2)

merged = ((input1,input2) | 'Merge PCollections' >> beam.Flatten())

The merged PCollection will contain:

INFO:root:one
INFO:root:two
INFO:root:three
INFO:root:four
INFO:root:five

Full code:

import argparse
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions


class LogFn(beam.DoFn):
    """Logs each element and passes it through unchanged."""
    def process(self, element):
        logging.info(element)
        # Yield rather than return: a returned string would be iterated
        # character by character as the output of this step.
        yield element


def run(argv=None):
    parser = argparse.ArgumentParser()
    known_args, pipeline_args = parser.parse_known_args(argv)

    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True
    p = beam.Pipeline(options=pipeline_options)

    data1 = ['one', 'two', 'three']
    data2 = ['four', 'five']

    input1 = p | 'Create PCollection1' >> beam.Create(data1)
    input2 = p | 'Create PCollection2' >> beam.Create(data2)

    # Flatten takes a tuple of PCollections and merges them into one.
    merged = ((input1, input2) | 'Merge PCollections' >> beam.Flatten())

    merged | 'Check Results' >> beam.ParDo(LogFn())

    result = p.run()
    result.wait_until_finish()


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()

Regarding "python - How to flatten multiple PCollections in Python Apache Beam", a similar question was found on Stack Overflow: https://stackoverflow.com/questions/56174629/
