gpt4 book ai didi

java - 无法为 Spring KafkaTemplate 指定 valueSerializer 实例

转载 作者:行者123 更新时间:2023-12-02 11:15:32 24 4
gpt4 key购买 nike

我需要将自定义值序列化器设置到Spring的KafkaTemplate中。值序列化器如下所示: JsonSerializer<JourneyMailExchange> serializer = new JsonSerializer<>(customObjectMapper); (例如,为了申请PropertyNamingStrategy.SNAKE_CASE)

使用 KafkaProducer 可以轻松完成:

new KafkaProducer<>(properties, Serdes.String().serializer(), valueSerializer);

但我还没有找到对 KafkaTemplate 执行相同操作的可能性。我只看到props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);属性 ProducerFactory ,但这不是我正在寻找的(无法提供具体实例)。

我们通过以下方式创建KafkaTemplate:

@Bean
public ProducerFactory<String, Object> producerFactory(KafkaProperties kafkaProperties) {
return new DefaultKafkaProducerFactory<>(producerConfigs(kafkaProperties.getDefaultSettings()));
}

private static Map<String, Object> producerConfigs(Map<String, String> defaultSettings) {
Map<String, Object> props = new HashMap<>(defaultSettings);
props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return props;
}

@Bean
public KafkaTemplate<String, Object> kafkaTemplate(ProducerFactory<String, Object> producerFactory) {
KafkaTemplate<String, Object> kafkaTemplate = new KafkaTemplate<>(producerFactory);
kafkaTemplate.setDefaultTopic("topicName");
return kafkaTemplate;
}

还发现了以下方法:

kafkaTemplate.setMessageConverter(new StringJsonMessageConverter(customObjectMapper));

但仅当我们使用 Spring 的 Message 时,它才会应用转换。如果我们使用键和值调用发送方法,则实例并忽略提供的转换。

spring-kafka版本:2.1.0.RELEASE

最佳答案

最后我发现,可以通过在构造函数中提供值序列化器来配置多个 DefaultKafkaProducerFactory 来做到这一点:

public DefaultKafkaProducerFactory(Map<String, Object> configs, Serializer<K> keySerializer,
Serializer<V> valueSerializer)

特定值序列化器的最终版本如下所示(配置的 ProducerFactory 数量与所需的不同值序列化器的数量相同):

@Bean
public ProducerFactory<String, Object> producerFactoryWithSnakeCaseValueSerializer(KafkaProperties kafkaProperties) {
Map<String, Object> props = producerConfigsWithSnakeCaseValueSerializer(kafkaProperties.getDefaultSettings());
JsonSerializer<Object> valueSerializer = new JsonSerializer<>(createObjectMapper(SNAKE_CASE));
valueSerializer.setAddTypeInfo(false);
return new DefaultKafkaProducerFactory<>(props, Serdes.String().serializer(), valueSerializer);
}

private static Map<String, Object> producerConfigsWithSnakeCaseValueSerializer(Map<String, String> defaultSettings) {
Map<String, Object> props = new HashMap<>(defaultSettings);
...
return props;
}

@Bean
public KafkaTemplate<String, Object> kafkaTemplate(ProducerFactory<String, Object> producerConfigsWithSnakeCaseValueSerializer) {
KafkaTemplate<String, Object> kafkaTemplate = new KafkaTemplate<>(producerConfigsWithSnakeCaseValueSerializer);
kafkaTemplate.setDefaultTopic("topicName");
return kafkaTemplate;
}

关于java - 无法为 Spring KafkaTemplate 指定 valueSerializer 实例,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50309915/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com