gpt4 book ai didi

java - 如何为 Kafka KeyValueStore 设置值解/序列化器?

转载 作者:行者123 更新时间:2023-11-30 05:34:46 26 4
gpt4 key购买 nike

我将来自 Kafka 主题的消息存储在 KeyValueStore 中,以便稍后查询它们。我创建一个 KTable 如下:

@StreamListener
public void process(@Input("input") KTable<String,MyMessage> myMessages) {

我在 application.yml 中配置了消费者,如下所示:

更新的反/序列化程序包

spring.cloud.stream.kafka.streams.bindings.input:
consumer:
materializedAs: all-messages
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: com.me.MyMessageDeserializer
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: com.me.MyMessageSerializer

但是,当我从 KeyValueStore 读取时,键会作为字符串正确返回,但返回的值是字节数组而不是 MyMessage。由于某种原因,我的自定义解串器没有被使用。我尝试自己反序列化该消息,但我的反序列化器因异常而崩溃。我在序列化器上放置了一个断点,但它从未被调用。我很清楚,我的串行器或解串器都没有被使用。

我缺少什么配置才能使用我的自定义值解/序列化器?反序列化器是否需要位于特定的包中才能找到?

最佳答案

application.yml 中使用了错误的配置键。而不是 key-deserializer:它应该是 keySerde:而不是 value-deserializer:它应该是 valueSerde。下面是正确的配置:

spring.cloud.stream.kafka.streams.bindings.input:
消费者:
物化为:所有消息
keySerde:org.apache.kafka.common.serialization.Serdes$StringSerde
valueSerde:com.me.MyMessageSerde
制片人:
keySerde:org.apache.kafka.common.serialization.Serdes$StringSerde
valueSerde:com.me.MyMessageSerde

关于java - 如何为 Kafka KeyValueStore 设置值解/序列化器?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56865774/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com