gpt4 book ai didi

python - 数据流 : Using Top module with Python SDK: single-element PCollection

转载 作者:太空宇宙 更新时间:2023-11-04 05:20:52 25 4
gpt4 key购买 nike

我正在查看 incubator-beam 存储库中的 word_counting.py 示例(从 Dataflow 文档链接),我想修改它以获得出现次数最多的 n。这是我的管道:

  counts = (lines
| 'split' >> (beam.ParDo(WordExtractingDoFn())
.with_output_types(unicode))
| 'pair_with_one' >> beam.Map(lambda x: (x, 1))
| 'group' >> beam.GroupByKey()
| 'count' >> beam.Map(lambda (word, ones): (word, sum(ones)))
| 'top' >> beam.combiners.Top.Of('top', 10, key=lambda (word, c): c) # 'top' is the only added line

output = counts | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c))
output | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output))

我使用 Top.Of() 方法添加了一行,但它似乎返回了一个 PCollection 并将数组作为单个元素(我正在等待一个有序的 PCollection 但看起来似乎是在看文档PCollections 是无序集合。

当管道运行时,beam.Map 仅遍历一个元素(即整个数组),并且在“格式”中,lambda 函数会引发错误,因为它无法将整个数组映射到元组 (word,c )

我应该如何在不中断管道的情况下处理这个单元素 PCollection?

最佳答案

如果你想将可迭代的PCollection扩展为这些可迭代的元素的PCollection,你可以使用FlatMap,其参数是从元素到可迭代结果的函数:在您的情况下,元素本身是可迭代的,因此我们使用标识函数。

  counts = ...
| 'top' >> beam.combiners.Top.Of('top', 10, key=lambda (word, c): c)
| 'expand' >> beam.FlatMap(lambda word_counts: word_counts) # sic!

output = counts | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c))
...

关于python - 数据流 : Using Top module with Python SDK: single-element PCollection,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40368238/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com