gpt4 book ai didi

java - kafka生产者/消费者重启后consumer收不到消息

转载 作者:行者123 更新时间:2023-12-02 12:16:05 25 4
gpt4 key购买 nike

我们有一个生产者、一个消费者和一个分区。消费者/生产者都是 Spring Boot 应用程序。消费者应用程序在我的本地计算机上运行,​​而生产者应用程序以及 kafka 和 Zookeeper 在远程计算机上运行。

在开发过程中,我重新部署了生产者应用程序并进行了一些更改。但此后我的消费者没有收到任何消息。我尝试重新启动消费者,但没有成功。问题是什么和/或如何解决?

消费者配置:

spring:
cloud:
stream:
defaultBinder: kafka
bindings:
input:
destination: sales
content-type: application/json
kafka:
binder:
brokers: ${SERVICE_REGISTRY_HOST:127.0.0.1}
zkNodes: ${SERVICE_REGISTRY_HOST:127.0.0.1}
defaultZkPort: 2181
defaultBrokerPort: 9092
server:
port: 0

生产者配置:

cloud:
stream:
defaultBinder: kafka
bindings:
output:
destination: sales
content-type: application/json
kafka:
binder:
brokers: ${SERVICE_REGISTRY_HOST:127.0.0.1}
zkNodes: ${SERVICE_REGISTRY_HOST:127.0.0.1}
defaultZkPort: 2181
defaultBrokerPort: 9092

编辑2:

5 分钟后,消费者应用程序因以下异常而终止:

2017-09-12 18:14:47,254 ERROR main o.s.c.s.b.k.p.KafkaTopicProvisioner:253 - Cannot initialize Binder
org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
2017-09-12 18:14:47,255 WARN main o.s.b.c.e.AnnotationConfigEmbeddedWebApplicationContext:550 - Exception encountered during context initialization - cancelling refresh attempt: org.springframework.context.ApplicationContextException: Failed to start bean 'inputBindingLifecycle'; nested exception is org.springframework.cloud.stream.binder.BinderException: Cannot initialize binder:
2017-09-12 18:14:47,256 INFO main o.s.i.m.IntegrationMBeanExporter:449 - Unregistering JMX-exposed beans on shutdown
2017-09-12 18:14:47,257 INFO main o.s.i.m.IntegrationMBeanExporter:241 - Unregistering JMX-exposed beans
2017-09-12 18:14:47,257 INFO main o.s.i.m.IntegrationMBeanExporter:375 - Summary on shutdown: input
2017-09-12 18:14:47,257 INFO main o.s.i.m.IntegrationMBeanExporter:375 - Summary on shutdown: nullChannel
2017-09-12 18:14:47,258 INFO main o.s.i.m.IntegrationMBeanExporter:375 - Summary on shutdown: errorChannel

最佳答案

看看上面关于 DEBUG 的建议是否揭示了任何进一步的信息。看起来您从 KafkaTopicProvisioner 收到了一些超时异常。但我认为当您重新启动消费者时就会发生这种情况。看起来消费者在与经纪人沟通时遇到了一些困难,您需要找出那里发生了什么。

关于java - kafka生产者/消费者重启后consumer收不到消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46177081/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com