gpt4 book ai didi

Spring Kafka 不尊重 max.poll.records 的奇怪行为

转载 作者:行者123 更新时间:2023-12-03 19:09:26 24 4
gpt4 key购买 nike

好吧,我正在尝试以下场景:

  • 在 application.properties 中将 max.poll.records 设置为 50。
  • 在 application.properties 中将 enable-auto-commit=false 和 ack-mode 设置为手动。
  • 在我的方法中添加了@KafkaListener,但不要提交任何消息,只需阅读、记录但不要发出 ACK。

  • 实际上,在我的 Kafka 主题中,我有 500 条消息要使用,所以我期待以下行为:
  • Spring Kafka poll() 50 条消息(偏移量 0 到 50)。
  • 正如我所说,我没有提交任何内容,只是记录了 50 条消息。
  • 在下一次 Spring Kafka poll() 调用中,获取与步骤 1 相同的 50 条消息(偏移量 0 到 50)。据我了解,Spring Kafka 应该继续在此循环中(步骤 1-3)读取始终相同的消息。

  • 但是会发生以下情况:
  • Spring Kafka poll() 50 条消息(偏移量 0 到 50)。
  • 正如我所说,我没有提交任何内容,只是记录了 50 条消息。
  • 在下一个 Spring Kafka poll() 调用中,获取 NEXT 50 条消息,不同于步骤 1 (偏移 50 到 100)。

  • Spring Kafka 以 50 条消息为单位读取 500 条消息,但不提交任何内容。如果我关闭应用程序并重新启动,则会再次收到 500 条消息。
    所以,我的疑惑:
  • 如果我将 max.poll.recors 配置为 50,如果我没有提交任何内容,spring Kafka 如何获取接下来的 50 条记录?我知道 poll() 方法应该返回相同的记录。
  • Spring Kafka 有缓存吗?如果是的话,如果我在没有提交的情况下在缓存中获得 100 万条记录,这可能是个问题。
  • 最佳答案

    您的第一个问题:

    If I configured the max.poll.recors to 50, how spring Kafka get thenext 50 records if I didn't commit anything? I understand the poll()method should return the same records.


    第一 ,为了确保你没有提交任何东西,你必须确保你理解以下 3 个参数,我相信你已经理解了。
  • ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG ,将其设置为 false(这也是推荐的默认值)。如果设置为 false,请注意 auto.commit.interval.ms变得无关紧要。查看 this文档:

  • Because the listener container has it’s own mechanism for committingoffsets, it prefers the Kafka ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIGto be false. Starting with version 2.3, it unconditionally sets it tofalse unless specifically set in the consumer factory or thecontainer’s consumer property overrides.


  • factory.getContainerProperties().setAckMode(AckMode.MANUAL) ;您有责任承认。 (在使用事务时忽略)和 ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG不能是 true .
  • factory.getContainerProperties().setSyncCommits(true/false);设置是否调用consumer.commitSync()commitAsync() 当容器负责提交时。 默认为真。这负责与 Kafka 同步,没有别的,如果设置为 true,则该调用将阻塞,直到 Kafka 响应。

  • 其次 ,没有消费者 poll() 不会返回相同的记录。对于 当前运行的消费者 ,它使用一些内部索引跟踪它在内存中的偏移量,我们不必关心提交偏移量。另请参阅@GaryRussell 的解释 here .
    简而言之,他解释说:

    Once the records have been returned by the poll (and offsets notcommitted), they won't be returned again unless you restart theconsumer or perform seek() operations on the consumer to reset theoffset to the unprocessed ones.



    您的第二个问题:

    Does Spring Kafka have some cache? If yes, this can be a problem if Iget 1million records in cache without commit.


    没有“缓存”,都是关于偏移量和提交的,解释如上。

    现在要实现您想要做的事情,您可以考虑在获取前 50 条记录后做 2 件事,即下一个 poll():
  • 要么,以编程方式重新启动容器
  • 或调用consumer.seek(partition, offset);

  • 奖金:
    无论您选择何种配置,您都可以通过查看 LAG 随时查看结果。此输出的列:
    kafka-consumer-groups.bat --bootstrap-server localhost:9091 --describe --group your_group_name

    关于Spring Kafka 不尊重 max.poll.records 的奇怪行为,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62583046/

    24 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com