gpt4 book ai didi

java - Kafka 0.10.2 消费者获得大量重复项

转载 作者:行者123 更新时间:2023-12-02 13:25:49 25 4
gpt4 key购买 nike

我有一个相当简单的 Kafka 设置 - 1 个生产者、1 个主题、10 个分区、10 个 KafkaConsumers 都具有相同的组 ID,全部运行在一台机器上。当我处理一个文件时,生产者快速创建 3269 条消息,消费者高兴地开始消费这些消息。一切都运行良好一段时间,但在某个时刻,消费者开始消费重复项——大量重复项。事实上,看起来他们只是再次开始消耗消息队列。如果我让它运行很长时间,数据库将开始接收相同的数据条目 6 次或更多次。在对日志进行一些测试后,看起来消费者正在重新使用具有相同唯一消息名称的相同消息。

据我所知,没有发生重新平衡。消费者不会消亡或增加。同样是 10 个消费者,一遍又一遍地消费相同的 3269 条消息,直到我终止该进程。如果我放任不管,消费者将写入数十万条记录,从而大量增加真正应该进入数据库的数据量。

我对 Kafka 相当陌生,但我有点不明白为什么会发生这种情况。我知道 Kafka 不能保证一次性处理,而且我可以接受这里那里的一些重复。我有代码可以防止再次保留相同的记录。但是,我不确定为什么消费者会一遍又一遍地重新消费队列。我知道 Kafka 消息在被消费后不会被删除,但如果所有消费者都在同一个组中,那么偏移量应该可以防止这种情况,对吧?我对偏移量的工作原理有所了解,但据我所知,如果没有重新平衡,它们不应该被重置,对吧?据我所知,这些消息并没有超时。有没有办法让我的消费者一次性消费队列中的所有内容,然后等待更多消息,而无需永远重新消费相同的内容?

以下是我传递给生产者和消费者的属性:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("group.id", "MyGroup");
props.put("num.partitions", 10);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

MyIngester ingester = new MyIngester(args[0], props);

最佳答案

对我来说,这似乎是确认收据的问题。尝试以下属性

    props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "100");

关于java - Kafka 0.10.2 消费者获得大量重复项,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43383392/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com