
java - Flink: Is there another approach to compute average and a state variable instead of using RichAggregateFunction?

Reposted. Author: 行者123. Updated: 2023-11-30 05:45:26

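I am not sure which Flink stream transformation I have to use to compute the average of a stream and also update a state (say, an array of ints) within a 5-second window. If I use a RichFlatMapFunction, I can compute the average and update my array state. However, I then have to call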

streamSource
    .keyBy(0)
    .flatMap(new MyRichFlatMapFunction())
    .print()

and I cannot apply it on a window. If I use

streamSource
    .keyBy(0)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .aggregate(new MyAggregateFunction())
    .print()

I cannot keep my array state through ValueState.

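I tried to use RichAggregateFunction, but I ran into the same problem described in this thread: "Flink error on using RichAggregateFunction". Is there another way to compute an average and keep track of another state in Flink?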

How would I approach this problem in Flink? Here is what I am trying to do, but it does not actually work: https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/MultiSensorMultiStationsReadingMqtt2.java#L70

streamStations.filter(new SensorFilter("COUNT_TR"))
    .map(new TrainStationMapper())
    .keyBy(new MyKeySelector())
    .window(TumblingEventTimeWindows.of(Time.seconds(5)));
    // THIS AGGREGATE DOES NOT WORK
    // .aggregate(new AverageRichAggregator())
    // .print();

public static class AverageRichAggregator extends
        RichAggregateFunction<Tuple3<Integer, Tuple5<Integer, String, Integer, String, Integer>, Double>, Tuple3<Double, Long, Integer>, Tuple2<String, Double>> {

    private static final long serialVersionUID = -40874489412082797L;
    private String functionName;
    private ValueState<CountMinSketch> countMinSketchState;

    @Override
    public void open(Configuration parameters) throws Exception {
        ValueStateDescriptor<CountMinSketch> descriptor = new ValueStateDescriptor<>("countMinSketchState",
                CountMinSketch.class);
        this.countMinSketchState = getRuntimeContext().getState(descriptor);
    }

    @Override
    public Tuple3<Double, Long, Integer> createAccumulator() {
        this.countMinSketchState.clear();
        return new Tuple3<>(0.0, 0L, 0);
    }

    @Override
    public Tuple3<Double, Long, Integer> add(
            Tuple3<Integer, Tuple5<Integer, String, Integer, String, Integer>, Double> value,
            Tuple3<Double, Long, Integer> accumulator) {
        try {
            if (value.f1.f1.equals("COUNT_PE")) {
                // int count = (int) Math.round(value.f2);
                // countMinSketch.updateSketchAsync("COUNT_PE");
            } else if (value.f1.f1.equals("COUNT_TI")) {
                // int count = (int) Math.round(value.f2);
                // countMinSketch.updateSketchAsync("COUNT_TI");
            } else if (value.f1.f1.equals("COUNT_TR")) {
                // int count = (int) Math.round(value.f2);
                // countMinSketch.updateSketchAsync("COUNT_TR");
            }
            CountMinSketch currentCountMinSketchState = this.countMinSketchState.value();
            currentCountMinSketchState.updateSketchAsync(value.f1.f1);
            this.countMinSketchState.update(currentCountMinSketchState);

        } catch (IOException e) {
            e.printStackTrace();
        }

        return new Tuple3<>(accumulator.f0 + value.f2, accumulator.f1 + 1L, value.f1.f4);
    }

    @Override
    public Tuple2<String, Double> getResult(Tuple3<Double, Long, Integer> accumulator) {
        String label = "";
        int frequency = 0;
        try {
            if (functionName.equals("COUNT_PE")) {
                label = "PEOPLE average on train station";
                // frequency = countMinSketch.getFrequencyFromSketch("COUNT_PE");

            } else if (functionName.equals("COUNT_TI")) {
                label = "TICKETS average on train station";
                // frequency = countMinSketch.getFrequencyFromSketch("COUNT_TI");

            } else if (functionName.equals("COUNT_TR")) {
                label = "TRAIN average on train station";
                // frequency = countMinSketch.getFrequencyFromSketch("COUNT_TR");
            }
            frequency = this.countMinSketchState.value().getFrequencyFromSketch(functionName);

        } catch (IOException e) {
            e.printStackTrace();
        }

        return new Tuple2<>(label + "[" + accumulator.f2 + "] reads[" + frequency + "]",
                ((double) accumulator.f0) / accumulator.f1);
    }

    @Override
    public Tuple3<Double, Long, Integer> merge(Tuple3<Double, Long, Integer> a, Tuple3<Double, Long, Integer> b) {
        return new Tuple3<>(a.f0 + b.f0, a.f1 + b.f1, a.f2);
    }
}

Error:

Exception in thread "main" java.lang.UnsupportedOperationException: This aggregation function cannot be a RichFunction.
at org.apache.flink.streaming.api.datastream.WindowedStream.aggregate(WindowedStream.java:692)
at org.sense.flink.examples.stream.MultiSensorMultiStationsReadingMqtt2.<init>(MultiSensorMultiStationsReadingMqtt2.java:71)
at org.sense.flink.App.main(App.java:141)

Thanks

Best answer

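Aggregators are not allowed to keep arbitrary state, because an aggregator might be used with merging windows, and Flink would not know how to merge your ad hoc state.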

But you can combine an AggregateFunction with a ProcessWindowFunction, like this:

input
    .keyBy(<key selector>)
    .timeWindow(<duration>)
    .aggregate(new MyAggregateFunction(), new MyProcessWindowFunction());

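The process method of the ProcessWindowFunction will be passed an iterator that contains only the pre-aggregated result, and a Context that provides access to both global and per-window state. Hopefully that will provide what you need in a straightforward way. But if you need to update your own state with every arriving record, then you will need to extend the types managed by the aggregator to accommodate this.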
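
One way to read that last point: anything that must change with every record goes into the accumulator itself, together with a rule for merging two accumulators. The following is a minimal plain-Java sketch of that idea, with no Flink dependency. The class name StationAverageAccumulator and the HashMap of per-label counts (standing in for the question's CountMinSketch) are assumptions made for illustration; the method roles mirror those of AggregateFunction (add, getResult, merge) only to show where the logic would go.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical accumulator: running sum and count plus per-label read counts. */
final class StationAverageAccumulator {
    double sum;
    long count;
    // Stand-in for the question's CountMinSketch: exact counts per sensor label.
    final Map<String, Long> readsPerLabel = new HashMap<>();

    /** Folds one reading into the accumulator (the role of AggregateFunction.add). */
    StationAverageAccumulator add(String label, double value) {
        sum += value;
        count += 1;
        readsPerLabel.merge(label, 1L, Long::sum);
        return this;
    }

    /** Average of everything seen so far (the role of getResult); NaN if empty. */
    double average() {
        return count == 0 ? Double.NaN : sum / count;
    }

    /** Combines two partial accumulators (the role of merge), including the extra state. */
    static StationAverageAccumulator merge(StationAverageAccumulator a, StationAverageAccumulator b) {
        StationAverageAccumulator out = new StationAverageAccumulator();
        out.sum = a.sum + b.sum;
        out.count = a.count + b.count;
        out.readsPerLabel.putAll(a.readsPerLabel);
        b.readsPerLabel.forEach((label, n) -> out.readsPerLabel.merge(label, n, Long::sum));
        return out;
    }
}
```

Because the extra counts live inside the accumulator and merge knows how to combine them, this shape stays valid for merging windows as well, which is the case the restriction on rich aggregators is there to protect.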

Here is a rough outline of how you might use global state:

private static class MyWindowFunction extends ProcessWindowFunction<IN, OUT, KEY, TimeWindow> {
    private final static ValueStateDescriptor<Long> myGlobalState =
            new ValueStateDescriptor<>("stuff", LongSerializer.INSTANCE);

    @Override
    public void process(KEY key, Context context, Iterable<IN> values, Collector<OUT> out) {
        ValueState<Long> goodStuff = context.globalState().getState(myGlobalState);
    }
}
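
The outline above only obtains the state handle. A typical next step is to read the current value, treat null as "not set yet", update it, and emit something. The sketch below models that read-update-emit step in plain Java so that it runs without a Flink dependency. KeyedLongState is a hypothetical stand-in for a ValueState<Long> obtained from context.globalState() (in Flink the state is scoped to the current key implicitly, whereas here the key is passed explicitly), and WindowCounter with its process signature is likewise illustrative, not Flink's API.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for per-key global state; value() is null until update() is called. */
final class KeyedLongState {
    private final Map<String, Long> values = new HashMap<>();

    Long value(String key) {
        return values.get(key);
    }

    void update(String key, long v) {
        values.put(key, v);
    }
}

/** Illustrative window step: receives one pre-aggregated average per fired window. */
final class WindowCounter {
    private final KeyedLongState windowsSeen = new KeyedLongState();

    /** Counts the windows fired so far for this key and appends one line to out. */
    void process(String key, double preAggregatedAverage, List<String> out) {
        Long previous = windowsSeen.value(key);   // null on the first window for this key
        long current = (previous == null ? 0L : previous) + 1L;
        windowsSeen.update(key, current);         // kept across windows of the same key
        out.add(key + " avg=" + preAggregatedAverage + " window#" + current);
    }
}
```

The point of the design is the split of responsibilities: the average arrives already computed by the aggregator, while the state that outlives a single window is read and written only once per window firing.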

Regarding "java - Flink: Is there another approach to compute average and a state variable instead of using RichAggregateFunction?", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/54923538/
