gpt4 book ai didi

java - 如何监控 Kafka 消息的应用程序处理以进行负载测试

转载 作者:塔克拉玛干 更新时间:2023-11-01 22:43:14 26 4
gpt4 key购买 nike

有一个应用程序(不是我的)从 Kafka 读取消息,对其进行一些处理,并将记录存储在数据库中。我用 Java 编写了一个程序,它以给定的速率将消息写入队列。现在,它通过在测试运行结束时查询数据库来进行简单的性能测量,以确保记录输入 = 记录输出。但是,我想扩展它以定期检查队列以查看应用程序尚未处理的待处理消息数,以查看它是否得到备份。

我想我可以在 Zookeeper 中检查应用程序组 ID 的偏移量。我看着 Kafka documentation ,但它只提供了基本的消费者示例,而且 API 文档充其量也很少,所以我不确定如何找到这些信息。

我需要调用哪些 API 才能找出应用程序当前在队列中的位置,以及该位置后面的队列中有多少消息?

我将 Kafka 2.10-0.8.2.1 与单个 Zookeeper 实例和三个 Kafka 实例一起使用,负载测试器使用 0.8.2.1 Java API。所讨论的主题有三个分区(每个 Kafka 服务器一个),但是为了测试的目的,只有一个消费者。

最佳答案

我建议查看 Kafka 中已经提供的工具(如果需要直接调用 API,代码在 src 中可用)。特别是,

$ bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group consumer-group1 --zkconnect zkhost:zkport --topic topic1

将显示偏移和滞后:

consumer-group1,topic1,0-0 (Group,Topic,BrokerId-PartitionId)
Owner = consumer-group1-consumer1
Consumer offset = 70121994703
= 70,121,994,703 (65.31G)
Log size = 70122018287
= 70,122,018,287 (65.31G)
Consumer lag = 23584
= 23,584 (0.00G)

引用资料:

关于java - 如何监控 Kafka 消息的应用程序处理以进行负载测试,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31588389/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com