gpt4 book ai didi

apache-kafka - Apache 卡夫卡 : commitSync after pause

转载 作者:行者123 更新时间:2023-12-04 17:50:06 25 4
gpt4 key购买 nike

在我们的代码中,我们计划手动提交偏移量。我们对数据的处理是长期的,因此我们遵循之前建议的模式

  • 阅读记录
  • 在自己的线程中处理记录
  • 暂停消费者
  • 继续轮询暂停的消费者以使其活着
  • 处理记录后,提交偏移量
  • 提交完成后,恢复消费者

  • 代码有点像这样:
       while (true) { 
    ConsumerRecords<String, String> records = consumer.poll(kafkaConfig.getTopicPolling());
    if (!records.isEmpty()) {
    task = pool.submit(new ProcessorTask(processor, createRecordsList(records)));
    }
    if (shouldPause(task)) {
    consumer.pause(listener.getPartitions());
    }
    if (isDoneProcessing(task)) {
    consumer.commitSync();
    consumer.resume(listener.getPartitions());
    }
    }

    如果您注意到,我们使用 commitSync()(不带任何参数)提交。
    由于消费者已暂停,因此在下一次迭代中我们将不会获得任何记录。但是 commitSync() 会稍后发生。在那种情况下,它会尝试提交哪个偏移量?我已经阅读了权威指南并用谷歌搜索,但找不到任何有关它的信息。

    我认为我们应该明确保存偏移量。但我不确定当前的代码是否会成为问题。

    任何信息都有帮助。

    谢谢,
    普拉提克

    最佳答案

    如果您调用 consumer.commitSync()如果没有参数,它应该提交您的消费者收到的最新偏移量。由于您可以在一个 poll() 中接收许多消息您可能希望更好地控制提交并明确提交特定的偏移量,例如您的使用者已成功处理的最新消息。这可以通过调用 commitSync(Map<TopicPartition,OffsetAndMetadata> offsets) 来完成。

    您可以在 Consumer Javadoc http://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#commitSync() 中查看调用 commitSync 的两种方法的语法。

    关于apache-kafka - Apache 卡夫卡 : commitSync after pause,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45737393/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com