作者热门文章
- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在尝试使用 BroadcastState 设置一个项目,但由于某种原因,当我尝试运行它时收到此错误:
org.apache.flink.streaming.runtime.tasks.StreamTaskException:无法序列化运算符对象类org.apache.flink.streaming.api.operators.co.CoBroadcastWithNonKeyedOperator。
我不知道为什么它会抛出它。传入和输出的对象(SampleInput 和 Token)是非常简单的 avro 生成的 pojo,具有两个或三个字段,我尝试将 BroadcastProcessFunction 的方法留空,以删除我可以设置的任何内容,使其无法序列化,但是仍然收到错误。这是代码的相关部分:
//Sideoutput that error strings will be written to
OutputTag<String> sideOutputTag = new OutputTag<String>("side-output") {};
//<Setup for broadcast state>
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.cleanupFullSnapshot()
.build();
final MapStateDescriptor<String, Token> ruleStateDescriptor = new MapStateDescriptor<>(
"oathTokens",
BasicTypeInfo.STRING_TYPE_INFO,
AvroTypeInfo.of(new TypeHint<Token>() {}));
ruleStateDescriptor.enableTimeToLive(ttlConfig);
DataStream<Token> tokenObjectStream = tokenSourceStream.process(new JsonToTokenProcessFunction(sideOutputTag))
.startNewChain()
.uid("tokenObjectStream")
.name("tokenObjectStream");
BroadcastStream<Token> ruleBroadcastStream = tokenObjectStream.broadcast(ruleStateDescriptor);
//</Config for broadcast state>
//<Main Data Input Stream>
DataStream<SampleInput> jsonToSampleInput = kafkaStream.process(new JsonToPojoProcessFunction(sideOutputTag))
.startNewChain()
.uid("sampleInputStream")
.name("sampleInputStream");
BroadcastConnectedStream<SampleInput, Token> broadcastConnectedStream = jsonToSampleInput.connect(ruleBroadcastStream);
DataStream<SampleInput> matchedBroadcastStream = broadcastConnectedStream.process(new BroadcastProcessFunction<SampleInput, Token, SampleInput>() {
@Override
public void processElement(SampleInput sampleInput, ReadOnlyContext readOnlyContext, Collector<SampleInput> collector) throws Exception {
}
@Override
public void processBroadcastElement(Token token, Context context, Collector<SampleInput> collector) throws Exception {
}
});
任何帮助将不胜感激。我确信我只是忽略了一些事情。谢谢!
最佳答案
事实证明 ttlConfig 对象是不可序列化的。删除它解决了该问题。
关于java - Flink奇怪的 "Cannot Serialize operator object class ...CoBroadcastWithNonKeyedOperator"错误,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55127868/
我正在尝试使用 BroadcastState 设置一个项目,但由于某种原因,当我尝试运行它时收到此错误: org.apache.flink.streaming.runtime.tasks.Stream
我是一名优秀的程序员,十分优秀!