gpt4 book ai didi

java - Kafka Java 消费者 API 问题

转载 作者:行者123 更新时间:2023-11-30 08:42:41 25 4
gpt4 key购买 nike

我正在尝试使用 Kafka java API 来消费消息。我能够使用 kafka-console-consumer.bat 消费消息。但是,无法使用来自 java api 的消息。没有收到任何错误或任何消息。帮我看看我犯了什么错误。

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public class SimpleConsumer {
private final ConsumerConnector consumer;
private final String topic;

public SimpleConsumer(String zookeeper, String groupId, String topic) {
Properties props = new Properties();
props.put("zookeeper.connect", zookeeper);
props.put("group.id", groupId);
props.put("zookeeper.session.timeout.ms", "500");
props.put("zookeeper.sync.time.ms", "250");
props.put("auto.commit.interval.ms", "1000");

consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
this.topic = topic;
}

public void testConsumer() {
Map<String, Integer> topicCount = new HashMap<String, Integer>();
topicCount.put(topic, 1);
Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);
List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
for (final KafkaStream stream : streams) {
ConsumerIterator<byte[], byte[]> it = stream.iterator();
while (it.hasNext()) {
System.out.println("Message from Single Topic: " + new String(it.next().message()));
}
}
if (consumer != null) {
consumer.shutdown();
}
}

public static void main(String[] args) {
String topic = "test";
SimpleConsumer simpleHLConsumer = new SimpleConsumer("localhost:2181", "testgroup", topic);
simpleHLConsumer.testConsumer();
}
}

控制台命令

kafka-console-consumer.bat --zookeeper localhost:2181 --topic test --from-beginning

创建主题:

kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic test

发布消息使用:

kafka-console-producer.bat --broker-list localhost:9092 --topic test

如果我运行消费者程序,在 Broker 控制台中登录:

[2015-12-29 11:57:34,448] INFO Closing socket connection to /IP (kafka.network.Processor).

如果我关闭获取此日志的程序:

java.io.IOException: An existing connection was forcibly closed by the remote host

请帮助我为什么我无法使用来自上述程序的消息。

但是,可以使用

消费消息
kafka-console-consumer.bat --zookeeper localhost:2181 --topic test --from-beginning 

帮帮我。

最佳答案

如果你想从头开始阅读消息你需要设置选项

auto.offset.reset=smallest

默认情况下它是“最大的”。

http://kafka.apache.org/documentation.html

What to do when there is no initial offset in ZooKeeper or if an offset is out of range:

  • smallest : automatically reset the offset to the smallest offset
  • largest : automatically reset the offset to the largest offset
  • anything else: throw exception to the consumer

注意:此选项适用于新的消费者 API(自 0.9.0.0 起):

auto.offset.reset=earliest|latest|none

偏移量已保存在 zookeeper 中,用于您的 group.id。所以如果你想看到消息改变group.id或者清理zookeeper

关于java - Kafka Java 消费者 API 问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34486625/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com