gpt4 book ai didi

java - 如何在apache kafka主题中查询记录的最后一个值

转载 作者:行者123 更新时间:2023-12-02 09:43:05 25 4
gpt4 key购买 nike

我是 Kafka 新手,所以可能很简单。但从我现在面临的问题来看,我看不到任何解决方案。我有一个 Kafka 主题 metric_32,我想查找键 user_abc 的最新值。这在 Kafka 中是如何可能的。

我尝试使用KStream,但它会在新事件到达主题时订阅。但我想要的是查询已经出现的键的最后一个值。任何例子都会有帮助。

最佳答案

您可以使用状态存储(如果您使用的是 Kafka 流),然后向其中添加一个处理器,每当有新值推送到主题时,该处理器就会更新状态存储。

builder.addGlobalStore(storeBuilder, topic, Consumed.with(keySerde, valueSerde), return new Processor<K,V>() {
private KeyValueStore<K,V> store;

public void init(ProcessorContext context) {
store=(KeyValueStore<K,V>) context.getStateStore("statestorename");
}

public void process(K key, V value) {
store.put(key,value);
}

public void close() {}
});

然后你就可以使用

readOnlyStore=streams.store("statestorename", QueryableStoreTypes.keyValueStore());
readOnlyStore.get("key");

关于java - 如何在apache kafka主题中查询记录的最后一个值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56900416/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com