gpt4 book ai didi

java - kafka消费者收到一条之前消费过的消息

转载 作者:行者123 更新时间:2023-12-02 01:25:36 26 4
gpt4 key购买 nike

我在 XD 中有一个消费者作业,一旦收到其他生产者作业生成的消息,该作业就会完成。我每天都会触发这些工作。我发现有时这个消费者收到了一条之前消费过的消息。

日志如下:

####OK
2019-06-28 02:06:13+0800 INFO inbound.job:Consumer_Job_In_XD-redis:queue-inbound-channel-adapter1 myConsumer.ConsumeTasklet - ==========consumed poll data ConsumerRecord(topic = my_consumer_topic, partition = 0, leaderEpoch = 0, offset = 4, CreateTime = 1561658772877, serialized key size = -1, serialized value size = 30, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message_from_producer) ==================
2019-06-28 02:06:13+0800 INFO inbound.job:Consumer_Job_In_XD-redis:queue-inbound-channel-adapter1 myConsumer.ConsumeTasklet - ==========message is message_from_producer, task startTime is 1561658700108, timestamp is 1561658772877 ==================

####NG
2019-06-29 17:07:14+0800 INFO inbound.job:Consumer_Job_In_XD-redis:queue-inbound-channel-adapter1 myConsumer.ConsumeTasklet - ==========consumed poll data ConsumerRecord(topic = my_consumer_topic, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1561399136840, serialized key size = -1, serialized value size = 30, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message_from_producer) ==================
2019-06-29 17:07:14+0800 INFO inbound.job:Consumer_Job_In_XD-redis:queue-inbound-channel-adapter1 myConsumer.ConsumeTasklet - ==========message is message_from_producer, task startTime is 1561799100282, timestamp is 1561399136840 ==================

####OK
2019-06-29 22:16:58+0800 INFO inbound.job:Consumer_Job_In_XD-redis:queue-inbound-channel-adapter1 myConsumer.ConsumeTasklet - ==========consumed poll data ConsumerRecord(topic = my_consumer_topic, partition = 0, leaderEpoch = 2, offset = 5, CreateTime = 1561817817702, serialized key size = -1, serialized value size = 30, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message_from_producer) ==================
2019-06-29 22:16:58+0800 INFO inbound.job:Consumer_Job_In_XD-redis:queue-inbound-channel-adapter1 myConsumer.ConsumeTasklet - ==========message is message_from_producer, task startTime is 1561817528447, timestamp is 1561817817702 ==================

####NG
2019-07-02 02:05:09+0800 INFO inbound.job:Consumer_Job_In_XD-redis:queue-inbound-channel-adapter1 myConsumer.ConsumeTasklet - ==========consumed poll data ConsumerRecord(topic = my_consumer_topic, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1561399136840, serialized key size = -1, serialized value size = 30, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message_from_producer) ==================
2019-07-02 02:05:09+0800 INFO inbound.job:Consumer_Job_In_XD-redis:queue-inbound-channel-adapter1 myConsumer.ConsumeTasklet - ==========message is message_from_producer, task startTime is 1562004300372, timestamp is 1561399136840 ==================

看起来它多次收到 offset = 0 消息。

卡卡法版本(1.0.0)

消费者手动提交偏移量。(consumer.commitSync();)
仅设置以下属性:

bootstrap.servers  
auto.offset.reset=earliest
group.id
client.id
    Properties config = new Properties();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
config.put("auto.offset.reset", "earliest");
config.put("group.id", group);
config.put("client.id", config.getProperty("group.id") + "_" + System.currentTimeMillis());
config.put("enable.auto.commit", false);
try {
consumer = new KafkaConsumer<>(config);
consumer.subscribe(tList);
while (true) {
ConsumerRecords<?, ?> records = consumer.poll(10000);
for (ConsumerRecord<?, ?> record : records) {
//.........
consumer.commitSync();
}
if (matched)
break;
}
} finally {
consumer.close();
}

最佳答案

在 Kafka 1.1 中,默认情况下,偏移量仅保留 24 小时,因为 offsets.retention.mines 设置为 1440。

因此,如果您停止消费者超过 24 小时,重新启动后,提交的偏移量可能会被删除,迫使消费者使用 auto.offset.reset 来查找新位置。

由于这对很多人来说太短了,从 Kafka 2.0 开始,offsets.retention.months 现在设置为 10080(7 天)。

您应该更改代理配置,以允许更长时间地保留偏移量或更新到更新的 Kafka 版本。

关于java - kafka消费者收到一条之前消费过的消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56985367/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com