gpt4 book ai didi

apache-kafka - Kafka Streams K-Table 大小监控

转载 作者:行者123 更新时间:2023-12-04 10:18:35 25 4
gpt4 key购买 nike

我有一个流拓扑,它从一个主题中消费并运行一个聚合并构建一个 KTable,它被物化到 RocksDB 中。

我有另一个应用程序,它每天消耗来自同一主题的所有事件,并为满足某些特定条件(即不再需要它们)的事件发送墓碑消息。
聚合处理这个并从状态存储中删除,但我正在监视状态存储的大小或更改日志主题 - 任何真正告诉我 ktable 大小的东西。

我已经公开了 JMX 指标,但似乎没有任何东西可以满足我的需求。我可以看到“放入”到rocksDB 的总数,但看不到键的总数。
我的应用程序是 Spring 启动,我想通过普罗米修斯公开指标。

有没有人解决过这个问题或任何有帮助的想法?

最佳答案

您可以通过使用此 KeyValueStore#approximateNumEntries() 访问 KTable 的底层状态存储来获取每个分区中的近似计数。然后将此计数导出到普罗米修斯(每个分区有一个计数)。

要访问底层状态存储,您可以使用低级处理器 API 来访问 KeyValueStore通过每个 StreamTask 中的每个 ProcessorContext(对应一个分区)。只需添加一个 KStream#transformValues()到您的拓扑:

kStream
...
.transformValues(ExtractCountTransformer::new, "your_ktable_name")
...

在 ExtractCountTransformer 中,将计数提取到普罗米修斯:
@Log4j2
public class ExtractCountTransformer implements ValueTransformerWithKey<String, String, String> {

private KeyValueStore<String, String> yourKTableKvStore;
private ProcessorContext context;

@Override
public void init(ProcessorContext context) {
this.context = context;
yourKTableKvStore = (KeyValueStore<String, String>) context.getStateStore("your_ktable_name");
}

@Override
public String transform(String readOnlyKey, String value) {
//extract count to prometheus
log.debug("partition {} - approx count {}", context.partition(), yourKTableKvStore.approximateNumEntries());
yourKTableKvStore.approximateNumEntries();
return value;
}

@Override
public void close() {

}
}

关于apache-kafka - Kafka Streams K-Table 大小监控,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60966107/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com