gpt4 book ai didi

java - 多个主题的 Spring Kafka 单一生产者

转载 作者:行者123 更新时间:2023-11-30 06:05:38 26 4
gpt4 key购买 nike

我想使用单个生产者将 JSON 对象写入多个主题。

下面的代码正在做我想做的事情,但是使用 setDefaultTopic() 方法告诉 KafkaTemplate 它应该将消息发送到哪个主题感觉不对。

如果我使用 send(String topic, ? payload) 方法,StringJsonMessageConverter 将不起作用。

我的制作人:

public class MyProducer {

@Autowired
private KafkaTemplate<String, ?> kafka;

public void send(String topic, Message<?> message) {
kafka.setDefaultTopic(topic);
kafka.send(message);
}
}

还有我的配置:

@Configuration
public class MyProducerConfig {

@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> props = new HashMap<>();
...

return new DefaultKafkaProducerFactory<>(props);
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) {
KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory);
kafkaTemplate.setMessageConverter(new StringJsonMessageConverter());

return kafkaTemplate;
}
}

关于如何正确执行此操作的任何建议?

更新

我把代码改成了这个...

制作人:

public void send(Message<?> message) {
kafka.send(message);
}

在我的 Controller 中(我创建消息对象的地方);

MessageHeaders headers = new MessageHeaders(Collections.singletonMap(KafkaHeaders.TOPIC, "topicName"));
GenericMessage<NewsRequest> genericMessage = new GenericMessage<>(payload, headers);
producer.send(genericMessage);

MessageHeaders 对象仍将包含 ID 和时间戳。

最佳答案

使用 send(Message<?>) 时变体,消息转换器希望主题位于消息 header 中...

String topic = headers.get(KafkaHeaders.TOPIC, String.class);

如果您可以通过其他方式确定主题,则需要创建自定义转换器。

更改默认主题不是线程安全的。

关于java - 多个主题的 Spring Kafka 单一生产者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46069762/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com