gpt4 book ai didi

apache-kafka - 弗林克 : Window does not process data at end of stream

转载 作者:行者123 更新时间:2023-12-04 04:14:07 25 4
gpt4 key购买 nike

我有一个带有 flink kafka 消费者的流(kafka msgs 正在流式传输到一个主题上),我注意到一个我正在寻找解决的有趣行为。

当数据正在流入时,如果它在窗口“完成”之前停止,或者如果数据结束(在几个窗口之后)并且没有到达窗口的末尾,则管道的其余部分不会触发。

示例流程:

    env.addSource(kafkaConsumer)
.flatMap(new TokenMapper())
.keyBy("word")
.window(TumblingEventTimeWindows.of(Time.seconds(10L)))
.reduce(new CountTokens())
.flatMap(new ConvertToString())
.addSink(producer);

我正在使用 FlinkKafkaConsumer010,并将 env TimeCharacteristic 设置为 EventTime。和 consumer.assignTimestampsAndWatermarks(new PeriodicWatermarks())
private static class PeriodicWatermarks implements   AssignerWithPeriodicWatermarks<String>{

private long currentMaxTimestamp;
private final long maxOutOfOrderness;

public PeriodicWatermarksAuto(long maxOutOfOrderness){
this.maxOutOfOrderness = maxOutOfOrderness;
}

@Override
public Watermark getCurrentWatermark() {
return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
}

@Override
public long extractTimestamp(String t, long l) {
// this should be the event timestamp
currentMaxTimestamp = l;
logger.info("TIMESTAMP: " + l);
return l;
}
}

如果我的窗口是 10 秒,而我的数据流仅包含 8 秒的数据(然后停止流传输一段时间),则 flatMap->sink 不会处理,直到新的后续数据流入。

示例数据流处理问题:(每个 x 是每秒一条数据
)
      xxxxxxxx(8secs)------(gap)--(later more data)xxxxx
^(not processed) (until I get here)^

类似地,例如,如果我有 35 秒的流数据(我的窗口也是 10 秒),则只有 3 个窗口的数据触发,其余 5 秒的数据永远不会处理。
     ...xxxxxxxxxx(10secs)xxxxx(5secods)------(gap)--(later more data)xxxxx
(processed) ^(not processed) (until I get here)^

最后,如果我的窗口是 10 秒并且我只有 5 秒的流数据,那么 flatmap->sink 永远不会发生。

我的问题是, 如果一段时间后我们没有看到数据,有没有办法触发窗口数据进行处理?

如果我的数据正在实时流式传输,我可以看到有大量的无数据,并且不希望最后一个窗口(假设只有 5 秒的数据)不得不等待一些不确定的时间,直到新数据到来在,我想要窗口时间过去后最后一个窗口的结果。

仔细想想,这似乎是由于使用了 EventTime 而不是 ProcessingTime,或者,我的水印没有为最后一个窗口实际触发而正确生成......不确定两者是否都有?我认为这对任何人来说都是一个问题,如果您的流结束最后一位不会触发。我会说我可能会发送一个流结束消息,但是如果由于源中断上游而导致流结束,这无济于事。

编辑:所以我改为处理时间,它确实正确处理了最后一个窗口中的数据,所以我猜事件时间毕竟是罪魁祸首,我认为自定义触发器或适当的窗口水印可能是答案......

谢谢您的帮助!

最佳答案

我会把这个留给后代,因为这个问题和我想的一样,与水印有关。时间戳器和制水器(来自assignTimestampsAndWatermarks)调用“getCurrentWatermark()”,并且由于我将基于传入实体的水印设置为固定数字(它们的时间戳 - 最大偏移量),它在看到新实体之前不会更新。

我的解决方案是某种计时器,如果在可配置的时间内没有看到数据,则最终将水印推进到下一个窗口。我将无法处理非常潜在的数据,但我认为这不会成为问题。这是 EventTime 处理的预期行为。

关于apache-kafka - 弗林克 : Window does not process data at end of stream,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46432368/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com