
java - Unable to poll/fetch all records from a Kafka topic

Reposted · Author: 行者123 · Updated: 2023-12-01 17:59:00

I am trying to poll data from a specific topic. For example, Kafka is receiving about 100 records/second, but most of the time the poll does not fetch all of the records. I use a timeout of 5000 ms and call this method every 100 ms. Note: I have also subscribed to the specific topic.

@Scheduled(fixedDelayString = "100")
public void pollRecords() {
    // poll(long) is deprecated; pass a java.time.Duration timeout instead
    ConsumerRecords<String, String> records =
            leadConsumer.poll(Duration.ofMillis(5000));
    // ... process the records
}

How can I fetch all of the data from Kafka?

Best answer

The maximum number of records returned by a single poll() call is specified by the max.poll.records consumer config parameter (default: 500). In addition, there are other consumer config parameters that limit the maximum amount of data returned from the server: fetch.max.bytes and max.partition.fetch.bytes.

On the broker side, on the other hand, there is a further size limit called message.max.bytes.

So you should set these parameters appropriately to fetch more messages.
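For illustration only, here is a minimal sketch (not from the original answer) of raising these consumer-side limits when the consumer is created; the bootstrap server, group id, topic name, and the concrete values are placeholder assumptions:

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class LeadConsumerFactory {

    public static KafkaConsumer<String, String> create() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");    // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "lead-consumer-group");        // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // Raise the per-poll record cap (default: 500).
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 2000);
        // Raise the per-fetch and per-partition byte limits (example values only).
        props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 100 * 1024 * 1024);
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 10 * 1024 * 1024);

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("lead-topic"));             // placeholder topic
        return consumer;
    }
}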

From the Kafka documentation (link):

max.poll.records: The maximum number of records returned in a single call to poll(). (default: 500)

fetch.max.bytes: The maximum amount of data the server should return for a fetch request. Records are fetched in batches by the consumer, and if the first record batch in the first non-empty partition of the fetch is larger than this value, the record batch will still be returned to ensure that the consumer can make progress. As such, this is not an absolute maximum. The maximum record batch size accepted by the broker is defined via message.max.bytes (broker config) or max.message.bytes (topic config). Note that the consumer performs multiple fetches in parallel. (default: 52428800)

message.max.bytes: The largest record batch size allowed by Kafka. If this is increased and there are consumers older than 0.10.2, the consumers' fetch size must also be increased so that they can fetch record batches this large. In the latest message format version, records are always grouped into batches for efficiency. In previous message format versions, uncompressed records are not grouped into batches and this limit only applies to a single record in that case. This can be set per topic with the topic level max.message.bytes config. (default: 1000012)

max.partition.fetch.bytes: The maximum amount of data per-partition the server will return. Records are fetched in batches by the consumer. If the first record batch in the first non-empty partition of the fetch is larger than this limit, the batch will still be returned to ensure that the consumer can make progress. The maximum record batch size accepted by the broker is defined via message.max.bytes (broker config) or max.message.bytes (topic config). See fetch.max.bytes for limiting the consumer request size. (default: 1048576)
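As a further hedged sketch (again not part of the original answer), the topic-level max.message.bytes mentioned in the excerpts could be raised programmatically with the Kafka AdminClient; the bootstrap server, topic name, and the 2 MB value below are assumptions for illustration:

import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class RaiseTopicMaxMessageBytes {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topic =
                    new ConfigResource(ConfigResource.Type.TOPIC, "lead-topic"); // placeholder topic
            AlterConfigOp raiseLimit = new AlterConfigOp(
                    new ConfigEntry("max.message.bytes", "2097152"),             // 2 MB, example value
                    AlterConfigOp.OpType.SET);
            Map<ConfigResource, Collection<AlterConfigOp>> changes =
                    Collections.singletonMap(topic, Collections.singletonList(raiseLimit));
            // Apply the topic-level override and wait for the broker to acknowledge it.
            admin.incrementalAlterConfigs(changes).all().get();
        }
    }
}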

Regarding "java - Unable to poll/fetch all records from a Kafka topic", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/60666407/
