gpt4 book ai didi

apache-kafka - 处理kafka消息需要很长时间

转载 作者:行者123 更新时间:2023-12-02 03:05:55 24 4
gpt4 key购买 nike

我有一个 Python 进程(或者更确切地说,一组在消费者组中并行运行的进程)根据来自特定主题的 Kafka 消息形式的输入来处理数据。通常每条消息都会被快速处理,但有时,根据消息的内容,可能需要很长时间(几分钟)。在这种情况下,Kafka broker 断开客户端与组的连接并启动重新平衡。我可以将 session_timeout_ms 设置为一个非常大的值,但它会超过 10 分钟,这意味着如果客户端死亡,集群将在 10 分钟内无法正确重新平衡。这似乎是个坏主意。此外,大多数消息(大约 98%)都很快,因此只为 1-2% 的消息支付这样的惩罚似乎是一种浪费。 OTOH,大消息的频率足以导致大量的重新平衡并消耗大量的性能(因为当组正在重新平衡时,什么都没有完成,然后“死”客户端再次重新加入并导致另一个重新平衡)。

所以,我想知道,是否还有其他方法可以处理需要很长时间才能处理的消息?有什么方法可以手动启动心跳来告诉代理“没关系,我还活着,我只是在处理消息”?我认为 Python 客户端(我使用 kafka-python 1.4.7)应该为我做这件事,但它似乎没有发生。此外,API 似乎根本没有单独的“心跳”功能。据我所知,调用 poll() 实际上会给我下一条消息——虽然我什至没有完成当前的消息,而且还会弄乱 Kafka 消费者的迭代器 API,这非常方便在 Python 中使用。

万一它很重要,Kafka 集群是 Confluent,如果我没记错的话是 2.3 版。

最佳答案

在Kafka中,0.10.1+ Kafka轮询和 session 心跳是相互解耦的。可以得到解释here

ma​​x.poll.interval.ms 在超时之前允许消费者实例完成处理的时间意味着如果处理时间超过 max.poll.interval.ms 时间消费者组将假定其从消费者组中删除并调用重新平衡。

增加它会增加预期轮询之间的间隔,从而使消费者有更多时间处理从轮询(长)返回的一批记录。但与此同时,它也会延迟组重新平衡,因为消费者只会在轮询调用中加入重新平衡。

session.timeout.ms 是用于识别消费者是否还活着并按定义的时间间隔 (heartbeat.interval.ms) 发送心跳的超时时间。一般来说,经验法则是 heartbeat.interval.ms 应该是 session 超时的 1/3,因此在网络故障的情况下,消费者最多可以错过 session 超时前的 3 次心跳。

  1. session.timeout.ms:低值有助于更快地检测到故障。

  2. max.poll.interval.ms:较大的值将降低由于处理时间增加而导致失败的风险,但会增加重新平衡时间。

注意:Consumer Group 消耗的大量分区和主题也会影响整体再平衡时间

另一种方法,如果你真的想摆脱重新平衡,你可以使用分区分配在每个消费者实例上手动分配分区。在这种情况下,每个消费者实例将独立运行,并有自己分配的分区。但在那种情况下,您将无法利用再平衡功能自动分配分区。

关于apache-kafka - 处理kafka消息需要很长时间,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59024271/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com