gpt4 book ai didi

java - kafka消费者尝试连接到随机主机名而不是正确的主机名

转载 作者:太空宇宙 更新时间:2023-11-04 10:20:31 26 4
gpt4 key购买 nike

我是 Kafka 新手,开始探索示例程序。它曾经工作没有任何问题,但突然consumer.poll()命令挂起并且永远不会返回。建议谷歌搜索检查服务器是否可访问。生产者和消费者java代码在同一台机器上运行,生产者能够将记录发布到Kafka,但消费者轮询方法挂起。

环境:
卡夫卡版本:1.1.0
客户端:Java
在 Windows 内的 Ubuntu docker 容器中运行
Zookeeper 和 2 个 Broker 服务器在同一个容器中运行

当我为客户端代码启用日志记录时,我看到以下异常:

2018-07-06 21:24:18 DEBUG NetworkClient:802 - [Consumer clientId=consumer-1, groupId=IDCS_Audit_Event_Consumer] Error connecting to node 4bdce773eb74:9095 (id: 2 rack: null)
java.io.IOException: Can't resolve address: 4bdce773eb74:9095
at org.apache.kafka.common.network.Selector.doConnect(Selector.java:235)
at org.apache.kafka.common.network.Selector.connect(Selector.java:214)
.................
.................

我不确定为什么消费者尝试连接到 4bdce773eb74,即使我的代理服务器是 192.168.99.100:9094,192.168.99.100:9095。我的完整消费者代码:

        final String BOOTSTRAP_SERVERS = "192.168.99.100:9094,192.168.99.100:9095";
final Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "Event_Consumer");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

KafkaConsumer<Long, String> consumer = new KafkaConsumer<Long, String>(props);
TopicPartition tpLogin = new TopicPartition("login1", 0);
TopicPartition tpLogout = new TopicPartition("logout1", 1);
List<TopicPartition> tps = Arrays.asList(tpLogin, tpLogout);
consumer.assign(tps);
while (true) {
final ConsumerRecords<Long, String> consumerRecords = consumer.poll(1000);
if (consumerRecords.count()==0) {
continue;
}
consumerRecords.forEach(record -> {
System.out.printf("Consumer Record:(%d, %s, %d, %d)\n", record.key(), record.value(),
record.partition(), record.offset());
});

consumer.commitAsync();
Thread.sleep(5000);
}
}

请帮助解决此问题。

编辑正如我之前所说,我有 2 个经纪人,即经纪人 1 和经纪人 2。如果我停止broker-1,则不会记录上述异常,但 poll() 方法仍然没有返回。如果我停止 Broker-1,则会无限期记录以下消息:

2018-07-07 11:31:24 DEBUG AbstractCoordinator:579 - [Consumer clientId=consumer-1, groupId=IDCS_Audit_Event_Consumer] Sending FindCoordinator request to broker 192.168.99.100:9094 (id: 1 rack: null)
2018-07-07 11:31:24 DEBUG AbstractCoordinator:590 - [Consumer clientId=consumer-1, groupId=IDCS_Audit_Event_Consumer] Received FindCoordinator response ClientResponse(receivedTimeMs=1530943284196, latencyMs=2, disconnected=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=1, clientId=consumer-1, correlationId=573), responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null', error=COORDINATOR_NOT_AVAILABLE, node=:-1 (id: -1 rack: null)))
2018-07-07 11:31:24 DEBUG AbstractCoordinator:613 - [Consumer clientId=consumer-1, groupId=IDCS_Audit_Event_Consumer] Group coordinator lookup failed: The coordinator is not available.
2018-07-07 11:31:24 DEBUG AbstractCoordinator:227 - [Consumer clientId=consumer-1, groupId=IDCS_Audit_Event_Consumer] Coordinator discovery failed, refreshing metadata

提前致谢,索曼

最佳答案

我发现了这个问题。当我创建主题时,broker-0(在端口:9093 上运行;经纪人 id:0)和broker-2(在端口:9094 上运行;经纪人 id:2)正在运行。今天我错误地启动了broker-1(运行在端口:9095;broker id:1)和broker-2。停止broker-1并启动broker-0后,问题得到解决。现在消费者能够获取事件了。

这绝对是我这边的人为错误,但我有两条评论:

  1. 我认为Kafka应该优雅地使用broker-2(端口号:9094)并忽略broker-1(端口号:9095)
  2. 为什么 Kafka 尝试联系 4bdce773eb74:9095,而不是正确的 IP 地址(192.168.99.100)?

谢谢。

关于java - kafka消费者尝试连接到随机主机名而不是正确的主机名,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51214674/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com