- r - 以节省内存的方式增长 data.frame
- ruby-on-rails - ruby/ruby on rails 内存泄漏检测
- android - 无法解析导入android.support.v7.app
- UNIX 域套接字与共享内存(映射文件)
我使用的消费者组只有一个消费者,只有一个经纪人(docker wurstmeister image)。在代码中决定是否提交偏移量——如果代码返回错误,则消息不会提交。我需要确保系统不会丢失任何消息——即使这意味着永远重试相同的消息(现在 ;))。为了对此进行测试,我创建了一个简单的处理程序,它不会在“错误”字符串作为消息发送给 kafka 的情况下提交偏移量。所有其他字符串均已提交。
kafka-console-producer --broker-list localhost:9092 --topic test
>this will be commited
正在运行
kafka-run-class kafka.admin.ConsumerGroupCommand --bootstrap-server localhost:9092 --group michalgrupa --describe
返回
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test 0 13 13 0
所以没关系,没有延迟。现在我们传递 'error' 字符串来假装发生了不好的事情并且消息没有提交:
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test 0 13 14 1
当前偏移量保持在正确位置 + 有 1 条滞后消息。现在,如果我们再次传递正确的消息,偏移量将移动到 15:
TOPIC 分区 CURRENT-OFFSET LOG-END-OFFSET LAG
测试 0 15 15
第 14 条消息将不会再被接收。这是默认行为吗?我是否需要通过 it+1 手动跟踪最后的偏移量和加载消息?我已将提交间隔设置为 0,希望不使用任何 auto.commit 机制。
获取/提交代码:
go func() {
for {
ctx := context.Background()
m, err := mr.brokerReader.FetchMessage(ctx)
if err != nil {
break
}
if err := msgFunc(m); err != nil {
log.Errorf("# messaging # cannot commit a message: %v", err)
continue
}
// commit message if no error
if err := mr.brokerReader.CommitMessages(ctx, m); err != nil {
// should we do something else to just logging not committed message?
log.Errorf("cannot commit message [%s] %v/%v: %s = %s; with error: %v", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value), err)
}
}
}()
阅读器配置:
kafkaReader := kafka.NewReader(kafka.ReaderConfig{
Brokers: brokers,
GroupID: groupID,
Topic: topic,
CommitInterval: 0,
MinBytes: 10e3,
MaxBytes: 10e6,
})
最佳答案
在 kafka 中,您只需提交偏移量而不是单个消息。如果我理解你的代码(不是 go-developer)。您只需在遇到无效消息后继续。如果在无效消息之后出现有效消息,您将再次提交偏移量 - 我想这不是您的意图。
只是为了弄清楚提交或提交偏移量意味着什么:您的消费者组会将偏移量存储到专用的内部 kafka 主题(或在 zookeeper 上的旧版 kafka 上)。偏移量可以标识主题内的单个位置(或者更准确地说,是在给定主题的分区上)。这意味着您只能以线性方式使用主题。
在这里你可以看到kafka-consumer端发生了什么:
您正在使用(很可能是多个)消息栈。您提交此主题/分区的位置(也称为偏移量)。所以你可以不说我想再次重新使用特定消息。您可以做的是一旦遇到无效消息就停止消费。在这种情况下,您的问题将是:如何删除此消息。从 kafka 主题中删除一条消息是很棘手的。一种常见的模式是将此消息写入某种死信主题并由不同的消费者处理。
希望这能让您更清楚一些。
关于go - Kafka 消费组丢失未提交的消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49360325/
我想使用我编写的类模块的事件。类模块如下所示 ''CError64Row Public Event ErrorClicked(ByVal row As Integer, ByVal column As
我正在寻找实现智能架构的良好实践,以及处理针对具有许多不同 wdsl web 服务的系统的集成的方法。 我已经有 2 年的爱好使用 C# 进行开发了~,因此我并不总是使用正确的术语,但我会尝试描述我正
目前,我正在为我的程序使用 Azure Consumer API。但它非常慢,几乎需要8秒才能给出响应。我现在应该怎么做?这是我正在使用的 azure API.. https://management
我的流程是: AcitveMQ 控制台在主题部分下显示了一个使用者,但是一旦
我一直在阅读类似 Why does a function that accepts a Box complain of a value being moved when a function that
AMQP 函数 consume() 是一个带有回调的阻塞函数,是否可以为 consume() 函数设置超时,以便在特定时间后不再阻塞并且代码执行完成? 最佳答案 是的,方法如下: $amqp = ne
我有一个客户端/服务器应用程序,其中客户端以 JSON 形式将对象发送到运行 PHP 脚本的服务器,然后将此数据放入数据库。 问题是解码是用 json_decode 函数完成的,它似乎适用于字符串而不
所以我已经模拟了我的生产者消费者问题并且我有下面的代码。我的问题是:如果消费者一直处于 while(true) 状态,他如何停止。 在下面的代码中,我添加了 i
我无法使用在delphi 中开发的dll 的功能。我在类型转换方面遇到了一些困难。 这是我要调用 DLL 的函数: function rData(ID: Cardinal; queue: WideSt
我想使用 Unity3D 可视化 Kafka 流。在 Unity 中访问数据流的最佳方式是什么? 我已经用 Node 和 C# 编写了基本使用者,但我不确定如何将它们合并到 Unity 中。任何帮助表
如果标题太笼统,我很抱歉,但我已经浏览了一个小时的互联网,但找不到任何架构解释。我对 RSS 和 Atom 协议(protocol)都是全新的,据我到目前为止所了解的是: 服务器发布文档 客户端订阅此
我很喜欢我刚刚发现的 Guzzle 框架。我正在使用它使用不同的响应结构跨多个 API 聚合数据。它可以使用 JSON 和 XML 找到,但我需要使用的服务之一使用 SOAP。是否有使用 Guzzle
有没有一种方法可以像访问 Microsoft.Azure.Management.Fluent 一样访问 Azure.Management.Conclusion.Models? 当我执行以下代码时,我看
我有这个部分场景图树: CustomPane (with onMouseClicked Handler) → ChildNode (with onMousePressed Handler) 当我在
我的问题是这个 json。 http://dev-rexolution.pantheonsite.io/api/noticias 我只需要使用 vuejs 2 使用数组的第一个元素才能显示它,使用我工
我是 ML 新手,一直在研究 CNTK 教程。我已经成功训练了几个模型。 我完成了迁移学习教程 ( https://github.com/Microsoft/CNTK/blob/v2.1/Tutori
我是 RabbitMq 和 AMQP 的新手,但我对 ActiveMQ 和 JMS 有一些经验。我尝试在主题(JMS 中的主题之类的主题)中发布一条消息,并从多个监听器中使用此消息。比如我发布一条消息
我正在尝试让我的服务器解析以下 JSON: {"hardwareId":1,"registerTime":"2017-02-14T03:42:11.482Z","sensorId":1,"temper
我正在开发一个从外部 url 使用 json 的网站,我试过了但是我得到了一个错误 XMLHttpRequest 无法加载 http://reuniyo.com/tst/json.php。 Acces
我正在尝试使用Kafka Streams(即不是简单的Kafka Consumer)从重试主题中读取之前无法处理的事件。我希望从重试主题中进行消费,如果处理仍然失败(例如,如果外部系统已关闭),我希望
我是一名优秀的程序员,十分优秀!