gpt4 book ai didi

go - Kafka 消费组丢失未提交的消息

转载 作者:IT王子 更新时间:2023-10-29 02:10:06 24 4
gpt4 key购买 nike

我使用的消费者组只有一个消费者,只有一个经纪人(docker wurstmeister image)。在代码中决定是否提交偏移量——如果代码返回错误,则消息不会提交。我需要确保系统不会丢失任何消息——即使这意味着永远重试相同的消息(现在 ;))。为了对此进行测试,我创建了一个简单的处理程序,它不会在“错误”字符串作为消息发送给 kafka 的情况下提交偏移量。所有其他字符串均已提交。

kafka-console-producer --broker-list localhost:9092 --topic test
>this will be commited

正在运行

kafka-run-class kafka.admin.ConsumerGroupCommand --bootstrap-server localhost:9092 --group michalgrupa --describe

返回

TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
test 0 13 13 0

所以没关系,没有延迟。现在我们传递 'error' 字符串来假装发生了不好的事情并且消息没有提交:

TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
test 0 13 14 1

当前偏移量保持在正确位置 + 有 1 条滞后消息。现在,如果我们再次传递正确的消息,偏移量将移动到 15:

TOPIC 分区 CURRENT-OFFSET LOG-END-OFFSET LAG
测试 0 15 15

第 14 条消息将不会再被接收。这是默认行为吗?我是否需要通过 it+1 手动跟踪最后的偏移量和加载消息?我已将提交间隔设置为 0,希望不使用任何 auto.commit 机制。

获取/提交代码:

go func() {
for {
ctx := context.Background()

m, err := mr.brokerReader.FetchMessage(ctx)
if err != nil {
break
}

if err := msgFunc(m); err != nil {
log.Errorf("# messaging # cannot commit a message: %v", err)
continue
}

// commit message if no error
if err := mr.brokerReader.CommitMessages(ctx, m); err != nil {
// should we do something else to just logging not committed message?
log.Errorf("cannot commit message [%s] %v/%v: %s = %s; with error: %v", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value), err)
}
}
}()

阅读器配置:

kafkaReader := kafka.NewReader(kafka.ReaderConfig{
Brokers: brokers,
GroupID: groupID,
Topic: topic,
CommitInterval: 0,
MinBytes: 10e3,
MaxBytes: 10e6,
})

使用的库:https://github.com/segmentio/kafka-go

最佳答案

在 kafka 中,您只需提交偏移量而不是单个消息。如果我理解你的代码(不是 go-developer)。您只需在遇到无效消息后继续。如果在无效消息之后出现有效消息,您将再次提交偏移量 - 我想这不是您的意图。

只是为了弄清楚提交或提交偏移量意味着什么:您的消费者组会将偏移量存储到专用的内部 kafka 主题(或在 zookeeper 上的旧版 kafka 上)。偏移量可以标识主题内的单个位置(或者更准确地说,是在给定主题的分区上)。这意味着您只能以线性方式使用主题。

在这里你可以看到kafka-consumer端发生了什么:

New Kafka Consumer

您正在使用(很可能是多个)消息栈。您提交此主题/分区的位置(也称为偏移量)。所以你可以说我想再次重新使用特定消息。您可以做的是一旦遇到无效消息就停止消费。在这种情况下,您的问题将是:如何删除此消息。从 kafka 主题中删除一条消息是很棘手的。一种常见的模式是将此消息写入某种死信主题并由不同的消费者处理。

希望这能让您更清楚一些。

关于go - Kafka 消费组丢失未提交的消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49360325/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com