gpt4 book ai didi

apache-kafka - spring-kafka 请求回复 : Different Types for Request and Reply

转载 作者:行者123 更新时间:2023-12-03 11:10:30 31 4
gpt4 key购买 nike

提供 Request-Reply 支持的 ReplyingKafkaTemplate 文档(在 Spring-Kafka 2.1.3 中引入)表明请求和回复可以使用不同的类型:

ReplyingKafkaTemplate<K, V, R>

其中参数化类型 K 指定消息键,V 指定值(即请求),R 指定回复。

到目前为止还不错。但是用于实现服务器端 Request-Reply 的相应支持类似乎不支持 V、R 的不同类型。文档建议使用带有添加的 @SendTo 注释的 KafkaListener,它在幕后使用 MessageListenerContainer 上配置的 replyTemplate .但是 AbstractKafkaListenerEndpoint 只支持单一类型的监听器以及 replyTemplate:

public abstract class AbstractKafkaListenerEndpoint<K, V>
implements KafkaListenerEndpoint, BeanFactoryAware, InitializingBean {

...

/**
* Set the {@link KafkaTemplate} to use to send replies.
* @param replyTemplate the template.
* @since 2.0
*/
public void setReplyTemplate(KafkaTemplate<K, V> replyTemplate) {
this.replyTemplate = replyTemplate;
}

...

}

因此 V 和 R 需要是同一类型。

文档中使用的示例确实对请求和回复都使用了字符串。

我是否遗漏了什么,或者这是应该报告和更正的 Spring-Kafka 请求-回复支持中的设计缺陷?

最佳答案

这是 fixed in the 2.2 release .

对于早期版本,只需注入(inject)一个原始的KafkaTemplate(没有泛型)。

编辑

@SpringBootApplication
public class So53151961Application {

public static void main(String[] args) {
SpringApplication.run(So53151961Application.class, args);
}

@KafkaListener(id = "so53151961", topics = "so53151961")
@SendTo
public Bar handle(Foo foo) {
System.out.println(foo);
return new Bar(foo.getValue().toUpperCase());
}

@Bean
public ReplyingKafkaTemplate<String, Foo, Bar> replyingTemplate(ProducerFactory<String, Foo> pf,
ConcurrentKafkaListenerContainerFactory<String, Bar> factory) {

ConcurrentMessageListenerContainer<String, Bar> replyContainer =
factory.createContainer("so53151961-replyTopic");
replyContainer.getContainerProperties().setGroupId("so53151961.reply");
ReplyingKafkaTemplate<String, Foo, Bar> replyingKafkaTemplate = new ReplyingKafkaTemplate<>(pf, replyContainer);
return replyingKafkaTemplate;
}

@Bean
public KafkaTemplate<String, Bar> replyTemplate(ProducerFactory<String, Bar> pf,
ConcurrentKafkaListenerContainerFactory<String, Bar> factory) {

KafkaTemplate<String, Bar> kafkaTemplate = new KafkaTemplate<>(pf);
factory.setReplyTemplate(kafkaTemplate);
return kafkaTemplate;
}

@Bean
public ApplicationRunner runner(ReplyingKafkaTemplate<String, Foo, Bar> template) {
return args -> {
ProducerRecord<String, Foo> record = new ProducerRecord<>("so53151961", null, "key", new Foo("foo"));
RequestReplyFuture<String, Foo, Bar> future = template.sendAndReceive(record);
System.out.println(future.get(10, TimeUnit.SECONDS).value());
};
}

@Bean
public NewTopic topic() {
return new NewTopic("so53151961", 1, (short) 1);
}

@Bean
public NewTopic reply() {
return new NewTopic("so53151961-replyTopic", 1, (short) 1);
}

public static class Foo {

public String value;

public Foo() {
super();
}

public Foo(String value) {
this.value = value;
}

public String getValue() {
return this.value;
}

public void setValue(String value) {
this.value = value;
}

@Override
public String toString() {
return "Foo [value=" + this.value + "]";
}

}

public static class Bar {

public String value;

public Bar() {
super();
}

public Bar(String value) {
this.value = value;
}

public String getValue() {
return this.value;
}

public void setValue(String value) {
this.value = value;
}

@Override
public String toString() {
return "Bar [value=" + this.value + "]";
}

}

}
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.properties.spring.json.trusted.packages=com.example

结果

Foo [value=foo]
Bar [value=FOO]

关于apache-kafka - spring-kafka 请求回复 : Different Types for Request and Reply,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53151961/

31 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com