gpt4 book ai didi

apache-kafka-streams - KStreamWindowAggregate 似乎共享流时间导致窗口过期

转载 作者:行者123 更新时间:2023-12-04 15:43:45 25 4
gpt4 key购买 nike

由于窗口过期而丢弃的消息,即使对于该特定键不应关闭窗口

我想对从单个分区主题消耗的消息进行分组,并根据事件时间将这些消息窗口化 30 秒。为了避免立即处理,我调用了抑制方法并使用了 .grace 方法。一旦窗口关闭(30 秒后 + 0 宽限期),我希望将最终结果添加到主题中。我从该主题消费的消息有两个不同的键:300483976 和 300485339。我消费的消息将 eventtime 增加了 10 秒。我读到流时间只会根据增加事件时间的新消息增加。这也是我的经历。但是,我看到的问题如下:

我消耗了 key 300483976 的前 10 条消息。基于方法“KStreamWindowAggregate.process”,我注意到 internalProcessorContext.streamTime() 每次都增加,基于最新消耗的消息。处理完 10 条消息后,最终的 eventtime 现在是 starttime + 300 秒。在那之后, key 300485339 的消息被消耗。所有,但最新的消息被标记为过期并被丢弃,并带有消息“跳过过期窗口的记录。”。似乎 internalProcessorContext.streamTime() 仍然记得第一次运行的最新值,因此丢弃了键为 300485339 的消息。

stream
.groupByKey(Grouped.with(Serdes.String(), new DataSerde()))
.windowedBy(
TimeWindows.of(Duration.ofSeconds(30))
.grace(Duration.ofMillis(0))) // override the default of 24 hours
.aggregate(Data::new, transform(), materialize())
.filter((key, value) -> {
log.info("agg {} {}", key, value.toString());
return true;
})
.suppress(
Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
.toStream();

我希望当消息按键(300483976 和 300485339)分组时,流时间不会“共享”。我希望 key 300483976 和 key 300485339 会有单独的窗口。知道出了什么问题吗?我正在使用 kafka-streams 2.1.0 和一个时间戳提取器,它从消息中的一个字段中获取事件时间。

更新

我做了一些额外的测试并改编了一个不使用聚合的例子,但确实显示了与流时间相同的问题:
    @Test
public void shouldSupportFinalResultsForTimeWindows() {
final StreamsBuilder builder = new StreamsBuilder();
final KTable<Windowed<String>, Long> valueCounts = builder
.stream("input", Consumed.with(STRING_SERDE, STRING_SERDE))
.groupBy((String k, String v) -> k, Grouped.with(STRING_SERDE, STRING_SERDE))
.windowedBy(TimeWindows.of(ofMillis(2L)).grace(ofMillis(1L)))
.count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("counts").withCachingDisabled());
valueCounts
.suppress(untilWindowCloses(unbounded()))
.toStream()
.map((final Windowed<String> k, final Long v) -> new KeyValue<>(k.toString(), v))
.to("output-suppressed", Produced.with(STRING_SERDE, Serdes.Long()));
valueCounts
.toStream()
.map((final Windowed<String> k, final Long v) -> new KeyValue<>(k.toString(), v))
.to("output-raw", Produced.with(STRING_SERDE, Serdes.Long()));
final Topology topology = builder.build();
System.out.println(topology.describe());
final ConsumerRecordFactory<String, String> recordFactory =
new ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER);
try (final TopologyTestDriver driver = new TopologyTestDriver(topology, config)) {
driver.pipeInput(recordFactory.create("input", "k1", "v1", 0L));
driver.pipeInput(recordFactory.create("input", "k2", "v1", 7L));
// note this last records sets the streamtime to 7L causing the next messages to be discarded
driver.pipeInput(recordFactory.create("input", "k1", "v1", 2L));
driver.pipeInput(recordFactory.create("input", "k1", "v1", 1L));
driver.pipeInput(recordFactory.create("input", "k1", "v1", 0L));
driver.pipeInput(recordFactory.create("input", "k1", "v1", 5L));
driver.pipeInput(recordFactory.create("input", "k1", "v1", 0L));
}
}

在上面的示例中,第二条消息将流时间设置为 7L,即使消息具有不同的键,也会关闭创建的 0 到 2 窗口。这也会导致下几条消息被丢弃,即使 key 是 k1。因此,从这个示例中可以清楚地看出,没有考虑 key 。如果这实际上是它的设计方式,我想知道这个场景是什么。特别是当我认为一个主题具有不同分区的消息并且一个分区可能具有与其他分区的流时间(源自事件时间)完全不同的消息时,这是很常见的。希望你能对此有所了解??

最佳答案

观察到的行为是设计使然。显然,流时间在所有消息中被跟踪(它不是子流时间)。

您看到的“问题”是,您的输入数据是乱序的(只是输入 key 和 ts):

(k1, 1), (k1, 2), (k1, 3), (k2, 1), (k2, 2), (k3, 3)

时间不是单调递增的,即key为 k2的记录键为 k1 的记录乱序.因为您将宽限期设置为零,所以您告诉 Kafka Streams 不允许无序数据(或实际上只有窗口中的一些无序数据)。因此,对于具有交错键但单调递增时间戳的有序数据流,结果只会如您所料):
(k1, 1), (k2, 1), (k1, 2), (k2, 2), (k1, 3), (k3, 3)

如果您有乱序数据,您应该相应地设置高宽限期(零仅适用于有序数据流)。

关于apache-kafka-streams - KStreamWindowAggregate 似乎共享流时间导致窗口过期,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56773310/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com