作者热门文章
- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
在我们的代码中,我们计划手动提交偏移量。我们对数据的处理是长期的,因此我们遵循之前建议的模式
while (true) {
ConsumerRecords<String, String> records = consumer.poll(kafkaConfig.getTopicPolling());
if (!records.isEmpty()) {
task = pool.submit(new ProcessorTask(processor, createRecordsList(records)));
}
if (shouldPause(task)) {
consumer.pause(listener.getPartitions());
}
if (isDoneProcessing(task)) {
consumer.commitSync();
consumer.resume(listener.getPartitions());
}
}
最佳答案
如果您调用 consumer.commitSync()
如果没有参数,它应该提交您的消费者收到的最新偏移量。由于您可以在一个 poll()
中接收许多消息您可能希望更好地控制提交并明确提交特定的偏移量,例如您的使用者已成功处理的最新消息。这可以通过调用 commitSync(Map<TopicPartition,OffsetAndMetadata> offsets)
来完成。
您可以在 Consumer Javadoc http://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#commitSync() 中查看调用 commitSync 的两种方法的语法。
关于apache-kafka - Apache 卡夫卡 : commitSync after pause,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45737393/
引自 https://www.safaribooksonline.com/library/view/kafka-the-definitive/9781491936153/ch04.html#callo
在我们的代码中,我们计划手动提交偏移量。我们对数据的处理是长期的,因此我们遵循之前建议的模式 阅读记录 在自己的线程中处理记录 暂停消费者 继续轮询暂停的消费者以使其活着 处理记录后,提交偏移量 提交
我是一名优秀的程序员,十分优秀!