gpt4 book ai didi

java - 在 Dataflow Generic 中进行转换

转载 作者:行者123 更新时间:2023-11-30 06:58:32 25 4
gpt4 key购买 nike

这与 [这里] ( Setting Custom Coders & Handling Parameterized types ) 的另一个 SO 问题有关,那里的解决方法帮助我在转换中使用自定义类型。但是由于我的自定义类型是通用的,我希望甚至可以使转换类通用,然后可以用相同的类型参数化自定义类型。但是当我尝试这样做时,我遇到了无法为类型变量 T 提供编码器,因为实际类型由于删除而未知。解决方法建议注册一个将返回类型参数的编码器,但由于类型参数本身是未知的,我猜想抛出了这个异常,我不确定如何解决这个问题。

static class Processor<T> 
extends PTransform<PCollection<String>,
PCollection<KV<String, Set<CustomType<T>>>>> {

private static final long serialVersionUID = 0;

@Override public PCollection<KV<String, Set<CustomType<T>>>>
apply(PCollection<String> items) {
PCollection<KV<String, Set<CustomType<T>>>> partitionedItems = items
.apply(ParDo.of(new ParDoFn()));
PCollection<KV<String, Set<CustomType<T>>>> combinedItems = partitionedItems
.apply(Combine.<String, Set<CustomType<T>>>perKey(new Merger()));
}
}

最佳答案

这看起来也是由 Github Issue #57 引起的并且应该与该问题一起解决。

与此同时,Dataflow 实际上包含可以立即解决您的问题的高级功能。从您的代码片段看来,所讨论的整个系统可能看起来像这样:

class CustomType<T extends Serializable> { ... }

class Processor<T extends Serializable>
extends PTransform<PCollection<String>,
PCollection<KV<String, Set<CustomType<T>>>>> {

class ParDoFn extends DoFn<String, KV<String, Set<CustomType<T>>>> { … }

class Merger extends BinaryCombineFn<Set<CustomType<T>>> { … }

@Override
public PCollection<KV<String, Set<CustomType<T>>>>
apply(PCollection<String> items) {

PCollection<KV<String, Set<CustomType<T>>>> partitionedItems =
items.apply(ParDo.of(new ParDoFn()));

PCollection<KV<String, Set<CustomType<T>>>> combinedItems =
partitionedItems.apply(
Combine.<String, Set<CustomType<T>>, Set<CustomType<T>>>perKey(
new Merger()));

return combinedItems;
}
}



PCollection<String> input = ...
input.apply(new Processor<String>());

Dataflow获取每个DoFn的输出类型通过使用 TypeDescriptor getOutputTypeDescriptor 返回

因为你的 ParDoFnProcessor<T> 的内部类, 输出类型描述符就是 Set<CustomType<T>> ,即使它被实例化为新的 Processor<String> .

要获取类型信息,我们需要 ParDoFn静态地知道为 T 提供的类型.有两个步骤。

<强>1。创建 Processor 的匿名子类

PCollection<String> input = ...
input.apply(new Processor<String>() {});

这确保对于 Processor 的这个实例的所有内部类, 类型变量 T 静态绑定(bind)到类型 String .在这种情况下最好制作 Processor一个抽象类,因此消费者需要对其进行子类化。

2.覆盖 getOutputTypeDescriptorParDoFn针对外部类解析其类型 Processor .

class Processor<T extends Serializable> extends ... {
class ParDoFn extends DoFn<String, KV<String, Set<CustomType<T>>>> {
@Override
protected TypeDescriptor<KV<String, Set<CustomType<T>>>>
getOutputTypeDescriptor() {
return new TypeDescriptor<KV<String, Set<CustomType<T>>>>(
Processor.this.getClass()) {};
}
}

代码的完整工作版本如下。再次注意,当 Github Issue #57 时,这些都不是必需的。已解决。

class CustomType<T extends Serializable> { ... }

abstract class Processor<T extends Serializable>
extends PTransform<PCollection<String>,
PCollection<KV<String, Set<CustomType<T>>>>> {

class ParDoFn extends DoFn<String, KV<String, Set<CustomType<T>>>> {
...

@Override
protected TypeDescriptor<KV<String, Set<CustomType<T>>>>
getOutputTypeDescriptor() {
return new TypeDescriptor<KV<String, Set<CustomType<T>>>>(
Processor.this.getClass()) {};
}
}

class Merger extends BinaryCombineFn<Set<CustomType<T>>> { ... }

@Override
public PCollection<KV<String, Set<CustomType<T>>>> apply(PCollection<String> items) {

PCollection<KV<String, Set<CustomType<T>>>> partitionedItems =
items.apply(ParDo.of(new ParDoFn()));

PCollection<KV<String, Set<CustomType<T>>>> combinedItems =
partitionedItems.apply(
Combine.<String, Set<CustomType<T>>, Set<CustomType<T>>>perKey(
new Merger()));

return combinedItems;
}
}

PCollection<String> input = …;
input.apply(new Processor<String>() {});

这不是唯一的解决方案——您还可以覆盖 Processor.getDefaultOutputCoder 或显式调用 setCoder 在中间partitionedItems collection——但它似乎是最普遍的用途。

关于java - 在 Dataflow Generic 中进行转换,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32591914/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com