gpt4 book ai didi

apache-kafka - 在 Kafka Streams 的聚合器中访问 TimeWindow 属性

转载 作者:行者123 更新时间:2023-12-04 09:40:48 26 4
gpt4 key购买 nike

我想使用 Kafka-Streams 在时间窗口内流式传输主题的最新记录,并且我想将输出记录的时间戳设置为等于记录注册时间窗口的结束。

我的问题是我无法在聚合器内部访问窗口属性。

这是我现在拥有的代码:

    KS0
.groupByKey()
.windowedBy(
TimeWindows.of(Duration.ofSeconds(this.periodicity)).grace(Duration.ZERO)
)
.aggregate(
Constants::getInitialAssetTimeValue,
this::aggregator,
Materialized.<AssetKey, AssetTimeValue, WindowStore<Bytes, byte[]>>as(this.getStoreName()) /* state store name */
.withValueSerde(assetTimeValueSerde) /* serde for aggregate value */
)
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
.toStream()
.peek((key, value) -> log.info("key={}, value={}", key, value.toString()))
.to(this.toTopic);

我正在使用的聚合函数是这个:
private AssetTimeValue aggregator(AssetKey aggKey, AssetTimeValue newValue, AssetTimeValue aggValue){

// I want to do something like that, but this only works with windowed Keys to which I do
// not have access through the aggregator
// windowEndTime = aggKey.window().endTime().getEpochSecond();

return AssetTimeValue.newBuilder()
.setTimestamp(windowEndTime)
.setName(newValue.getName())
.setValue(newValue.getValue())
.build();
}

非常感谢您的帮助!

最佳答案

您只能通过处理器 API 操作时间戳。但是,您可以轻松地使用嵌入在 DSL 中的处理器 API。

对于您的情况,您可以插入 transform() toStream()之间和 to() .内Transformer您调用context.forward(key, value, To.all().withTimestamp(...))设置新的时间戳。此外,您将 return null最后(null 表示不发出任何记录,因为您已经为此目的使用了 context.forward)。

关于apache-kafka - 在 Kafka Streams 的聚合器中访问 TimeWindow 属性,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62343012/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com