gpt4 book ai didi

apache-kafka - 是否可以将偏移量重置为 kafka 连接器中的 kafka 消费者组的主题?

转载 作者:行者123 更新时间:2023-12-04 01:44:23 34 4
gpt4 key购买 nike

我的 kafka sink 连接器从多个主题(配置了 10 个任务)读取,并处理来自所有主题的 300 条记录。根据每个记录中保存的信息,连接器可以执行某些操作。

以下是触发器记录中键值对的示例:
"REPROCESS":"my-topic-1"
阅读此记录后,我需要将主题“my-topic-1”的每个分区中的偏移量重置为 0。

我在很多地方都读到创建一个新的KafkaConsumer ,订阅主题的分区,然后调用 subscribe(...)方法是推荐的方法。例如,

public class MyTask extends SinkTask {

@Override
public void put(Collection<SinkRecord> records) {
records.forEach(record -> {
if (record.key().toString().equals("REPROCESS")) {
reprocessTopicRecords(record);
} else {
// do something else
}
});
}
private void reprocessTopicRecords(SinkRecord record) {
KafkaConsumer<JsonNode, JsonNode> reprocessorConsumer =
new KafkaConsumer<>(reprocessorProps, deserializer, deserializer);
reprocessorConsumer.subscribe(Arrays.asList(record.value().toString()),
new ConsumerRebalanceListener() {
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {}
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// do offset reset here
}
}
);
}
}

但是,上述策略不适用于我的情况,因为:
1. 这取决于发生的组重新平衡(并不总是发生)
2. 'partitions' 传递给 onPartitionsAssigned方法是动态分配的分区,这意味着这些只是需要重置偏移量的完整分区集的一个子集。例如,这个 SinkTask 将只分配到保存“my-topic-1”记录的 8 个分区中的 2 个。

我也考虑过使用 assign()但这与 SinkConnector/SinkTask 实现中的分布式消费者模型(消费者组)不兼容。

我知道 kafka 命令行工具 kafka-consumer-groups可以做我想做的事(我认为):
https://gist.github.com/marwei/cd40657c481f94ebe273ecc16601674b

总而言之,我想使用 Java API 重置给定主题的所有分区的偏移量,并让接收器连接器获取偏移量更改并继续执行它一直在做的事情(处理记录)。

提前致谢。

最佳答案

通过使用一系列 Confluent 的 kafka-rest-proxy API,我能够为 kafka 连接消费者组实现重置偏移量:https://docs.confluent.io/current/kafka-rest/api.html

此实现不再需要原始帖子中描述的“触发记录”方法,并且完全基于 Rest API。

  • 临时删除 kafka 连接器(这将删除连接器的使用者和 )
  • 为同一个消费者组(“connect-”)创建一个消费者实例
  • 让实例订阅您要重置的请求主题
  • 做一个虚拟民意调查(“订阅”被懒惰地评估)
  • 重置指定主题的消费者组主题偏移
  • 做一个虚拟轮询('seek'被懒惰地评估')为消费者提交当前的偏移状态(在代理中)
  • 重新创建 kafka 连接器(具有相同的连接器名称) - 重新平衡后,消费者将加入该组并读取最后提交的偏移量(从 0 开始)
  • 删除临时消费者实例

  • 如果您能够使用 CLI,步骤 2-6 可以替换为:
    kafka-consumer-groups --bootstrap-server <kafkahost:port> --group <group_id> --topic <topic_name> --reset-offsets --to-earliest --execute
    对于那些试图通过 native Java API 在 kafka 连接器代码中执行此操作的人,您很不走运:-(

    关于apache-kafka - 是否可以将偏移量重置为 kafka 连接器中的 kafka 消费者组的主题?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55934190/

    34 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com