
apache-kafka - How to manually commit offsets in a Kafka Sink Connector


I have a Kafka sink task that receives records from a Kafka topic through its put() method.
I don't want offsets to be committed automatically, because I run some processing logic on each record after it is fetched from Kafka.
I want an offset to be committed only once that processing succeeds; otherwise the task should read again from the same offset.

I can see that the Kafka consumer has a commitSync() method, but I can't find an equivalent in the Sink Connector.

Best Answer

Offset commits in a Kafka sink connector

Kafka Connect disables enable.auto.commit on the consumer it creates and manages offsets itself: the worker commits them automatically on the schedule set by offset.flush.interval.ms, every 60 seconds by default. As long as your put() method raises no error, the commit goes through normally.

offset.flush.interval.ms
Interval at which to try committing offsets for tasks.

Type: long
Default: 60000
Importance: low

Managing offsets in a sink connector

Kafka Connect commits whatever offsets the task returns from preCommit. If your preCommit returns an empty map of offsets, Kafka Connect will not record any offsets at all.

SinkTask.java

/**
* Pre-commit hook invoked prior to an offset commit.
*
* The default implementation simply invokes {@link #flush(Map)} and is thus able to assume all {@code currentOffsets} are committable.
*
* @param currentOffsets the current offset state as of the last call to {@link #put(Collection)},
* provided for convenience but could also be determined by tracking all offsets included in the {@link SinkRecord}s
* passed to {@link #put}.
*
* @return an empty map if Connect-managed offset commits are not desired, otherwise a map of committable offsets by topic-partition.
*/
public Map<TopicPartition, OffsetAndMetadata> preCommit(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
flush(currentOffsets);
return currentOffsets;
}
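To answer the question directly: the task can track which records were written to the destination successfully and return only those offsets from preCommit, rewinding the consumer on failure so the same record is redelivered. Below is a minimal sketch of that approach; writeToDestination() is a hypothetical placeholder for the actual sink logic, not part of the Connect API.

ManualCommitSinkTask.java

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

public class ManualCommitSinkTask extends SinkTask {

    // Offsets of records that were written to the destination successfully.
    private final Map<TopicPartition, OffsetAndMetadata> committable = new HashMap<>();

    @Override
    public void put(Collection<SinkRecord> records) {
        for (SinkRecord record : records) {
            TopicPartition tp = new TopicPartition(record.topic(), record.kafkaPartition());
            try {
                writeToDestination(record); // hypothetical destination write
                // By consumer convention, commit the offset of the *next* record to read.
                committable.put(tp, new OffsetAndMetadata(record.kafkaOffset() + 1));
            } catch (Exception e) {
                // Rewind the consumer so the failed record is redelivered on the next poll,
                // and stop advancing this partition's committable offset.
                context.offset(tp, record.kafkaOffset());
                return;
            }
        }
    }

    @Override
    public Map<TopicPartition, OffsetAndMetadata> preCommit(
            Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
        // Ignore currentOffsets; commit only offsets we know were processed successfully.
        return new HashMap<>(committable);
    }

    private void writeToDestination(SinkRecord record) {
        // Placeholder for the real sink logic (database insert, HTTP call, etc.).
    }

    @Override
    public String version() {
        return "1.0";
    }

    @Override
    public void start(Map<String, String> props) {
    }

    @Override
    public void stop() {
    }
}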

SinkTaskContext.java

/**
* Request an offset commit. Sink tasks can use this to minimize the potential for redelivery
* by requesting an offset commit as soon as they flush data to the destination system.
*
* This is a hint to the runtime and no timing guarantee should be assumed.
*/
void requestCommit();
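Building on the sketch above, put() can also end with a commit request so the worker commits soon after a successful write rather than waiting for the next offset.flush.interval.ms tick. requestCommit() is only a hint, so no timing guarantee should be assumed:

@Override
public void put(Collection<SinkRecord> records) {
    for (SinkRecord record : records) {
        writeToDestination(record); // hypothetical destination write, as above
        committable.put(new TopicPartition(record.topic(), record.kafkaPartition()),
                new OffsetAndMetadata(record.kafkaOffset() + 1));
    }
    // Hint to the worker that offsets may be committed now instead of on the
    // next scheduled flush; the worker decides when the commit actually runs.
    context.requestCommit();
}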

Regarding apache-kafka - How to manually commit offsets in a Kafka Sink Connector, a similar question was found on Stack Overflow: https://stackoverflow.com/questions/58505063/
