
apache-kafka - fetch.max.wait.ms vs. the parameter of the poll() method

Reposted. Author: 行者123. Updated: 2023-12-03 16:12:28

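Before asking my question, I want to point out that a similar question has been asked before, but it has not been answered, so I am asking again. Please do not mark this as a duplicate, because the earlier question has no answers.
I have a question about fetch.max.wait.ms and consumer.poll(<value>).
This is what I found while researching these settings: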

The poll() method takes a timeout parameter. This specifies how long it will take poll to return, with or without data

If you set fetch.max.wait.ms to 100 ms and fetch.min.bytes to 1 MB, Kafka will receive a fetch request from the consumer and will respond with data either when it has 1 MB of data to return or after 100 ms, whichever happens first.


So my question is: what happens when fetch.max.wait.ms=500, the consumer calls consumer.poll(200), and fetch.min.bytes=500, but the broker does not have enough data to satisfy fetch.min.bytes?
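For reference, the settings in the question could be written down roughly as follows. This is a minimal sketch using only java.util.Properties; the broker address and group id are hypothetical placeholders, and the class name ConsumerSettings is made up for illustration. The two fetch keys are the real consumer configuration names.

```java
import java.util.Properties;

public class ConsumerSettings {
    // Builds a configuration holding the two fetch settings from the question.
    static Properties questionConfig() {
        Properties props = new Properties();
        // Hypothetical values, not taken from the question.
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "demo-group");
        // Values from the question.
        props.setProperty("fetch.min.bytes", "500");
        props.setProperty("fetch.max.wait.ms", "500");
        return props;
    }

    public static void main(String[] args) {
        Properties props = questionConfig();
        System.out.println("fetch.min.bytes   = " + props.getProperty("fetch.min.bytes"));
        System.out.println("fetch.max.wait.ms = " + props.getProperty("fetch.max.wait.ms"));
    }
}
```

A real consumer would also need key and value deserializer settings before these properties could be passed to a KafkaConsumer. The 200 ms value is not a configuration key; it is the argument given to poll() on each call.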

Best answer

fetch.min.bytes

This property allows a consumer to specify the minimum amount of data that it wants to receive from the broker when fetching records. If a broker receives a request for records from a consumer but the new records amount to fewer bytes than fetch.min.bytes, the broker will wait until more messages are available before sending the records back to the consumer.


fetch.max.wait.ms

It tells the broker the maximum time to wait for enough data to accumulate before responding to the consumer.

Example: If you set fetch.max.wait.ms to 100 ms and fetch.min.bytes to 1 MB, Kafka will receive a fetch request from the consumer and will respond with data either when it has 1 MB of data to return or after 100 ms, whichever happens first.


Together, the two parameters above control when the broker responds to the consumer with messages.
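The "whichever happens first" rule can be sketched as a small calculation. This is a simplified model, not broker code: it assumes data arrives at a constant rate, and the class name FetchTiming, the method brokerResponseDelayMs, and its parameters are hypothetical names introduced for illustration.

```java
public class FetchTiming {
    /**
     * Simplified model of how long the broker holds a fetch request.
     * It responds as soon as minBytes are available, or after maxWaitMs,
     * whichever happens first. Assumes a constant arrival rate (bytesPerMs).
     */
    static long brokerResponseDelayMs(long minBytes, long maxWaitMs,
                                      long availableBytes, double bytesPerMs) {
        if (availableBytes >= minBytes) {
            return 0; // enough data already: respond immediately
        }
        if (bytesPerMs <= 0) {
            return maxWaitMs; // no new data arriving: respond when the wait expires
        }
        long msUntilEnough = (long) Math.ceil((minBytes - availableBytes) / bytesPerMs);
        return Math.min(msUntilEnough, maxWaitMs);
    }

    public static void main(String[] args) {
        // fetch.min.bytes = 1 MB, fetch.max.wait.ms = 100, nothing arriving.
        System.out.println(brokerResponseDelayMs(1_048_576, 100, 0, 0.0));
        // fetch.min.bytes = 500, fetch.max.wait.ms = 500, 5 bytes arriving per ms.
        System.out.println(brokerResponseDelayMs(500, 500, 0, 5.0));
    }
}
```

In this model the first call waits the full 100 ms because the byte threshold is never reached, while the second responds after 100 ms because 500 bytes accumulate before the 500 ms limit.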
poll(timeout)

Basically, the timeout passed to poll() controls how long poll() will block if no data is available from the broker to consume.


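On the consumer side, poll() is called to retrieve the records the broker has returned. It calls fetchedRecords(); if records satisfying fetch.min.bytes and fetch.max.wait.ms above have already been returned by the broker, it returns them immediately. Otherwise it waits up to the given timeout and returns empty if no records are available from the broker.
The pollForFetches method of the KafkaConsumer class, shown below, illustrates this: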
    private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(final long timeoutMs) {
        final long startMs = time.milliseconds();
        long pollTimeout = Math.min(coordinator.timeToNextPoll(startMs), timeoutMs);

        // if data is available already, return it immediately
        final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
        if (!records.isEmpty()) {
            return records;
        }

        // send any new fetches (won't resend pending fetches)
        fetcher.sendFetches();

        // We do not want to be stuck blocking in the poll if we are missing some positions
        // since the offset lookup may be backing off after a failure

        // NOTE: the use of cachedSubscriptionHashAllFetchPositions means we MUST call
        // updateAssignmentMetadataIfNeeded before this method.
        if (!cachedSubscriptionHashAllFetchPositions && pollTimeout > retryBackoffMs) {
            pollTimeout = retryBackoffMs;
        }

        client.poll(pollTimeout, startMs, () -> {
            // since a fetch might be completed by the background thread, we need this poll condition
            // to ensure that we do not block unnecessarily in poll()
            return !fetcher.hasCompletedFetches();
        });

        // after the long poll, we should check whether the group needs to rebalance
        // prior to returning data so that the group can stabilize faster
        if (coordinator.rejoinNeededOrPending()) {
            return Collections.emptyMap();
        }

        return fetcher.fetchedRecords();
    }
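If fetch.min.bytes=500 and fetch.max.wait.ms=500, the broker responds to the consumer either when it has 500 bytes of data to return or after 500 ms, whichever happens first.
On the consumer side, each poll(200) call waits up to 200 ms and then calls fetchedRecords() to pick up whatever the broker has returned. If the broker has not responded yet, the call returns empty, and a later poll() collects the response once it arrives.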
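The interaction between the two timeouts can be illustrated with a toy timeline. This is a sketch under stated assumptions, not Kafka code: poll() is called back-to-back with no processing time in between, the fetch request is sent at time 0, network latency and rebalances are ignored, and the names PollModel and pollThatSeesResponse are hypothetical.

```java
public class PollModel {
    /**
     * Returns the 1-based index of the back-to-back poll() call during which
     * the broker's fetch response arrives. Earlier calls time out and return empty.
     */
    static int pollThatSeesResponse(long pollTimeoutMs, long brokerResponseAtMs) {
        if (pollTimeoutMs <= 0) {
            throw new IllegalArgumentException("pollTimeoutMs must be positive");
        }
        int pollIndex = 1;
        long pollStartMs = 0;
        // A poll starting at pollStartMs gives up at pollStartMs + pollTimeoutMs.
        while (pollStartMs + pollTimeoutMs < brokerResponseAtMs) {
            pollStartMs += pollTimeoutMs; // this poll returned empty; the next one starts
            pollIndex++;
        }
        return pollIndex;
    }

    public static void main(String[] args) {
        // poll(200) with a broker that answers after fetch.max.wait.ms = 500.
        System.out.println("Response seen in poll #" + pollThatSeesResponse(200, 500));
    }
}
```

Under these assumptions, the first two poll(200) calls return empty at 200 ms and 400 ms, and the third returns at about 500 ms with whatever the broker sent once fetch.max.wait.ms expired.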

Regarding apache-kafka - fetch.max.wait.ms vs. the parameter of the poll() method, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/58697750/
