
java - RuntimeException related to serializing a Coder in Cloud Dataflow

Reposted. Author: 行者123. Last updated: 2023-11-29 03:01:15

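I'm running into a strange problem when running my Dataflow pipeline. I've written my own Coder, but swapping it out for AvroCoder, SerializableCoder and other examples produces the same problem.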

This is the exception I get after trying to launch the pipeline in streaming mode with the Dataflow service:

Exception in thread "main" java.lang.RuntimeException: Unable to deserialize Coder: ModelCoder. Check that a suitable constructor is defined.  See Coder for details.
at com.google.cloud.dataflow.sdk.util.SerializableUtils.ensureSerializable(SerializableUtils.java:113)
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.ensureCoderSerializable(DirectPipelineRunner.java:901)
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.ensurePCollectionEncodable(DirectPipelineRunner.java:861)
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.setPCollectionValuesWithMetadata(DirectPipelineRunner.java:789)
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.setPCollection(DirectPipelineRunner.java:776)
at com.google.cloud.dataflow.sdk.io.TextIO.evaluateReadHelper(TextIO.java:786)
at com.google.cloud.dataflow.sdk.io.TextIO.access$000(TextIO.java:118)
at com.google.cloud.dataflow.sdk.io.TextIO$Read$Bound$1.evaluate(TextIO.java:327)
at com.google.cloud.dataflow.sdk.io.TextIO$Read$Bound$1.evaluate(TextIO.java:323)
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:706)
at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:219)
at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
at com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:102)
at com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:252)
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:662)
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:374)
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:87)
at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:174)
at io.momentum.demo.models.pipeline.PlatformPipeline.main(PlatformPipeline.java:96)
Caused by: java.lang.IllegalStateException: Sub-class com.google.cloud.dataflow.sdk.util.CoderUtils$Jackson2Module$Resolver MUST implement `typeFromId(DatabindContext,String)
at com.fasterxml.jackson.databind.jsontype.impl.TypeIdResolverBase.typeFromId(TypeIdResolverBase.java:77)
at com.fasterxml.jackson.databind.jsontype.impl.TypeDeserializerBase._findDeserializer(TypeDeserializerBase.java:156)
at com.fasterxml.jackson.databind.jsontype.impl.AsPropertyTypeDeserializer._deserializeTypedForId(AsPropertyTypeDeserializer.java:106)
at com.fasterxml.jackson.databind.jsontype.impl.AsPropertyTypeDeserializer.deserializeTypedFromObject(AsPropertyTypeDeserializer.java:91)
at com.fasterxml.jackson.databind.deser.AbstractDeserializer.deserializeWithType(AbstractDeserializer.java:142)
at com.fasterxml.jackson.databind.deser.impl.TypeWrappedDeserializer.deserialize(TypeWrappedDeserializer.java:42)
at com.fasterxml.jackson.databind.ObjectMapper._readValue(ObjectMapper.java:3760)
at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2042)
at com.fasterxml.jackson.databind.ObjectMapper.treeToValue(ObjectMapper.java:2529)
at com.google.cloud.dataflow.sdk.util.Serializer.deserialize(Serializer.java:98)
at com.google.cloud.dataflow.sdk.util.SerializableUtils.ensureSerializable(SerializableUtils.java:110)
... 18 more

My Coder implementation just wraps AvroCoder and hooks into some of our own code:

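import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.cloud.dataflow.sdk.coders.AtomicCoder;
import com.google.cloud.dataflow.sdk.coders.CoderException;
import com.google.cloud.dataflow.sdk.util.CloudObject;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

// AppModel, CoderInternals and TypedSerializedModel are the asker's own classes.
public final class ModelCoder<M extends AppModel> extends AtomicCoder<M> {
    public static <T extends AppModel> ModelCoder<T> of(Class<T> clazz) {
        return new ModelCoder<>(clazz);
    }

    // Factory Jackson uses to rebuild the coder from its serialized "kind" property.
    @JsonCreator
    @SuppressWarnings("unchecked")
    public static ModelCoder<?> of(@JsonProperty("kind") String classType) throws ClassNotFoundException {
        Class<?> clazz = Class.forName(classType);
        return of((Class<? extends AppModel>) clazz);
    }

    private String kind;

    public ModelCoder(Class<M> type) {
        // Store the fully qualified name: the @JsonCreator factory passes it to
        // Class.forName, which cannot resolve a simple name such as "MyModel".
        this.kind = type.getName();
    }

    @Override
    public void encode(M value, OutputStream outStream, Context context) throws IOException, CoderException {
        CoderInternals.encode(value, outStream, context, new TypeReference<TypedSerializedModel<M>>() { });
    }

    @Override
    public M decode(InputStream inStream, Context context) throws IOException, CoderException {
        return CoderInternals.decode(inStream, context, new TypeReference<TypedSerializedModel<M>>() { });
    }

    // Adds "kind" to the serialized description of this coder.
    @Override
    public CloudObject asCloudObject() {
        CloudObject co = super.asCloudObject();
        co.set("kind", kind);
        return co;
    }
}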

The coder works as expected when I call encode(..) and decode(..) on an AppModel, but this exception happens regardless.
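The @JsonCreator factory in ModelCoder resolves `kind` with Class.forName, which expects a binary class name such as `java.util.ArrayList`; a simple name such as `ArrayList` does not resolve. A minimal JDK-only check of that difference (`ClassNameLookup` and `resolves` are hypothetical helper names, not part of any SDK):

```java
// JDK-only check: Class.forName needs a fully qualified (binary) class name.
final class ClassNameLookup {
    // Returns true if Class.forName can resolve the given name.
    static boolean resolves(String name) {
        try {
            Class.forName(name);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        Class<?> type = java.util.ArrayList.class;
        // Simple name, as returned by getSimpleName(): not resolvable.
        System.out.println(type.getSimpleName() + " -> " + resolves(type.getSimpleName())); // ArrayList -> false
        // Binary name, as returned by getName(): resolvable.
        System.out.println(type.getName() + " -> " + resolves(type.getName())); // java.util.ArrayList -> true
    }
}
```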

Best answer

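You need a static method annotated with @JsonCreator so that the service can instantiate your coder on the workers. You also shouldn't override asCloudObject(): it determines how your Coder is serialized and sent to the workers, and with your override the code will just send a serialized AvroCoder.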
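The underlying idea is that the coder travels to the workers as a description (a class name plus properties) and is rebuilt there through a static factory. The JDK-only sketch below models that idea with a plain Map and reflection; `KindCoder`, `toSpec`, `fromSpec` and the `"@type"`/`"kind"` keys are hypothetical and do not reproduce the real CloudObject format or the SDK's Jackson-based deserialization. It only shows that reconstruction depends on the recorded class name and on a matching static factory existing on that class.

```java
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a coder; NOT the Dataflow SDK Coder or CloudObject API.
final class KindCoder {
    final String kind;

    private KindCoder(String kind) {
        this.kind = kind;
    }

    // Plays the role of the @JsonCreator factory: the "worker" calls it reflectively.
    public static KindCoder of(String kind) {
        return new KindCoder(kind);
    }

    // Hypothetical serialized description: which class to rebuild, plus its argument.
    Map<String, String> toSpec() {
        Map<String, String> spec = new HashMap<>();
        spec.put("@type", getClass().getName());
        spec.put("kind", kind);
        return spec;
    }
}

final class SpecRoundTrip {
    // Rebuilds an object by calling a static of(String) on the class named by "@type".
    static Object fromSpec(Map<String, String> spec) {
        try {
            Class<?> clazz = Class.forName(spec.get("@type"));
            Method factory = clazz.getMethod("of", String.class);
            return factory.invoke(null, spec.get("kind"));
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Unable to rebuild from spec " + spec, e);
        }
    }

    public static void main(String[] args) {
        KindCoder original = KindCoder.of("java.util.ArrayList");
        KindCoder rebuilt = (KindCoder) fromSpec(original.toSpec());
        System.out.println(rebuilt.kind); // java.util.ArrayList

        // A spec whose "@type" names a class without a matching factory cannot be rebuilt.
        Map<String, String> wrong = new HashMap<>(original.toSpec());
        wrong.put("@type", "java.lang.Object");
        try {
            fromSpec(wrong);
        } catch (IllegalStateException e) {
            System.out.println(e.getCause().getClass().getSimpleName()); // NoSuchMethodException
        }
    }
}
```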

For an example of a coder that wraps another coder, see NullableCoder.java ( https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java ).
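The wrapping pattern in NullableCoder comes down to writing a marker byte and then delegating to the inner coder. The JDK-only sketch below reproduces that pattern; `SimpleCoder`, `Utf8Coder`, `NullableWrapper` and `WrapperDemo` are hypothetical names, and the interface is deliberately simpler than the real Coder API (no Context, no @JsonCreator factory, none of the serialization hooks a real Dataflow coder needs).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

// Hypothetical minimal coder interface; NOT the Dataflow/Beam Coder API.
interface SimpleCoder<T> {
    void encode(T value, OutputStream out) throws IOException;
    T decode(InputStream in) throws IOException;
}

// Inner coder: length-prefixed modified UTF-8 via DataOutputStream.writeUTF.
final class Utf8Coder implements SimpleCoder<String> {
    public void encode(String value, OutputStream out) throws IOException {
        new DataOutputStream(out).writeUTF(value);
    }

    public String decode(InputStream in) throws IOException {
        return new DataInputStream(in).readUTF();
    }
}

// Wrapper in the spirit of NullableCoder: one marker byte, then delegate.
final class NullableWrapper<T> implements SimpleCoder<T> {
    private static final int NULL_MARKER = 0;
    private static final int PRESENT_MARKER = 1;
    private final SimpleCoder<T> inner;

    private NullableWrapper(SimpleCoder<T> inner) {
        this.inner = inner;
    }

    static <T> NullableWrapper<T> of(SimpleCoder<T> inner) {
        return new NullableWrapper<>(inner);
    }

    public void encode(T value, OutputStream out) throws IOException {
        if (value == null) {
            out.write(NULL_MARKER);
        } else {
            out.write(PRESENT_MARKER);
            inner.encode(value, out); // the wrapped coder handles the payload
        }
    }

    public T decode(InputStream in) throws IOException {
        int marker = in.read();
        if (marker == NULL_MARKER) return null;
        if (marker == PRESENT_MARKER) return inner.decode(in);
        throw new IOException("Unexpected marker byte: " + marker);
    }
}

final class WrapperDemo {
    // Encodes to bytes and decodes them back with the same coder.
    static <T> T roundTrip(SimpleCoder<T> coder, T value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        coder.encode(value, bytes);
        return coder.decode(new ByteArrayInputStream(bytes.toByteArray()));
    }

    public static void main(String[] args) throws IOException {
        SimpleCoder<String> coder = NullableWrapper.of(new Utf8Coder());
        System.out.println(roundTrip(coder, "hello")); // hello
        System.out.println(roundTrip(coder, null));    // null
    }
}
```

Because the wrapper only adds a prefix and otherwise defers to the inner coder, any SimpleCoder can be made null-tolerant without changing its own encoding.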

A similar question about java - RuntimeException related to serializing a Coder in Cloud Dataflow can be found on Stack Overflow: https://stackoverflow.com/questions/34780459/
