gpt4 book ai didi

apache-kafka - Spring Kafka 消费者无法消费记录

转载 作者:行者123 更新时间:2023-12-04 01:41:59 47 4
gpt4 key购买 nike

我们正在使用 Spring Kafka 来批量消费记录。我们有时会遇到这样的问题:应用程序启动后,即使有足够多的未读消息,它也不会消耗任何记录。相反,我们不断看到信息日志说。

[INFO]-[FetchSessionHandler:handleError:440] - [Consumer clientId=consumer-2, groupId=groupId] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 1027: org.apache.kafka.common.errors.DisconnectException. 

人们正面临这个问题,每个人都说忽略它,因为它只是一个信息日志。甚至,我们看到一段时间后应用程序开始拾取记录而不做任何事情。但是,开始使用记录可能需要多长时间是非常不可预测的:(

我们在使用Spring cloud stream的时候没有看到这个错误。不确定我们是否遗漏了 spring-kafka 中的任何配置。

过去有人遇到过这个问题,如果我们遗漏了什么,请告诉我们。 我们的主题负载很大,如果有很多滞后,会发生这种情况吗?

我们正在使用 2.2.2.RELEASE 的 Spring KafkaSpring Boot 2.1.2.RELEASEKafka 0.10.0.1(我们知道它很旧,因为不可避免的原因我们不得不使用它:()

这是我们的代码:

应用程序.yml

li.topics: CUSTOM.TOPIC.JSON
spring:
application:
name: DataPublisher
kafka:
listener:
type: batch
ack-mode: manual_immediate
consumer:
enable-auto-commit: false
max-poll-records: 500
fetch-min-size: 1
fetch-max-wait: 1000
group-id: group-dev-02
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer:CustomResourceDeserialiser
auto-offset-reset: earliest

消费者:

public class CustomKafkaBatchConsumer {


@KafkaListener(topics = "#{'${li.topics}'.split(',')}", id = "${spring.kafka.consumer.group-id}")
public void receiveData(@Payload List<CustomResource> customResources,
Acknowledgment acknowledgment,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
@Header(KafkaHeaders.OFFSET) List<Long> offsets) {
}
}

反序列化器:

public class CustomResourceDeserialiser implements Deserializer<CustomResource> {

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
}

@Override
public CustomResource deserialize(String topic, byte[] data) {
if (data != null) {
try {
ObjectMapper objectMapper = ObjectMapperFactory.getInstance();
return objectMapper.readValue(data, CustomResource.class);
} catch (IOException e) {
log.error("Failed to deserialise with {}",e.getMessage());
}
}
return null;
}

@Override
public void close() {

}
}

最佳答案

这可能是因为这个 Kafka-8052 - Intermittent INVALID_FETCH_SESSION_EPOCH error on FETCH request问题。这是在 Kafka 2.3.0 中修复的

不幸的是,截至 2019 年 8 月 21 日,Spring Cloud Streams 尚未使用 2.3.0 版本的 kafka-clients 升级它的依赖项。

您可以尝试将这些作为显式 依赖项添加到您的 gradle 中

    compile ('org.apache.kafka:kafka-streams:2.3.0')
compile ('org.apache.kafka:kafka-clients:2.3.0')
compile ('org.apache.kafka:connect-json:2.3.0')
compile ('org.apache.kafka:connect-api:2.3.0')

更新

这也可能是kafka Broker - 客户端不兼容导致的。如果您的集群落后于客户端版本,您可能会看到诸如此类的各种奇怪问题。例如,假设您的 kafka 代理在 1.x.x 上,而您的 kafka-consumer 在 2.x.x 上,这可能会发生

关于apache-kafka - Spring Kafka 消费者无法消费记录,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56988166/

47 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com