gpt4 book ai didi

java - 为什么我的 Kafka Consumer 在第一次运行时消费消息很快,但在以后的运行中速度会大大降低?

转载 作者:搜寻专家 更新时间:2023-11-01 03:36:10 25 4
gpt4 key购买 nike

我是一名研究和使用 Kafka 的学生。在遵循 Apache 文档中的示例之后,我正在使用他们当前 Github 存储库主干中的示例部分。

截至目前,该示例实现了其 Consumer 的“旧”版本,并且未使用新的 KafkaConsumer。根据文档,我编写了自己的 KafkaConsumer 版本,认为它会更快。

这是一个模糊的问题,但在整个过程中,我生成了 5000 条简单的消息,例如“Message_CurrentMessageNumber”到主题“test”,然后使用我的消费者获取这些消息并将它们打印到 stdout。当我运行示例代码将提供的消费者替换为较新的 KafkaConsumer(v 0.8.2 及更高版本)时,它的运行速度非常快,与第一次运行时的示例相当,但之后的任何时候都会显着减慢.

我注意到我的 Kafka Server 输出

Rebalancing group group1 generation 3 (kafka.coordinator.ConsumerCoordinator)

或类似的消息经常让我相信 Kafka 必须做某种负载平衡来减慢速度,但我想知道是否有其他人知道我做错了什么。

public class AlternateConsumer extends Thread {

private final KafkaConsumer<Integer, String> consumer;
private final String topic;
private final Boolean isAsync = false;

public AlternateConsumer(String topic) {
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("group.id", "newestGroup");
properties.put("partition.assignment.strategy", "roundrobin");
properties.put("enable.auto.commit", "true");
properties.put("auto.commit.interval.ms", "1000");
properties.put("session.timeout.ms", "30000");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumer = new KafkaConsumer<Integer, String>(properties);
consumer.subscribe(topic);
this.topic = topic;
}

public void run() {
while (true) {
ConsumerRecords<Integer, String> records = consumer.poll(100);
for (ConsumerRecord<Integer, String> record : records) {
System.out.println("We received message: " + record.value() + " from topic: " + record.topic());
}
}

// ConsumerRecords<Integer, String> records = consumer.poll(0);
// for (ConsumerRecord<Integer, String> record : records) {
// System.out.println("We received message: " + record.value() + " from topic: " + record.topic());
// }
// consumer.close();
}
}

开始:

package kafka.examples;

public class KafkaConsumerProducerDemo implements KafkaProperties
{
public static void main(String[] args) {
final boolean isAsync = args.length > 0 ? !args[0].trim().toLowerCase().equals("sync") : true;

Producer producerThread = new Producer("test", isAsync);
producerThread.start();

AlternateConsumer consumerThread = new AlternateConsumer("test");
consumerThread.start();
}
}

生产者是位于此处的默认生产者:https://github.com/apache/kafka/blob/trunk/examples/src/main/java/kafka/examples/Producer.java

最佳答案

这不应该是这样的。如果您的两个消费者之间的设置相似,您应该期望新消费者获得更好的结果,除非客户端/消费者实现中存在问题,这似乎就是这种情况。

您能否分享您的基准测试结果和报告的重新平衡频率和/或您观察到的任何模式(即启动时缓慢、固定消息消耗后、队列耗尽后等)。另外,如果您可以分享有关您的消费者实现的一些细节。

关于java - 为什么我的 Kafka Consumer 在第一次运行时消费消息很快,但在以后的运行中速度会大大降低?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30855538/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com