gpt4 book ai didi

node.js - 卡夫卡 - 滞后

转载 作者:太空宇宙 更新时间:2023-11-03 22:50:14 28 4
gpt4 key购买 nike

我正在使用“node-rdkafka”npm 模块来构建用 Nodejs 编写的分布式服务架构。我们有一个计量用例,其中我们只允许每 n 秒消耗和处理一定数量的消息。例如,“主”主题有 100 条由生产者推送的消息,“工作人员”每 30 秒从主主题消费一次。该用例的故事还有很多内容。

我遇到的问题是我需要以编程方式获取给定主题(所有分区)的滞后。

我有办法做到这一点吗?

我知道我可以使用“bin/kafka-consumer-groups.sh”来访问我需要的一些数据,但是还有其他方法吗?

提前谢谢

最佳答案

您可以通过多种方法直接从 node-rdkafka 客户端检索该信息:

  • 客户指标:

    客户端可以按定义的时间间隔发出指标,其中包含当前和已提交的偏移量以及最终偏移量,以便您可以轻松计算滞后。

    您首先需要通过在客户端配置中设置例如 'statistics.interval.ms': 5000 来启用指标事件。然后在 event.stats 事件上设置监听器:

    consumer.on('event.stats', function(stats) {
    console.log(stats);
    });

    完整的统计数据记录在https://github.com/edenhill/librdkafka/wiki/Statistics上但您可能最感兴趣的是分区统计信息:https://github.com/edenhill/librdkafka/wiki/Statistics#partitions

  • 查询集群的偏移量:

    您可以使用queryWatermarkOffsets() 检索分区的第一个和最后一个偏移量。

    consumer.queryWatermarkOffsets(topicName, partition, timeout, function(err, offsets) {
    var high = offsets.highOffset;
    var low = offsets.lowOffset;
    });

    然后使用消费者的当前位置 (position()) 或已提交 (commissed()) 偏移量来计算滞后。

关于node.js - 卡夫卡 - 滞后,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49727191/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com