gpt4 book ai didi

java - MapR Streams Kafka API 的批量大小问题

转载 作者:行者123 更新时间:2023-12-01 18:44:50 25 4
gpt4 key购买 nike

您好,我正在使用 Kafka MapRStream 接收来自 Mapr Streams 主题的事件。

我正在尝试增加消费者的批量大小,但我在一批中收到的消息不超过30条! p>

单个事件的大小约为 5000 字节。如果事件较小,我会在一批中获得更多。

这是我的消费者配置:

public static void main( String[] args ) {
final Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "batchSize");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 50000);
props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 26214400);
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 100 * 1024 * 1024);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);


Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(TOPIC));
long totalCount = 0;
long start = System.currentTimeMillis();
long countTimesNoMessages = 0;

while (countTimesNoMessages < 10) {
ConsumerRecords<String, String> records = consumer.poll(1000);
totalCount += records.count();
System.out.println(records.count());
if (records.count() == 0) {
countTimesNoMessages++;
}
}

long end = System.currentTimeMillis();
System.out.println((end - start) + " for " + totalCount + " messages");
}

最佳答案

这些是可能的配置点。

https://mapr.com/docs/61/MapR_Streams/configuration-parameters.html

请注意,fetch.max.bytes 是所有分区的总最大值,并且 sum(max.partition.fetch.bytes) 不能超过 fetch.max.bytes .

调整max.partition.fetch.bytes是正常的,因此每个分区轮询超过64Kb(默认),也会调整fetch.max.bytes这样它就可以让 max.partition.fetch.bytes 正常工作。

您不应该将批量大小设置得太大。一旦轮询流的请求频率下降到每秒几百左右,您就不太可能获得性能的额外改进,并且更有可能出现热点问题或在线程失败时需要大量重做工作.

关于java - MapR Streams Kafka API 的批量大小问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59858725/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com