gpt4 book ai didi

java - 删除 Kafka StateStore 中的记录不起作用(.delete(key) 上抛出 NullPointerException)

转载 作者:行者123 更新时间:2023-12-02 01:00:50 26 4
gpt4 key购买 nike

我的代码中有一个具体化的内存状态存储。我有另一个单独的流,应该根据某些条件查找和删除记录。

我需要允许我的流访问和删除先前构建的状态存储中的记录。我下面有以下代码

@bean
public StreamBuilder myStreamCodeBean(StreamBuilder streamBuilder) {
//create store supplier
KeyValueBytesStoreSupplier myStoreSupplier = Stores.inMemoryKeyValueStore("MyStateStore");

//materialize statstore and enable caching
Materialized materializedStore = Materialized.<String, MyObject>as(myStoreSupplier)
.withKeySerde(Serdes.String())
.withValueSerde(myObjectSerde)
.withCachingEnabled();

//other code here that creates KTable, and another stream to consume records into ktable from another topic
//........

//another stream that consumes another topic and deletes records in store with some logic
streamsBuilder
.stream("someTopicName", someConsumerObject)
.filter((key, value) -> {
KeyValueStore<Bytes, byte[]> kvStore = myStoreSupplier.get();
kvStore.delete(key); //StateStore never "open" and this throws nullpointerexception (even tho key is NOT null)
return true;
}
.to("some topic name here", producerObject);
return streamBuilder;
}

抛出的错误非常普遍。错误是 Kafka 流未运行。

进行一些调试后,我发现在执行删除时我的状态存储未“打开”。

我在这里做错了什么?我可以使用 ReadOnlyKeyValueStore 读取记录,但我需要删除,所以我无法使用它。

感谢任何帮助。

最佳答案

状态存储必须通过处理器的上下文访问,而不是使用供应商对象。

创建存储区后,您需要确保您尝试访问该存储区的处理器可以访问该存储区。

<小时/>

如果您的商店是本地商店,那么您需要指定哪些处理器将访问该商店。

如果您的商店是全局商店,则拓扑中的所有处理器都可以访问它。

<小时/>

您正在使用streamsBuilder.stream()创建一个流,并且至少从您发布的代码中,您似乎没有授予您的处理器访问国营商店。

  1. 确保您已调用 addStateStore()StreamsBuilder
  2. 要获取处理器中的状态存储,我们需要使用context.getStateStore(storeName)。您可以引用以下example

  3. (我不认为我们可以访问 filter() 中的状态存储,因为它是无状态操作)。所以,你可以使用ProcessorTransformer并传入状态存储名称(在您的情况下为 MyStateStore)。

关于java - 删除 Kafka StateStore 中的记录不起作用(.delete(key) 上抛出 NullPointerException),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60675088/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com