gpt4 book ai didi

java - 强制 kafka 消费者轮询延迟最高的分区

转载 作者:行者123 更新时间:2023-12-04 11:45:36 24 4
gpt4 key购买 nike

我有一个设置,其中有几个 KafkaConsumers每个处理单个主题的多个分区。它们被静态分配了分区,以确保每个消费者都有相同数量的分区来处理。还选择了记录键,以便我们在所有分区上平均分配消息。

在负载很重的时候,我们经常看到少数分区建立了相当大的延迟(数千条消息/几分钟的值(value)),而其他获得相同负载并被同一消费者消耗的分区设法保持延迟低至几百条消息/几秒。

看起来消费者正在以最快的速度获取记录,绕过大多数分区,但有时会有一个分区被遗漏了很长时间。理想情况下,我希望延迟在各个分区中分布得更均匀。

我一直在阅读 KafkaConsumer轮询行为和配置已经有一段时间了,到目前为止,我认为有两个选项可以解决这个问题:

  • 构建可以监控每个分区滞后的自定义内容,并使用 KafkaConsumer.pause().resume()基本上强制 KafkaConsumer从滞后最大的分区中读取
  • 限制我们的 KafkaConsumer只订阅一个 TopicPartition ,并处理 KafkaConsumer 的多个实例.

  • 这些选项似乎都不是处理此问题的正确方法。配置似乎也没有答案:
  • max.partition.fetch.bytes仅指定单个分区的最大提取大小,不保证下一次提取将来自另一个分区。
  • max.poll.interval.ms仅适用于消费者组,而不适用于每个分区。

  • 我是否错过了鼓励 KafkaConsumer 的方法更频繁地切换分区?或者一种对具有最高滞后的分区实现偏好的方法?

    最佳答案

    不确定答案是否仍然与您相关,或者我的答案是否完全满足您的需求,但是,您可以尝试使用滞后意识分配器。为消费者分配分区的分配器确保为消费者分配分区,以便均匀/平等地分配消费者之间的滞后。这是我使用它编写的编写良好的代码,它实现了基于滞后的分配器。
    https://github.com/grantneale/kafka-lag-based-assignor
    您需要做的就是配置您的消费者以使用此分配器。以下声明。

    props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, LagBasedPartitionAssignor.class.getName());

    关于java - 强制 kafka 消费者轮询延迟最高的分区,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59499986/

    24 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com