gpt4 book ai didi

apache-beam - 非 KV 元素的 GroupIntoBatches

转载 作者:行者123 更新时间:2023-12-05 01:18:34 26 4
gpt4 key购买 nike

根据Apache Beam 2.0.0 SDK Documentation GroupIntoBatches 仅适用于 KV 集合。

我的数据集只包含值,不需要引入键。但是,要使用 GroupIntoBatches,我必须使用空字符串作为键来实现“假”键:

static class FakeKVFn extends DoFn<String, KV<String, String>> {
@ProcessElement
public void processElement(ProcessContext c) {
c.output(KV.of("", c.element()));
}
}

所以整个流水线如下所示:

public static void main(String[] args) {
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);

long batchSize = 100L;

p.apply("ReadLines", TextIO.read().from("./input.txt"))
.apply("FakeKV", ParDo.of(new FakeKVFn()))
.apply(GroupIntoBatches.<String, String>ofSize(batchSize))
.setCoder(KvCoder.of(StringUtf8Coder.of(), IterableCoder.of(StringUtf8Coder.of())))
.apply(ParDo.of(new DoFn<KV<String, Iterable<String>>, String>() {
@ProcessElement
public void processElement(ProcessContext c) {
c.output(callWebService(c.element().getValue()));
}
}))
.apply("WriteResults", TextIO.write().to("./output/"));

p.run().waitUntilFinish();
}

有没有办法在不引入“假” key 的情况下进行分组?

最佳答案

需要向 GroupIntoBatches 提供 KV 输入,因为转换是使用状态和计时器实现的,它们是每个键和窗口。

对于每个键+窗口对,状态和计时器必须连续执行(或明显如此)。您必须通过提供键(和窗口,尽管我知道今天没有运行器在窗口上并行化)来手动表达可用的并行性。两种最常见的方法是:

  1. 使用一些自然键,比如用户 ID
  2. 随机选择一些固定数量的分片和 key 。这可能更难调整。您必须有足够多的分片才能获得足够的并行性,但每个分片都需要包含足够的数据,GroupIntoBatches 才是真正有用的。

在您的代码段中向所有元素添加一个虚拟键将导致转换根本无法并行执行。这类似于 Stateful indexing causes ParDo to be run single-threaded on Dataflow Runner 的讨论。 .

关于apache-beam - 非 KV 元素的 GroupIntoBatches,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44882253/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com