gpt4 book ai didi

java - 如何在内存中的 Kafka Streams 状态存储上启用缓存

转载 作者:行者123 更新时间:2023-11-30 10:18:42 30 4
gpt4 key购买 nike

我想减少向下游发送的数据数量,并且由于我只关心给定键的最后一个值,所以我正在以这种方式从主题中读取数据:

KTable table = build.table("inputTopic", Materialized.as("myStore"));

为什么?因为在引擎盖下,数据正在被缓存,如所述here , 并仅在 commit.interval.mscache.max.bytes.buffering 启动时转发。

到目前为止一切顺利,但由于在这种情况下我根本没有利用 RocksDB,所以我想将其替换为内存存储的默认实现。我隐式启用缓存,以防万一。

Materialized.as(Stores.inMemoryKeyValueStore("myStore")).withCachingEnabled();

但它不起作用 - 数据未被缓存,每条记录都被发送到下游。

还有其他方法可以启用缓存吗?或者也许有更好的方法来实现我想要实现的目标?

最佳答案

看来我错了,内存状态存储缓存按预期工作。我将简要展示我是如何测试它的,也许有人会发现它有用。我制作了一个非常基本的 Kafka Streams 应用程序,它只读取抽象为 KTable 的主题。

public class Main {
public static void main(String[] args) {
StreamsBuilder builder = new StreamsBuilder();
Logger logger = LoggerFactory.getLogger(Main.class);

builder.table("inputTopic", Materialized.as(Stores.inMemoryKeyValueStore("myStore")).withCachingEnabled())
.toStream()
.foreach((k, v) -> logger.info("Result: {} - {}", k, v));

new KafkaStreams(builder.build(), getProperties()).start();
}

private static Properties getProperties() {
Properties properties = new Properties();
properties.put(APPLICATION_ID_CONFIG, "testApp");
properties.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(COMMIT_INTERVAL_MS_CONFIG, 10000);
properties.put(CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
properties.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
properties.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
return properties;
}
}

然后我从 Kafka 运行控制台生产者:

/kafka-console-producer.sh --broker-list localhost:9092 --topic inputTopic --property "parse.key=true" --property "key.separator=:"

并发送了一些消息:a:a、a:b、a:c。只有它们的最后一条消息在应用程序中可见,因此缓存按预期工作。

2018-03-06 21:21:57 INFO Main:26 - Result: a - c

我还稍微更改了流以检查 aggregate 方法的缓存。

builder.stream("inputTopic")
.groupByKey()
.aggregate(() -> "", (k, v, a) -> a + v, Materialized.as(Stores.inMemoryKeyValueStore("aggregate")))
.toStream()
.foreach((k, v) -> logger.info("Result: {} - {}", k, v));

我使用相同的 key 快速连续地发送了几条消息,我只收到了一个结果,所以数据没有立即发送到下游 - 完全符合预期。

关于java - 如何在内存中的 Kafka Streams 状态存储上启用缓存,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49100931/

30 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com