gpt4 book ai didi

spring-boot - Spring Reactive kafka Receiver 总是将引导服务器覆盖到本地主机

转载 作者:行者123 更新时间:2023-12-04 16:00:07 24 4
gpt4 key购买 nike

我正在尝试使用 @EnableKafka 和 @KafkaListener 注释为 Reactive Kafka 消费者编写 Spring 启动应用程序。
我已经将我的 kafka 代理配置在不同的机器上。当我将引导服务器提供给 kafka 代理的广告主机时,它总是将广告主机 ip 地址覆盖到本地主机。下面是我的代码。

pom.xml 文件:-

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor.kafka</groupId>
<artifactId>reactor-kafka</artifactId>
<version>1.0.0.RELEASE</version>
</dependency>

配置:-
@Configuration
@EnableKafka
public class AppConfig {

@Bean
Map consumerProps() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.12.12.24:9092,192.14.14.28:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return props;
}

@Bean
ReceiverOptions receiverOptions() {

ReceiverOptions receiverOptions = ReceiverOptions.create(consumerProps()).subscription(Arrays.asList("hellochange"));

return receiverOptions;
}

@Bean
public KafkaReceiver kafkaReceiver() {
return KafkaReceiver.create(receiverOptions());
}

}

消费者:-
@Service
public class ChangeListener {

@Autowired
KafkaReceiver kafkaReceiver;

@KafkaListener(topics="hellochange",groupId="example-group")
public void receiver() {
kafkaReceiver.receive().subscribe(System.out::println);
}

}

安慰:-
 auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [localhost:9092]
check.crcs = true
client.id =
connections.max.idle.ms = 540000
enable.auto.commit = true
2018-06-07 19:59:17.640 WARN 23536 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-1, groupId=example-group] Connection to node -1 could not be established. Broker may not be available.

我已经在 simple consumer 中验证了没有 Spring 配置的情况和非响应式(Reactive) Spring kafka,对于两者来说,它都运行良好。只有带有 EnableKafka 和 KafkaListener 注释的 Reactor kafka 我遇到了这个问题。

我错过了什么/在这里做错了吗?
我们可以在 Spring Boot 中将 EnableKafka 和 KafkaListener 注释与 Reactor Kafka 一起使用吗?

附言我明白了, @EnableKafka@KafkaListener没有反应,如果我删除 spring-kafka从 pom.xml 中,两个注释都不可用。

喜欢 @EnableKafka@KafkaListener对于非响应式(Reactive) kafka,是否有任何注释可用于使用 Spring Boot 应用程序配置响应式(Reactive) kafka 使用者?

最佳答案

你不能在 Reactor Kafka 中使用 KafkaListener 注释; @KafkaListener不是 react 性的。

关于spring-boot - Spring Reactive kafka Receiver 总是将引导服务器覆盖到本地主机,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50748770/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com