gpt4 book ai didi

java - Apache Flink 将结果减少为多个值而不是一个

转载 作者:行者123 更新时间:2023-11-29 08:25:28 27 4
gpt4 key购买 nike

我正尝试在 WindowedStream 上实现 reduce,如下所示:

                .keyBy(t -> t.key)
.timeWindow(Time.of(15, MINUTES), Time.of(1, MINUTES))
.reduce(new ReduceFunction<TwitterSentiments>() {
@Override
public TwitterSentiments reduce(TwitterSentiments t2, TwitterSentiments t1) throws Exception {
t2.positive += t1.positive;
t2.neutral += t1.neutral;
t2.negative += t1.negative;

return t2;
}
});

我遇到的问题是,当我调用 stream.print() 时,我得到了很多值(看起来像每个 TwitterSentiments 对象一个,而不是一个聚合对象。

我也试过像这样使用 AggregationFunction,但有同样的问题:

                .aggregate(new AggregateFunction<TwitterSentiments, Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>>() {
@Override
public Tuple3<Long, Long, Long> createAccumulator() {
return new Tuple3<Long, Long, Long>(0L,0L,0L);
}

@Override
public Tuple3<Long, Long, Long> add(TwitterSentiments ts, Tuple3<Long, Long, Long> accumulator) {
return new Tuple3<Long, Long, Long>(
accumulator.f0 + ts.positive.longValue(),
accumulator.f1 + ts.neutral.longValue(),
accumulator.f2 + ts.negative.longValue()
);
}

@Override
public Tuple3<Long, Long, Long> getResult(Tuple3<Long, Long, Long> accumulator) {
return accumulator;
}

@Override
public Tuple3<Long, Long, Long> merge(Tuple3<Long, Long, Long> accumulator1, Tuple3<Long, Long, Long> accumulator2) {
return new Tuple3<Long, Long, Long>(
accumulator1.f0 + accumulator2.f0,
accumulator1.f1 + accumulator2.f1,
accumulator1.f2 + accumulator2.f1);
}
});

为什么stream.print()在这些聚合后仍然会输出很多记录?

最佳答案

如果您不需要每个键的结果,您可以使用 timeWindowAll 来生成单个结果。但是,timeWindowAll 不会并行运行。如果您想以更具可扩展性的方式计算结果,您可以这样做:

    .keyBy(t -> t.key)
.timeWindow(<time specification>)
.reduce(<reduce function>)
.timeWindowAll(<same time specification>)
.reduce(<same reduce function>)

您可能希望 Flink 的运行时足够智能,可以为您执行这种并行预聚合(假设您使用的是 ReduceFunction 或 AggregateFunction),但事实并非如此。

关于java - Apache Flink 将结果减少为多个值而不是一个,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53624098/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com