gpt4 book ai didi

java - Kafka 从相同的偏移量重启

转载 作者:行者123 更新时间:2023-11-30 10:28:20 25 4
gpt4 key购买 nike

我有一个 kafka 消费者,它连接到一个具有 3 个分区的主题。一旦我从 kafka 获得记录,我想捕获偏移量和分区。重新启动时,我想从上次读取偏移量恢复消费者的位置

来自kafka文档:

每条记录都有自己的偏移量,因此要管理您自己的偏移量,您只需执行以下操作:

Configure enable.auto.commit=false

Use the offset provided with each ConsumerRecord to save your position.

On restart restore the position of the consumer using seek (TopicPartition, long).

这是我的示例代码:

constructor{    
load data into offsetMap<partition,offset>
initFlag=true;
}

Main method
{
ConsumerRecords<String, String> records = consumer.poll(100);
if(initFlag) // is this correct way to override offset position?
{
seekToPositions(offsetMap);
initFlag=false;
}
while(!shutdown)
{
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
getOffsetPositions();// dump offsets and partitions to db/disk
}
}
}

//get current offset and write to a file
public synchronized Map<Integer, Long> getOffsetPositions() throws Exception{

Map<Integer, Long> offsetMap = new HashMap<Integer, Long>();
//code to put partition and offset into map
//write to disk or db

}
} // Overrides the fetch offsets that the consumer

public synchronized void seekToPositions(Map<Integer, Long> offsetMap) {
//code get partitions and offset from offsetMap
consumer.seek(partition, offset);

}

这是正确的做法吗?有没有更好的办法?

最佳答案

如果您提交偏移量,Kafka 将为您存储它们(默认情况下最多存储 24 小时)。

这样,如果您的消费者死亡,您可以在另一台机器上启动相同的代码,然后从您中断的地方继续。无需外部存储。

参见 https://kafka.apache.org/0102/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html 中的“抵消和消费者地位”

并推荐你考虑使用commitSync

关于java - Kafka 从相同的偏移量重启,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44700707/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com