gpt4 book ai didi

java - kafka流跳跃窗口聚合导致时间戳零时出现多个窗口

转载 作者:行者123 更新时间:2023-12-02 08:51:37 27 4
gpt4 key购买 nike

Kafka Streams DSL 窗口聚合导致多个窗口。

@StreamListener("input")
public void process(KStream<String, Data> DataKStream) {

JsonSerde<DataAggregator> DataJsonSerde =
new JsonSerde<>(DataAggregator.class);

DataKStream
.groupByKey()
.windowedBy(TimeWindows.of(60000).advanceBy(30000))
.aggregate(
DataAggregator::new,
(key, Data, aggregator) -> aggregator.add(Data),
Materialized.with(Serdes.String(), DataJsonSerde)
);
}

DataAggregator.java

public class DataAggregator {

private List<String> dataList = new ArrayList<>();

public DataAggregator add(Data data) {
dataList.add(data.getId());
System.out.println(dataList);
return this;
}

public List<String> getDataList() {
return dataList;
}
}

我根据键对输入数据进行分组,然后执行 1 分钟窗口和 30 秒跳跃,在聚合器中我只是收集数据并显示。

一开始我预计会出现一个窗口,30 秒后会出现另一个窗口。但实际输出有所不同,因为一开始就创建了 2 个窗口。

预期:

[1]
[1, 2]
[1, 2, 3]
[1, 2, 3, 4]
[1, 2, 3, 4, 5]
[1, 2, 3, 4, 5, 6] // till 30 seconds only one window
[6] // new window after 30 seconds
[1, 2, 3, 4, 5, 6, 7]
[6, 7]
[1, 2, 3, 4, 5, 6, 7, 8]
[6, 7, 8]

实际输出:

[1]
[1]
[1, 2]
[1, 2]
[1, 2, 3]
[1, 2, 3]
[1, 2, 3, 4]
[1, 2, 3, 4]
[1, 2, 3, 4, 5]
[1, 2, 3, 4, 5]
[1, 2, 3, 4, 5, 6]
[1, 2, 3, 4, 5, 6] // duplicate window even before 30 seconds
[6] // new window after 30 seconds and 1 window from earlier will be dropped
[1, 2, 3, 4, 5, 6, 7]
[6, 7]

因为我在 1 分钟的窗口中创建了 30 秒的希望窗口。我相信,最初应该只有一个窗口,30 秒后应该创建另一个窗口。

有人可以告诉我,实际输出是预期的行为还是我遗漏了什么?

注意:我每 4 秒获取一次输入数据,预期/实际输出仅用于表示。

来自 Kafka 文档:

Hopping time windows are aligned to the epoch, with the lower interval bound being inclusive and the upper bound being exclusive. “Aligned to the epoch” means that the first window starts at timestamp zero. For example, hopping windows with a size of 5000ms and an advance interval (“hop”) of 3000ms have predictable window boundaries [0;5000),[3000;8000),... — and not [1000;6000),[4000;9000),... or even something “random” like [1452;6452),[4452;9452),....

最佳答案

由于您的窗口重叠,因此每个时间戳会获得多个窗口。对于您的特定窗口配置,您始终会获得 2 个窗口(以毫秒为单位):

[0,60000)   [60000,12000) [12000,18000) ...
[30000,90000) [90000,15000) ...

您无法更改此行为,但是,您可以对结果应用 filter() (即 aggregate(...).filter(...) 删除您不感兴趣的窗口。

此外,默认情况下,Kafka Streams 使用记录事件时间。有一个 WallclockTimestampExtractor 但只有在显式设置时才会使用它。比照。 https://docs.confluent.io/current/streams/developer-guide/config-streams.html#default-timestamp-extractor

关于java - kafka流跳跃窗口聚合导致时间戳零时出现多个窗口,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58947340/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com