gpt4 book ai didi

google-cloud-dataflow - Apache Beam 管道中的组元素

转载 作者:行者123 更新时间:2023-12-03 23:56:37 25 4
gpt4 key购买 nike

我有一个从 AVRO 文件解析记录的管道。

我需要将传入的记录拆分为 500 个项目的块,以便调用同时接受多个输入的 API。

有没有办法用 Python SDK 做到这一点?

最佳答案

我假设您指的是 Batch 用例。为此,您有几个选择:

如果您的 PCollection 足够大,并且您对捆绑包的大小有一定的灵活性,则可以使用 GroupByKey以随机/循环顺序为元素分配键后进行转换。例如。:

my_collection = p | ReadRecordsFromAvro()

element_bundles = (my_collection
# Choose a number of keys that works for you (I chose 50 here)
| 'AddKeys' >> beam.Map(lambda x: (randint(0, 50), x))
| 'MakeBundles' >> beam.GroupByKey()
| 'DropKeys' >> beam.Map(lambda (k, bundle): bundle)
| beam.ParDo(ProcessBundlesDoFn()))

哪里 ProcessBundlesDoFn是这样的:
class ProcessBundlesDoFn(beam.DoFn):
def process(self, bundle):
while bundle.has_next():
# Fetch in batches of 500 until you're done
result = fetch_n_elements(bundle, 500)
yield result

如果您 需要要拥有恰好 500 个元素的所有捆绑包,那么您可能需要:
  • 计算 PCollection 中的元素数量
  • 将该计数作为单例侧输入传递给您的 'AddKeys' ParDo,以确定您将需要的确切 key 数量。

  • 希望有帮助。

    关于google-cloud-dataflow - Apache Beam 管道中的组元素,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45392392/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com