gpt4 book ai didi

apache-kafka - Spring kafka 和 Kafka 集群

转载 作者:行者123 更新时间:2023-12-04 04:24:28 30 4
gpt4 key购买 nike

我已经在集群中配置了 3 个 kafka,我正在尝试与 spring-kafka 一起使用。

但是在我杀死 kafka 领导者之后,我无法将其他消息发送到队列中。

我将 spring.kafka.bootstrap-servers 属性设置为:“kafka-1:9092;kafka-2:9093,kafka-3:9094”以及我的主机文件中的所有名称。

卡夫卡版本 0.10

有人知道如何正确配置吗?

编辑

我测试了一件事,发生了奇怪的行为。
当我启动服务时,我向主题发送一条消息(强制创建)

代码:

@Bean
public KafkaSyncListener synchronousListener(MessageSender sender, KafkaProperties prop) {
sender.send(prop.getSynchronousTopic(), "Message to force create the topic! Run, Forrest, Run!");
return new KafkaSyncListener();
}

所以,这次我没有启动 kafka-1 服务器(只是其他服务器)并且发生了异常:

org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.



似乎 spring-kafka 只是尝试在第一个引导服务器上连接。
我正在使用 spring-kafka 1.3.5.RELEASE 和 kafka 0.10.1.1

编辑 2

我已经做了你做的测试。当我删除领导者已更改的第一个 docker 容器 (kafka-1) 时,也会发生同样的情况。所以,我的消费者( Spring 服务)无法消费消息。
但是当我再次启动 kafka-1 时,该服务会收到所有消息
我的消费者 ConcurrentKafkaListenerContainerFactory:
{
key.deserializer=class
org.apache.kafka.common.serialization.IntegerDeserializer,
value.deserializer=class
org.apache.kafka.common.serialization.StringDeserializer,
max.poll.records=500,
group.id=mongo-adapter-service,
ssl.keystore.location=/certs/kafka.keystore.jks,
bootstrap.servers=[kafka-2:9093, kafka-1:9092, kafka-3:9094],
auto.commit.interval.ms=100,
security.protocol=SSL,
max.request.size=5242880,
ssl.truststore.location=/certs/kafka.keystore.jks,
auto.offset.reset=earliest
}

最佳答案

服务器地址之间需要逗号,而不是分号。

编辑

我刚刚运行了一个没有问题的测试:

spring.kafka.bootstrap-servers=localhost:9092,localhost:9093,localhost:9094


@SpringBootApplication
public class So50804678Application {

public static void main(String[] args) {
SpringApplication.run(So50804678Application.class, args);
}

@KafkaListener(id = "foo", topics = "so50804678")
public void in(String in) {
System.out.println(in);
}

@Bean
public NewTopic topic() {
return new NewTopic("so50804678", 1, (short) 3);
}

}


$ kafka-topics --zookeeper localhost:2181 --describe --topic so50804678
Topic:so50804678 PartitionCount:1 ReplicationFactor:3 Configs:
Topic: so50804678 Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2

杀了首领,然后
$ kafka-topics --zookeeper localhost:2181 --describe --topic so50804678
Topic:so50804678 PartitionCount:1 ReplicationFactor:3 Configs:
Topic: so50804678 Partition: 0 Leader: 1 Replicas: 0,1,2 Isr: 1,2


$ kafka-console-producer --broker-list localhost:9092,localhost:9093,localhost:9093 --topic so50804678

发送了一条消息,并被应用程序接收到;除了警告之外,日志中没有错误:

[Consumer clientId=consumer-1, groupId=foo] Connection to node 0 could not be established. Broker may not be available.



然后我重新启动了死服务器;停止了我的应用程序;然后添加此代码...
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
return args -> {
while(true) {
System.out.println(template.send("so50804678", "foo").get().getRecordMetadata());
Thread.sleep(3_000);
}
};
}

同样,杀死现任领导人没有任何影响;一切恢复正常。

您可能需要调整服务器 Prop 中的 listeners/advertised.listeners 属性。由于我的经纪人都在本地主机上,我将它们保留为默认值。

关于apache-kafka - Spring kafka 和 Kafka 集群,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50804678/

30 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com