gpt4 book ai didi

apache-flink - Flink Process Function 没有将数据返回到 Sideoutputstream

转载 作者:行者123 更新时间:2023-12-04 09:51:51 25 4
gpt4 key购买 nike

我正在尝试使用一组规则验证 JSONObject 如果 json 与一组规则匹配,它将返回匹配的规则,而 JSONObject 如果不是,它将返回一个 JSONObject 到 Sideoutput 所有这些都在 ProcessFuntion 中处理,我得到了主要输出但无法捕获侧面输出

SideOutput Stream 定义如下

public final static OutputTag<org.json.JSONObject> unMatchedJSONSideOutput = new OutputTag<org.json.JSONObject>(
"unmatched-side-output") {};

ProcessFunction 定义如下
public class RuleFilter extends ProcessFunction<Tuple2<String,org.json.JSONObject>,Tuple2<String,org.json.JSONObject>> {
@Override
public void processElement(Tuple2<String, org.json.JSONObject> value,
ProcessFunction<Tuple2<String, org.json.JSONObject>, Tuple2<String, org.json.JSONObject>>.Context ctx,
Collector<Tuple2<String, org.json.JSONObject>> out) throws Exception {

if(this.value.matches((value.f1))) {
out.collect(new Tuple2<String, org.json.JSONObject>(value.f0,value.f1));
}else {
ctx.output(RuleMatching.unMatchedJSONSideOutput,value.f1);
}
}
}

我正在打印主数据流输出,如下所示
    DataStream<Tuple2<String, org.json.JSONObject>> matchedJSON =
inputSignal.map(new MapFunction<org.json.JSONObject, Tuple2<String, org.json.JSONObject>>() {
@Override
public Tuple2<String, org.json.JSONObject> map(org.json.JSONObject input) throws Exception {
return new Tuple2<>(value, input);
}
}).process(new RuleFilter()).print("MatchedJSON=>");

matchedJSON .print("matchedJSON=>");

我正在打印 Sideoutput 如下
DataStream<org.json.JSONObject> unmatchedJSON =
((SingleOutputStreamOperator<org.json.JSONObject>) matchedJSON.map(new MapFunction<Tuple2<String, org.json.JSONObject>, org.json.JSONObject>() {
@Override
public org.json.JSONObject map(Tuple2<String, org.json.JSONObject> value) throws Exception {
return value.f1;
}
})).getSideOutput(unMatchedJSONSideOutput );

unmatchedJSON.print("unmatchedJSON=>");

主流正在打印输出,但侧输出未打印无效 json 请帮助解决问题

最佳答案

问题在这里:

DataStream<org.json.JSONObject> unmatchedJSON =
((SingleOutputStreamOperator<org.json.JSONObject>) matchedJSON.map(...))
.getSideOutput(unMatchedJSONSideOutput);

您应该调用 getSideOutput直接上 matchedJSON ,而不是应用 MapFunction 的结果给它。只有 ProcessFunction可以有一个侧面输出,它需要直接来自 ProcessFunction .您通过从映射中转换输出流来欺骗编译器接受这一点,但运行时对此无法做任何有意义的事情。

关于apache-flink - Flink Process Function 没有将数据返回到 Sideoutputstream,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62005413/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com