
java - NoSuchElementException in Kafka

Reposted. Author: 搜寻专家. Updated: 2023-11-01 03:33:28

I am new to Kafka. I read the documentation to get started, and I am now trying to run Kafka in embedded mode. I tried the following sample program.

 public static void main(String args[]) throws InterruptedException, IOException {

// setup Zookeeper
EmbeddedZookeeper zkServer = new EmbeddedZookeeper();
String zkConnect = ZKHOST + ":" + zkServer.port();
ZkClient zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$);
ZkUtils zkUtils = ZkUtils.apply(zkClient, false);

// setup Broker
Properties brokerProps = new Properties();
brokerProps.setProperty("zookeeper.connect", zkConnect);
brokerProps.setProperty("broker.id", "0");
brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString());
brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST +":" + BROKERPORT);
KafkaConfig config = new KafkaConfig(brokerProps);
Time mock = new MockTime();
KafkaServer kafkaServer = TestUtils.createServer(config, mock);

// create topic
AdminUtils.createTopic(zkUtils, TOPIC, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$);

// setup producer
Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT);
producerProps.setProperty("key.serializer","org.apache.kafka.common.serialization.IntegerSerializer");
producerProps.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
KafkaProducer<Integer, byte[]> producer = new KafkaProducer<Integer, byte[]>(producerProps);
List<PartitionInfo> partitionInfo = producer.partitionsFor("test");
System.out.println(partitionInfo);
// setup consumer
Properties consumerProps = new Properties();
consumerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT);
consumerProps.setProperty("group.id", "group0");
consumerProps.setProperty("client.id", "consumer0");
consumerProps.setProperty("key.deserializer","org.apache.kafka.common.serialization.IntegerDeserializer");
consumerProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
consumerProps.put("auto.offset.reset", "earliest"); // to make sure the consumer starts from the beginning of the topic
KafkaConsumer<Integer, byte[]> consumer = new KafkaConsumer<>(consumerProps);
consumer.subscribe(Arrays.asList(TOPIC));

// send message
ProducerRecord<Integer, byte[]> data = new ProducerRecord<>(TOPIC, 42, "test-message".getBytes(StandardCharsets.UTF_8));
producer.send(data);
producer.close();

// starting consumer
ConsumerRecords<Integer, byte[]> records = consumer.poll(1000);

Iterator<ConsumerRecord<Integer, byte[]>> recordIterator = records.iterator();
ConsumerRecord<Integer, byte[]> record = recordIterator.next();
System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());


kafkaServer.shutdown();
zkClient.close();
zkServer.shutdown();
}

}

However, I cannot get any data for the topic. When I run the program I get the following exception:

java.util.NoSuchElementException
at org.apache.kafka.common.utils.AbstractIterator.next(AbstractIterator.java:52)
at com.nuwaza.evlauation.embedded.kafka.EmbeddedKafka.main(EmbeddedKafka.java:105)

Can anyone guide me?

Update:

 WARN [main] (Logging.scala#warn:83) - No meta.properties file under dir    C:\Users\bhavanak\AppData\Local\Temp\kafka-1238324273778000675\meta.properties
WARN [main] (Logging.scala#warn:83) - No meta.properties file under dir C:\Users\bhavanak\AppData\Local\Temp\kafka-1238324273778000675\meta.properties
WARN [kafka-producer-network-thread | producer-1] (NetworkClient.java#handleResponse:600) - Error while fetching metadata with correlation id 0 : {test=LEADER_NOT_AVAILABLE}
WARN [kafka-producer-network-thread | producer-1] (NetworkClient.java#handleResponse:600) - Error while fetching metadata with correlation id 1 : {test=LEADER_NOT_AVAILABLE}
WARN [kafka-producer-network-thread | producer-1] (NetworkClient.java#handleResponse:600) - Error while fetching metadata with correlation id 2 : {test=LEADER_NOT_AVAILABLE}
[Partition(topic = test, partition = 0, leader = 0, replicas = [0,], isr = [0,]]
ERROR [main] (NIOServerCnxnFactory.java#uncaughtException:44) - Thread Thread[main,5,main] died
java.util.NoSuchElementException
at org.apache.kafka.common.utils.AbstractIterator.next(AbstractIterator.java:52)
at com.nuwaza.evlauation.embedded.kafka.EmbeddedKafka.main(EmbeddedKafka.java:105)

Best answer

Try calling producer.flush() before reading the messages, to make sure the produced messages have really been saved to disk.
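
Separately from flushing, the stack trace shows that the exception is thrown by Iterator.next() on a batch that contains no records. The following is only a minimal sketch of guarding against that, not the accepted answer's method: it checks hasNext() before next() and retries the poll a bounded number of times. The class SafePoll, the helpers firstOf and pollFirst, and the fakePoll supplier are hypothetical names introduced for illustration; fakePoll stands in for consumer.poll(...) so that the sketch runs without a broker.

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.function.Supplier;

public class SafePoll {

    // Returns the first element of the batch, or Optional.empty() when the
    // batch is empty, instead of letting Iterator.next() throw
    // NoSuchElementException.
    static <T> Optional<T> firstOf(Iterable<T> batch) {
        Iterator<T> it = batch.iterator();
        return it.hasNext() ? Optional.of(it.next()) : Optional.empty();
    }

    // Calls the poll function at most maxAttempts times and returns the
    // first record seen, or Optional.empty() if every batch was empty.
    static <T> Optional<T> pollFirst(Supplier<? extends Iterable<T>> poll, int maxAttempts) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            Optional<T> first = firstOf(poll.get());
            if (first.isPresent()) {
                return first;
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Hypothetical stand-in for consumer.poll(...): two empty batches,
        // then one batch that holds a single record.
        Queue<List<String>> batches = new ArrayDeque<>();
        batches.add(Collections.emptyList());
        batches.add(Collections.emptyList());
        batches.add(Collections.singletonList("test-message"));
        Supplier<List<String>> fakePoll =
                () -> batches.isEmpty() ? Collections.<String>emptyList() : batches.poll();

        Optional<String> record = pollFirst(fakePoll, 5);
        System.out.println(record.map(v -> "value = " + v).orElse("no records received"));
    }
}
```

With a real consumer, the supplier could be something like () -> consumer.poll(1000), since ConsumerRecords is an Iterable of ConsumerRecord, and producer.flush() would be called before the first poll as the answer suggests. Whether a single poll is enough depends on how quickly the embedded broker and the consumer group become ready, so the attempt count here is an assumption to tune.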

Regarding java - NoSuchElementException in Kafka, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/40378394/
