gpt4 book ai didi

apache-kafka-streams - 为什么我的 Kafka Transformer 的 StateStore 无法访问?

转载 作者:行者123 更新时间:2023-12-02 06:49:04 25 4
gpt4 key购买 nike

我正在尝试实现一个 Transformer 类

public class StreamSorterByTimeStampWithDelayTransformer < V > 
implements Transformer< Long, V, KeyValue< Long, V > >

类的构造函数为每个实例创建一个 StateStore,因此:

this.state_store_name = "state_store_" + this.hashCode();

KeyValueBytesStoreSupplier key_value_store_supplier = Stores.persistentKeyValueStore( state_store_name );
StoreBuilder< KeyValueStore< String, V > > key_value_store_builder =
Stores.keyValueStoreBuilder( key_value_store_supplier, Serdes.String(), v_instance.getSerde() );
stream_builder.addStateStore( key_value_store_builder );

Transformer init 方法因此引用了 StateStore:

public void init(ProcessorContext context) {
this.context = context;
this.key_value_store = (KeyValueStore< String, V >) context.getStateStore( state_store_name );
// schedule a punctuate() method every "poll_ms" (wall clock time)
this.context.schedule( this.poll_ms, PunctuationType.WALL_CLOCK_TIME,
(timestamp) -> pushOutOldEnoughEntries() );
}

我想我必须遗漏一个步骤,因为当调用 getStateStore 时,它导致异常说:

Processor KSTREAM-TRANSFORM-0000000003 has no access to StateStore

我遗漏了什么或做错了什么?

最佳答案

您需要将商店附加到变压器:

stream.transform(..., this.state_store_name);

关于apache-kafka-streams - 为什么我的 Kafka Transformer 的 StateStore 无法访问?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49712540/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com