gpt4 book ai didi

java - Kafka 0.11 中 sendOffsetsToTransaction 的含义

转载 作者:塔克拉玛干 更新时间:2023-11-03 05:16:10 25 4
gpt4 key购买 nike

新的 Kafka 版本 (0.11) 支持 exactly-once 语义。

https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging

我在 Java 中使用 kafka 事务代码设置了一个生产者,就像这样。

producer.initTransactions();
try {
producer.beginTransaction();
for (ProducerRecord<String, String> record : payload) {
producer.send(record);
}

Map<TopicPartition, OffsetAndMetadata> groupCommit = new HashMap<TopicPartition, OffsetAndMetadata>() {
{
put(new TopicPartition(TOPIC, 0), new OffsetAndMetadata(42L, null));
}
};
producer.sendOffsetsToTransaction(groupCommit, "groupId");
producer.commitTransaction();
} catch (ProducerFencedException e) {
producer.close();
} catch (KafkaException e) {
producer.abortTransaction();
}

我不太确定如何使用 sendOffsetsToTransaction 及其预期用例。 AFAIK,消费者群体是消费者端的多线程读取功能。

javadoc 说

”向消费者组协调器发送一个已消费的偏移量列表,同时将这些偏移量标记为当前事务的一部分。只有在事务提交成功时,这些偏移量才会被视为已消耗。当您需要时应该使用此方法将消费和生产消息一起批量处理,通常采用消费-转换-生产模式。”

produce 将如何维护消耗的抵消列表?这有什么意义呢?

最佳答案

这仅与您消费然后根据消费内容生成消息的工作流程相关。此函数允许您仅在下游生产成功时提交您消耗的偏移量。如果您消费数据,以某种方式处理它,然后产生结果,这将实现跨消费/生产的交易保证。

如果没有事务,您通常使用 Consumer#commitSync()Consumer#commitAsync() 来提交消费者偏移量。但是,如果您在与您的生产者进行生产之前使用这些方法,您将在知道生产者是否成功发送之前已经提交了偏移量。

因此,您可以在生产者上使用 Producer#sendOffsetsToTransaction() 来提交偏移量,而不是向消费者提交偏移量。这会将偏移量发送到处理事务的事务管理器。仅当整个交易(消费和生产)成功时,它才会提交抵消。

(注意:当您发送偏移量进行提交时,您应该将上次读取的偏移量加 1,以便以后的读取从您尚未读取的偏移量恢复。无论无论您是向消费者还是向生产者 promise 。请参阅:KafkaProducer sendOffsetsToTransaction need offset+1 to successfully commit current offset)。

关于java - Kafka 0.11 中 sendOffsetsToTransaction 的含义,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45195010/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com