- Java 双重比较
- java - 比较器与 Apache BeanComparator
- Objective-C 完成 block 导致额外的方法调用?
- database - RESTful URI 是否应该公开数据库主键?
The drawback is that while commitSync() will retry the commit until it either succeeds or encounters a non-retriable failure, commitAsync() will not retry.
这句话我不是很清楚。我想消费者向经纪人发送提交请求,如果经纪人在一段时间内没有响应,则意味着提交失败。我错了吗?
能否详细说明commitSync
和commitAsync
的区别?
另外,请提供我更喜欢哪种提交类型的用例。
最佳答案
正如 API 文档中所说:
This is a synchronous commits and will block until either the commit succeeds or an unrecoverable error is encountered (in which case it is thrown to the caller).
也就是说,commitSync
是一种阻塞方法。调用它会阻塞您的线程,直到它成功或失败。
例如,
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
consumer.commitSync();
}
}
对于 for 循环中的每次迭代,只有在 consumer.commitSync()
成功返回或因抛出异常而中断后,您的代码才会移至下一次迭代。
This is an asynchronous call and will not block. Any errors encountered are either passed to the callback (if provided) or discarded.
也就是说,commitAsync
是一种非阻塞方法。调用它不会阻塞你的线程。相反,它会继续处理后面的指令,不管最终是成功还是失败。
例如,与前面的例子类似,但是这里我们使用commitAsync
:
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
consumer.commitAsync(callback);
}
}
对于 for 循环中的每次迭代,无论 consumer.commitAsync()
最终会发生什么,您的代码都将移至下一次迭代。并且,提交的结果将由您定义的回调函数处理。
权衡:延迟与数据一致性
commitSync()
,因为它会确保在执行任何进一步操作之前,您会知道偏移量提交是成功还是失败。但是因为同步和阻塞,你会花更多的时间等待提交完成,从而导致高延迟。commitAsync()
,因为它不会等待完成。相反,它只会发出提交请求并稍后处理来自 Kafka 的响应(成功或失败),同时,您的代码将继续执行。 这都是一般来说,实际行为将取决于您的实际代码和调用方法的位置。
关于java - 卡夫卡消费者。 commitSync 与 commitAsync,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46546174/
kafka将如何处理对的调用 KafkaConsumer.commitAsync(Map offsets, OffsetCommitCallback callback) 当主题的偏移值被指定为小于先前
引自 https://www.safaribooksonline.com/library/view/kafka-the-definitive/9781491936153/ch04.html#callo
我想弄清楚 CommitAsync nhibernate 中的方法与 SaveAsync 一起工作, UpdateAsync , & DeleteAsync我的问题是:假设我调用 SaveAsync
我正在尝试用 java 编写一个 kafka 消费者应用程序Springboot 平台。早些时候,我用纯 Java 编写了代码,但是现在转换为 spring-kafka,因为它可以提供一些优势普通的j
我们在使用 ITransaction.CommitAsync 时遇到了一些奇怪的行为。有时,对 CommitAsync 的调用需要 24 小时才能完成。 在我们的场景中,我们每 5 分钟从硬件设备读取
我是一名优秀的程序员,十分优秀!