
apache-kafka - Difference between Kafka session.timeout.ms and max.poll.interval.ms

Reposted. Author: 行者123. Updated: 2023-12-04 10:38:22

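AFAIK, max.poll.interval.ms was introduced in Kafka 0.10.1. However, it is still unclear to me when we would use session.timeout.ms and max.poll.interval.ms together.
Consider the use case where the heartbeat thread stops responding, but my processing thread, because it has been given a higher timeout value, is still processing records. What exactly happens once the heartbeat thread is down and session.timeout.ms has been exceeded? In my POC I observed that the consumer rebalance does not happen until max.poll.interval.ms is reached.
So to me session.timeout.ms seems redundant.
A similar question has been posted, but it does not answer this.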

Best answer

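session.timeout.ms is used to detect consumer failures through the heartbeat mechanism. The consumer's heartbeat thread must send a heartbeat to the broker before session.timeout.ms expires. Otherwise Kafka considers the consumer dead and triggers a rebalance.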

heartbeat.interval.ms: The expected time between heartbeats to the consumer coordinator when using Kafka's group management facilities. Heartbeats are used to ensure that the consumer's session stays active and to facilitate rebalancing when new consumers join or leave the group.

session.timeout.ms: The timeout used to detect client failures when using Kafka's group management facility. The client sends periodic heartbeats to indicate its liveness to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, then the broker will remove this client from the group and initiate a rebalance.
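The heartbeat rule above boils down to a single time comparison on the broker side. The sketch below is a simplified model, not Kafka's code: `SessionConfig`, `heartbeat_settings_ok` and `session_expired` are hypothetical names, the values are illustrative, and treating "exactly at the timeout" as still alive is an assumption. The one-third guideline comes from the Kafka documentation for heartbeat.interval.ms.

```python
from dataclasses import dataclass


@dataclass
class SessionConfig:
    """Illustrative heartbeat settings in ms (hypothetical, not a Kafka client class)."""
    session_timeout_ms: int
    heartbeat_interval_ms: int


def heartbeat_settings_ok(cfg: SessionConfig) -> bool:
    """Kafka's docs require heartbeat.interval.ms < session.timeout.ms and
    recommend it be no higher than one third of session.timeout.ms."""
    return (cfg.heartbeat_interval_ms < cfg.session_timeout_ms
            and 3 * cfg.heartbeat_interval_ms <= cfg.session_timeout_ms)


def session_expired(cfg: SessionConfig, last_heartbeat_ms: int, now_ms: int) -> bool:
    """Broker-side view: the member counts as dead once no heartbeat
    has arrived for longer than session.timeout.ms."""
    return now_ms - last_heartbeat_ms > cfg.session_timeout_ms


cfg = SessionConfig(session_timeout_ms=10_000, heartbeat_interval_ms=3_000)
print(heartbeat_settings_ok(cfg))        # True
print(session_expired(cfg, 0, 9_000))    # False: still inside the session window
print(session_expired(cfg, 0, 10_001))   # True: the broker would start a rebalance
```

Note that nothing in this check looks at poll(): as long as the heartbeat thread is alive, the session stays valid no matter how long record processing takes.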



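Polling is a second mechanism for checking consumer health. The consumer must call poll() again before max.poll.interval.ms expires. If this time expires (typically because of long-running processing), the consumer is again considered dead and a rebalance is triggered.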

max.poll.interval.ms: The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before expiration of this timeout, then the consumer is considered failed and the group will rebalance in order to reassign the partitions to another member.
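The poll-based check is independent of heartbeats: it only measures the gap between successive poll() calls. A minimal sketch, assuming a hypothetical `PollTracker` that restarts its timer on every poll and a list of per-batch processing times in ms (both are illustrations, not Kafka APIs):

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class PollTracker:
    """Hypothetical client-side tracker for max.poll.interval.ms (values in ms)."""
    max_poll_interval_ms: int
    last_poll_ms: int = 0

    def on_poll(self, now_ms: int) -> None:
        # Every poll() call restarts the max.poll.interval.ms timer.
        self.last_poll_ms = now_ms

    def poll_timeout_expired(self, now_ms: int) -> bool:
        # True once the gap since the last poll() exceeds the limit.
        return now_ms - self.last_poll_ms > self.max_poll_interval_ms


def first_slow_batch(max_poll_interval_ms: int, batch_durations_ms: List[int]) -> Optional[int]:
    """Return the index of the first batch whose processing time would make the
    consumer miss its next poll() deadline, or None if every batch fits."""
    tracker = PollTracker(max_poll_interval_ms)
    now = 0
    for i, duration in enumerate(batch_durations_ms):
        tracker.on_poll(now)   # poll() hands back a batch
        now += duration        # the application processes it
        if tracker.poll_timeout_expired(now):
            return i
    return None


print(first_slow_batch(300_000, [1_000, 200_000, 400_000]))  # 2
print(first_slow_batch(300_000, [1_000, 200_000]))           # None
```

The third batch takes 400 s against a 300 s limit, so that is where the consumer would be treated as failed, even though its heartbeats may still be arriving on time.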



Another important point (since version 0.10.1.0):
rebalance.timeout = max.poll.interval.ms

Since we give the client as much as max.poll.interval.ms to handle a batch of records, this is also the maximum time before a consumer can be expected to rejoin the group in the worst case. We therefore propose to set the rebalance timeout in the Java client to the same value configured with max.poll.interval.ms. When a rebalance begins, the background thread will continue sending heartbeats. The consumer will not rejoin the group until processing completes and the user calls poll(). From the coordinator's perspective, the consumer will not be removed from the group until either 1) their session timeout expires without receiving a heartbeat, or 2) the rebalance timeout expires.
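The two removal conditions at the end of that quote can be written as one predicate. This is a simplified reading of the quoted text, not the coordinator's actual implementation; `Member` and `removed_from_group` are hypothetical names, and `rebalance_timeout_ms` is the value the Java client sets from max.poll.interval.ms.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Member:
    """Hypothetical coordinator-side record of one group member (times in ms)."""
    last_heartbeat_ms: int
    rejoined_at_ms: Optional[int] = None  # when poll() sent its JoinGroup request, if it has


def removed_from_group(member: Member, rebalance_start_ms: int, now_ms: int,
                       session_timeout_ms: int, rebalance_timeout_ms: int) -> bool:
    """Apply the two removal conditions quoted above during a rebalance."""
    # 1) The session timeout expired without a heartbeat.
    if now_ms - member.last_heartbeat_ms > session_timeout_ms:
        return True
    # 2) The rebalance timeout expired before the member rejoined.
    deadline = rebalance_start_ms + rebalance_timeout_ms
    rejoined_in_time = (member.rejoined_at_ms is not None
                        and member.rejoined_at_ms <= min(now_ms, deadline))
    return now_ms > deadline and not rejoined_in_time


busy = Member(last_heartbeat_ms=59_000)  # still heartbeating, still processing
print(removed_from_group(busy, 0, 60_000, 10_000, 300_000))   # False: rebalance still waiting
busy.last_heartbeat_ms = 300_500
print(removed_from_group(busy, 0, 301_000, 10_000, 300_000))  # True: never rejoined via poll()
```

A member that keeps heartbeating but is stuck in processing is therefore not dropped by the session timeout; it is dropped only when the rebalance timeout runs out.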



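So in your case: if session.timeout.ms expires without a heartbeat from the consumer, a rebalance starts in the consumer group. Once the rebalance starts, partitions are revoked from every consumer in the group, and Kafka waits for all consumers that are still sending heartbeats to call poll() (a consumer that polls at that point sends a JoinGroupRequest), until the rebalance timeout, i.e. max.poll.interval.ms, expires.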
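The timeline described above also explains the POC observation: the rebalance starts at session expiry, but can take up to max.poll.interval.ms to finish. The sketch below is a deliberately simplified model of that explanation (it ignores most protocol details); `rebalance_window` and its parameters are hypothetical, and all times are in ms.

```python
from typing import List, Tuple


def rebalance_window(heartbeat_lost_at_ms: int, session_timeout_ms: int,
                     max_poll_interval_ms: int,
                     live_next_poll_ms: List[int]) -> Tuple[int, int]:
    """Return (start, end) of the rebalance in this simplified model.

    heartbeat_lost_at_ms: time of the dead member's last heartbeat.
    live_next_poll_ms: for each member still heartbeating, when it next calls poll().
    """
    start = heartbeat_lost_at_ms + session_timeout_ms   # session expires, rebalance begins
    deadline = start + max_poll_interval_ms             # rebalance timeout in the Java client
    # A live member can only rejoin on a poll() that happens after the rebalance starts.
    rejoin_times = [max(t, start) for t in live_next_poll_ms]
    # The coordinator waits for the slowest member, but never past the deadline.
    end = min(max(rejoin_times, default=start), deadline)
    return start, end


print(rebalance_window(0, 10_000, 300_000, [12_000, 250_000]))  # (10000, 250000)
print(rebalance_window(0, 10_000, 300_000, [12_000, 400_000]))  # (10000, 310000)
```

In the second call, one live member is still busy with a long batch, so the new assignment only takes effect when the rebalance timeout expires at start + max.poll.interval.ms.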

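During the rebalance you can still process the messages you already hold, but you cannot commit them; the commit fails with a CommitFailedException carrying this message: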

Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
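Since the message attributes the failure to the gap between poll() calls exceeding max.poll.interval.ms, one way to size max.poll.records is simple arithmetic: a full batch times the worst-case per-record processing time must fit inside that interval. A minimal sketch, assuming you can estimate a worst-case per-record time; the function name and the 80% safety budget are hypothetical choices, not Kafka defaults.

```python
def max_safe_poll_records(max_poll_interval_ms: int,
                          worst_case_ms_per_record: int,
                          budget_percent: int = 80) -> int:
    """Largest max.poll.records value whose full batch, processed at the
    worst-case per-record time, still fits in budget_percent of
    max.poll.interval.ms. Raises if even a single record does not fit."""
    if worst_case_ms_per_record <= 0:
        raise ValueError("worst_case_ms_per_record must be positive")
    # Integer arithmetic avoids floating-point rounding in the budget.
    budget_ms = max_poll_interval_ms * budget_percent // 100
    records = budget_ms // worst_case_ms_per_record
    if records < 1:
        raise ValueError("one record takes longer than the budget; "
                         "increase max.poll.interval.ms instead")
    return records


print(max_safe_poll_records(300_000, 500))        # 480
print(max_safe_poll_records(300_000, 500, 100))   # 600
```

Leaving headroom below the full interval covers occasional slow records and commit latency; if even one record cannot fit, the only remaining option is a larger max.poll.interval.ms.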



For more information, see this.

Original question on Stack Overflow (apache-kafka - Difference between Kafka session.timeout.ms and max.poll.interval.ms): https://stackoverflow.com/questions/60051936/
