
google-cloud-dataflow - How to reshuffle a PCollection<T>?

Reposted. Author: 行者123. Updated: 2023-12-05 01:36:42

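I'm trying to implement a Reshuffle transform to prevent excessive fusion, but I don't know how to adapt the version written for PCollection<KV<String,String>> so that it works with a plain PCollection<T>. (How to reshuffle a PCollection<KV<String,String>> is described here.)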

How would I extend the official Avro I/O example code so that it reshuffles the records before I add more steps to my pipeline?

PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);

Schema schema = new Schema.Parser().parse(new File("schema.avsc"));

PCollection<GenericRecord> records =
    p.apply(AvroIO.Read.named("ReadFromAvro")
        .from("gs://my_bucket/path/records-*.avro")
        .withSchema(schema));

Best answer

Thanks to a code snippet from the Google support team, I figured it out:

To get a reshuffled PCollection:

PCollection<T> reshuffled = data.apply(Repartition.of());

The Repartition class used:
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;
import java.util.concurrent.ThreadLocalRandom;

/** Redistributes the elements of any PCollection<T> by round-tripping them through a GroupByKey. */
public class Repartition<T> extends PTransform<PCollection<T>, PCollection<T>> {

  private Repartition() {}

  public static <T> Repartition<T> of() {
    return new Repartition<T>();
  }

  @Override
  public PCollection<T> apply(PCollection<T> input) {
    return input
        // Pair each element with a random Integer key so that it can be grouped.
        .apply(ParDo.named("Add arbitrary keys").of(new AddArbitraryKey<T>()))
        // The GroupByKey forces a shuffle, which breaks fusion with the following steps.
        .apply(GroupByKey.<Integer, T>create())
        // Drop the keys and emit the original elements again.
        .apply(ParDo.named("Remove arbitrary keys").of(new RemoveArbitraryKey<T>()));
  }

  /** Wraps each element in a KV whose key is a random int. */
  private static class AddArbitraryKey<T> extends DoFn<T, KV<Integer, T>> {
    @Override
    public void processElement(ProcessContext c) throws Exception {
      c.output(KV.of(ThreadLocalRandom.current().nextInt(), c.element()));
    }
  }

  /** Outputs every value of each group individually, discarding the key. */
  private static class RemoveArbitraryKey<T> extends DoFn<KV<Integer, Iterable<T>>, T> {
    @Override
    public void processElement(ProcessContext c) throws Exception {
      for (T s : c.element().getValue()) {
        c.output(s);
      }
    }
  }
}
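To make the data flow of the three steps concrete, here is a plain-JDK, in-memory sketch. It is not Dataflow SDK code, and the names RepartitionSimulation, addKeysAndGroup and removeKeys are hypothetical. It only mimics what happens to the elements: each element gets a random Integer key, elements are grouped by that key, and then the keys are dropped. The fusion-breaking effect itself comes from the Dataflow service materializing the GroupByKey, which a local simulation cannot show.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical in-memory illustration of the three Repartition steps.
// Plain JDK code, not Dataflow SDK code; it does not model distribution.
class RepartitionSimulation {

  // Mimics "Add arbitrary keys" followed by GroupByKey: each element gets a
  // random Integer key, and elements that share a key are collected together.
  static <T> Map<Integer, List<T>> addKeysAndGroup(List<T> input) {
    Map<Integer, List<T>> groups = new HashMap<>();
    for (T element : input) {
      int key = ThreadLocalRandom.current().nextInt();
      groups.computeIfAbsent(key, k -> new ArrayList<>()).add(element);
    }
    return groups;
  }

  // Mimics "Remove arbitrary keys": keys are discarded and every grouped
  // value is emitted again.
  static <T> List<T> removeKeys(Map<Integer, List<T>> groups) {
    List<T> output = new ArrayList<>();
    for (List<T> values : groups.values()) {
      output.addAll(values);
    }
    return output;
  }

  static <T> List<T> repartition(List<T> input) {
    return removeKeys(addKeysAndGroup(input));
  }

  public static void main(String[] args) {
    List<String> records = Arrays.asList("a", "b", "c", "d", "e");
    // Prints the same five elements, in an order that depends on the random keys.
    System.out.println(repartition(records));
  }
}
```

Because AddArbitraryKey draws keys from the full int range, almost every element lands in its own group; the goal is only to force a shuffle, not to batch elements together. The output contains exactly the input elements, with no ordering guarantee.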

Regarding google-cloud-dataflow - How to reshuffle a PCollection<T>?, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/40767189/
