gpt4 book ai didi

apache-flink - Flink 窗口 : aggregate and output to sink

转载 作者:行者123 更新时间:2023-12-02 00:30:58 26 4
gpt4 key购买 nike

我们有一个数据流,其中每个元素都是这种类型:

id: String
type: Type
amount: Integer

我们希望聚合此流并每周输出一次 amount 的总和。

当前解决方案:

一个示例 flink 管道如下所示:

stream.keyBy(type)
.window(TumblingProcessingTimeWindows.of(Time.days(7)))
.reduce(sumAmount())
.addSink(someOutput())

输入

| id | type | amount |
| 1 | CAT | 10 |
| 2 | DOG | 20 |
| 3 | CAT | 5 |
| 4 | DOG | 15 |
| 5 | DOG | 50 |

如果窗口在记录 34 之间结束,我们的输出将是:

| TYPE | sumAmount |
| CAT | 15 | (id 1 and id 3 added together)
| DOG | 20 | (only id 2 as been 'summed')

Id 45 仍然在 flink 管道中,将在下周输出。

因此下周我们的总产量将是:

| TYPE | sumAmount |
| CAT | 15 | (of last week)
| DOG | 20 | (of last week)
| DOG | 65 | (id 4 and id 5 added together)

新要求:

我们现在还想知道每条记录是在哪一周处理的。换句话说,我们的新输出应该是:

| TYPE | sumAmount | weekNumber |
| CAT | 15 | 1 |
| DOG | 20 | 1 |
| DOG | 65 | 2 |

但我们还想要这样的额外输出:

| id | weekNumber |
| 1 | 1 |
| 2 | 1 |
| 3 | 1 |
| 4 | 2 |
| 5 | 2 |

如何处理?

flink有什么办法可以实现吗?我想我们会有一个聚合函数,它可以对金额求和,但也可以输出每条记录以及当前周数,但我在文档中找不到执行此操作的方法。

(注意:我们每周处理大约 1 亿条记录,因此理想情况下我们只想在一周内将聚合保持在 flink 的状态,而不是所有单独的记录)

编辑:

我选择了下面 Anton 描述的解决方案:

DataStream<Element> elements = 
stream.keyBy(type)
.process(myKeyedProcessFunction());

elements.addSink(outputElements());
elements.getSideOutput(outputTag)
.addSink(outputAggregates())

KeyedProcessFunction 看起来像这样:

class MyKeyedProcessFunction extends KeyedProcessFunction<Type, Element, Element>
private ValueState<ZonedDateTime> state;
private ValueState<Integer> sum;

public void processElement(Element e, Context c, Collector<Element> out) {
if (state.value() == null) {
state.update(ZonedDateTime.now());
sum.update(0);
c.timerService().registerProcessingTimeTimer(nowPlus7Days);
}
element.addAggregationId(state.value());
sum.update(sum.value() + element.getAmount());
}

public void onTimer(long timestamp, OnTimerContext c, Collector<Element> out) {
state.update(null);
c.output(outputTag, sum.value());
}
}

最佳答案

reduce 方法有一个变体,它将 ProcessWindowFunction 作为第二个参数。你会像这样使用它:

stream.keyBy(type)
.window(TumblingProcessingTimeWindows.of(Time.days(7)))
.reduce(sumAmount(), new WrapWithWeek())
.addSink(someOutput())

private static class WrapWithWeek
extends ProcessWindowFunction<Event, Tuple3<Type, Long, Long>, Type, TimeWindow> {

public void process(Type key,
Context context,
Iterable<Event> reducedEvents,
Collector<Tuple3<Type, Long, Long>> out) {
Long sum = reducedEvents.iterator().next();
out.collect(new Tuple3<Type, Long, Long>(key, context.window.getStart(), sum));
}
}

通常情况下,ProcessWindowFunction 会传递一个 Iterable,其中包含窗口收集的所有事件,但如果您使用 reduce 或聚合函数来预聚合窗口结果,则只有该单个值会传递到 Iterable。这方面的文档是 here但文档中的示例目前有一个小错误,我已在此处的示例中修复了该错误。

但鉴于对第二个输出的新要求,我建议您放弃使用 Windows 执行此操作的想法,而是使用键控 ProcessFunction .您将需要两个每个键的 ValueState:一个按周计数,另一个用于存储总和。您需要一个每周触发一次的计时器:当它触发时,它应该发出类型、总和和周数,然后递增周数。同时,流程元素方法将简单地输出每个传入事件的 ID 以及周计数器的值。

关于apache-flink - Flink 窗口 : aggregate and output to sink,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51990582/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com