gpt4 book ai didi

java - Kafka 配置 MAX_BYTES 仍然受到 500 条消息的限制

转载 作者:行者123 更新时间:2023-12-01 17:41:23 25 4
gpt4 key购买 nike

我的消费者配置如下:

问题是当我从测试主题(1 个分区包含 1000 条消息)轮询数据时,每次轮询仅收到 500 条消息。每条消息大约为 90 字节。这个配置绝对应该足够高来处理所有数据。有什么原因会这样吗?

使用配置

    public static KafkaConsumer<String, SpecificRecordBase> createConsumer(
Arguments args) {
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, args.bootstrapServers);
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, SpecificAvroDeserializer.class.getName());
properties.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, args.groupId);
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "4500");

// Data batching configuration
properties.setProperty(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "500000000");
properties.setProperty(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "500000000");
properties.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "500000000");

// Specify the number of bytes you want to read in batch
properties.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
properties.setProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, args.schemaRegistryUrl);

return new KafkaConsumer<>(properties);
}

投票片段

.....
while (true) {
ConsumerRecords<String, SpecificRecordBase> records =
myConsumer.poll(Duration.ofSeconds(CONSUMER_POLL_SECONDS));
....

此处的记录数为 500

编辑:

在文档中了解到默认轮询计数为 500。我应该需要哪个配置?我并不真正关心消息的数量,我关心正在传输的字节数。

        properties.setProperty(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "500000000");
properties.setProperty(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "500000000");
properties.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "500000000");
properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500000000");

最佳答案

有一个消费者配置属性max.poll.records,您没有更改其默认值(500)。

If you are using the Java consumer, you can also adjust max.poll.records to tune the number of records that are handled on every loop iteration.

引用:Confluent Kafka Consumer Properties

我记得我遇到过类似的问题,但就我而言,问题是由字节限制之一引起的。

关于java - Kafka 配置 MAX_BYTES 仍然受到 500 条消息的限制,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60676552/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com