gpt4 book ai didi

apache-kafka-streams - Kafka Stream 0.10.2.0 状态存储在存储值时获取异常

转载 作者:行者123 更新时间:2023-12-04 00:16:50 24 4
gpt4 key购买 nike

我正在使用带有状态存储的低级处理器 API,直到 0.10.0.1它工作正常,但我已经升级了 kafka 流,但我收到以下错误,所以在那之后我发现这是由于更改日志并且正在查看记录上下文

java.lang.IllegalStateException: This should not happen as timestamp() should only be called while a record is processed
! at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.timestamp(AbstractProcessorContext.java:150)
! at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:60)
! at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:47)
! at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueStore.put(ChangeLoggingKeyValueStore.java:66)
! at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$2.run(MeteredKeyValueStore.java:67)
@Override
public void process(String arg0, List<Data> data {
data.forEach((x) -> {
String rawKey = x.getId();
Data data = kvStore.get(rawKey);
long bytesize = data == null ? 0 : data.getVolume();
x.addVolume(bytesize);
kvStore.put(rawKey, x);
});
}

public void start() {
builder = new KStreamBuilder();
storeSupplier = Stores.create(getKVStoreName()).withKeys(getProcessorKeySerde()).withValues(getProcessorValueSerde()).persistent().build();
builder.addStateStore(storeSupplier);
stream = builder.stream(Serdes.String(), serde(),getTopicName());
processStream(stream);
streams = new KafkaStreams(builder, props);
streams.cleanUp();
streams.start();
}

@Override
public void init(ProcessorContext context) {
super.init(context);
this.context = context;
this.context.schedule(timeinterval);
this.kvStore = (KeyValueStore) context.getStateStore(getKVStoreName());
}

最佳答案

当使用 Processor 的相同实例时,可能会出现这样的异常。跨多个流线程或分区。

确保您将新实例返回给 ProcessorSupplier :

new ProcesorSupplier(() -> new Processor(...

这同样适用于 TransformerTransformerSupplier以及。

关于apache-kafka-streams - Kafka Stream 0.10.2.0 状态存储在存储值时获取异常,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44044244/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com