gpt4 book ai didi

java - 如何将 DoFn PTransform 应用于 Apache Beam 中的 PCollectionTuple

转载 作者:太空宇宙 更新时间:2023-11-04 09:03:35 24 4
gpt4 key购买 nike

我正在尝试将 PTransform 应用于 PCollectionTuple,但无法弄清楚编译器为什么会提示。

我想这样做是为了将连接某些 csv 行所需的多个步骤抽象为单个 PTransform(PCollectionTuple 中的每个 PCollection 包含要连接的 csv 行),我遇到的问题不在于连接本身,而是如何将 PTransform 应用于 PCollectionTuple。

这是我的代码:

static class JoinCsvLines extends DoFn<PCollectionTuple, String[]> {
@ProcessElement
public void processElement(ProcessContext context) {
PCollectionTuple element = context.element();
// TODO: Implement the output
}
}

我这样称呼 PTransform:

TupleTag<String[]> tag1 = new TupleTag<>();
TupleTag<String[]> tag2 = new TupleTag<>();
PCollectionTuple toJoin = PCollectionTuple.of(tag1, csvLines1).and(tag2, csvLines2);

// Can't compile this line
PCollection<String[]> joinedLines = toJoin.apply("JoinLines", ParDo.of(new JoinCsvLines()));

当我将鼠标悬停在未编译的行上方时,IntelliJ IDEA 会输出以下内容:

Required type:
PTransform
<? super PCollectionTuple,
OutputT>
Provided:
SingleOutput
<PCollectionTuple,
String[]>
reason: no instance(s) of type variable(s) InputT exist so that PCollectionTuple conforms to PCollection<? extends InputT>

如何将 PTransform 应用于 PCollectionTuple?

最佳答案

DoFn<PCollectionTuple, String[]>意味着您想要为每个记录应用“DoFn”,因此您不应该使用 PCollectionTuple 作为输入类型。相反,您应该使用“csvLines1”和“csvLines2”的类型。

如果您的目的是合并两个 PCollection,您可以选中 Flatten 变换:https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java#L41

关于java - 如何将 DoFn PTransform 应用于 Apache Beam 中的 PCollectionTuple,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60491804/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com