- Java 双重比较
- java - 比较器与 Apache BeanComparator
- Objective-C 完成 block 导致额外的方法调用?
- database - RESTful URI 是否应该公开数据库主键?
我是 kafka 的新手。我阅读了文档以开始使用,现在我正在尝试使用嵌入式 kafka 模式。我尝试了一个相同的示例程序。
public static void main(String args[]) throws InterruptedException, IOException {
// setup Zookeeper
EmbeddedZookeeper zkServer = new EmbeddedZookeeper();
String zkConnect = ZKHOST + ":" + zkServer.port();
ZkClient zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$);
ZkUtils zkUtils = ZkUtils.apply(zkClient, false);
// setup Broker
Properties brokerProps = new Properties();
brokerProps.setProperty("zookeeper.connect", zkConnect);
brokerProps.setProperty("broker.id", "0");
brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString());
brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST +":" + BROKERPORT);
KafkaConfig config = new KafkaConfig(brokerProps);
Time mock = new MockTime();
KafkaServer kafkaServer = TestUtils.createServer(config, mock);
// create topic
AdminUtils.createTopic(zkUtils, TOPIC, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$);
// setup producer
Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT);
producerProps.setProperty("key.serializer","org.apache.kafka.common.serialization.IntegerSerializer");
producerProps.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
KafkaProducer<Integer, byte[]> producer = new KafkaProducer<Integer, byte[]>(producerProps);
List<PartitionInfo> partitionInfo = producer.partitionsFor("test");
System.out.println(partitionInfo);
// setup consumer
Properties consumerProps = new Properties();
consumerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT);
consumerProps.setProperty("group.id", "group0");
consumerProps.setProperty("client.id", "consumer0");
consumerProps.setProperty("key.deserializer","org.apache.kafka.common.serialization.IntegerDeserializer");
consumerProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
consumerProps.put("auto.offset.reset", "earliest"); // to make sure the consumer starts from the beginning of the topic
KafkaConsumer<Integer, byte[]> consumer = new KafkaConsumer<>(consumerProps);
consumer.subscribe(Arrays.asList(TOPIC));
// send message
ProducerRecord<Integer, byte[]> data = new ProducerRecord<>(TOPIC, 42, "test-message".getBytes(StandardCharsets.UTF_8));
producer.send(data);
producer.close();
// starting consumer
ConsumerRecords<Integer, byte[]> records = consumer.poll(1000);
Iterator<ConsumerRecord<Integer, byte[]>> recordIterator = records.iterator();
ConsumerRecord<Integer, byte[]> record = recordIterator.next();
System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
kafkaServer.shutdown();
zkClient.close();
zkServer.shutdown();
}
}
但我无法为主题获取数据。我在执行程序时遇到以下异常
java.util.NoSuchElementException
at org.apache.kafka.common.utils.AbstractIterator.next(AbstractIterator.java:52)
at com.nuwaza.evlauation.embedded.kafka.EmbeddedKafka.main(EmbeddedKafka.java:105)
谁能指导我?
更新-
WARN [main] (Logging.scala#warn:83) - No meta.properties file under dir C:\Users\bhavanak\AppData\Local\Temp\kafka-1238324273778000675\meta.properties
WARN [main] (Logging.scala#warn:83) - No meta.properties file under dir C:\Users\bhavanak\AppData\Local\Temp\kafka-1238324273778000675\meta.properties
WARN [kafka-producer-network-thread | producer-1] (NetworkClient.java#handleResponse:600) - Error while fetching metadata with correlation id 0 : {test=LEADER_NOT_AVAILABLE}
WARN [kafka-producer-network-thread | producer-1] (NetworkClient.java#handleResponse:600) - Error while fetching metadata with correlation id 1 : {test=LEADER_NOT_AVAILABLE}
WARN [kafka-producer-network-thread | producer-1] (NetworkClient.java#handleResponse:600) - Error while fetching metadata with correlation id 2 : {test=LEADER_NOT_AVAILABLE}
[Partition(topic = test, partition = 0, leader = 0, replicas = [0,], isr = [0,]]
ERROR [main] (NIOServerCnxnFactory.java#uncaughtException:44) - Thread Thread[main,5,main] died
java.util.NoSuchElementException
at org.apache.kafka.common.utils.AbstractIterator.next(AbstractIterator.java:52)
at com.nuwaza.evlauation.embedded.kafka.EmbeddedKafka.main(EmbeddedKafka.java:105)
最佳答案
尝试在读取消息之前调用 producer.flush() 以确保生成的消息确实保存在磁盘上。
关于java - NoSuchElementException卡夫卡,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40378394/
我使用 streamsBuilder.table("myTopic") 为某个主题创建了一个 Ktable,并将其具体化为状态存储,以便我可以使用交互式查询. 每小时,我都想从该状态存储(以及关联的变
我正在kafka中进行数据复制。但是,kafka 日志文件的大小增长得非常快。一天大小达到 5 GB。作为这个问题的解决方案,我想立即删除已处理的数据。我正在 AdminClient 中使用删除记录方
我实际上正在使用 SASL 纯文本设置简单的 Kafka 身份验证并添加 ACL 授权。但是当我尝试使用数据时遇到问题。 [main] INFO org.apache.kafka.common.uti
我正在一个使用 Kafka 和 Akka Streams 的项目 reactive-kafka连接器。我们发现reactive-kafka使用它自己的调度程序(akka.kafka.default-d
我试图在HDP上运行简单的kafka生产者消费者示例,但面临以下异常。 [2016-03-03 18:26:38,683] WARN Fetching topic metadata with corr
我继承了一些正在实现到另一个项目中的 Kafka 代码,并遇到了一个问题...消费者收到来自生产者的 3995 条消息后,它崩溃并给出以下错误: ERROR Error while accepting
我正在尝试测试 Flink 程序以使用此 JSONKeyValueDeserializationSchema 类读取来自 Kafka 的 JSON 数据。但是我的 Intellij 没有找到这个类。我
我有一个简单的生产者-消费者设置:1 个生产者(作为一个线程)和 2 个消费者(作为 2 个进程)。生产者的run方法: def run(self): producer = K
我正在使用“node-rdkafka”npm 模块来构建用 Nodejs 编写的分布式服务架构。我们有一个计量用例,其中我们只允许每 n 秒消耗和处理一定数量的消息。例如,“主”主题有 100 条由生
我正在学习 Kafka,我想知道当我消费来自主题的消息时如何指定然后分区。 我找到了几张这样的图片: 这意味着一个消费者可以消费来自多个分区的消息,但一个分区只能由单个消费者(在消费者组内)读取。 此
我想从flink读取一个kafka主题 package Toletum.pruebas; import org.apache.flink.api.common.functions.MapFunctio
我阅读了 Kafka 网站上的文档,但是在尝试实现一个完整的最小示例(生产者 --> kafka --> 消费者)之后,我不太清楚“消费者状态”如何处理偏移量。 一些信息 我正在使用高级 API (J
刚开始使用Spring Kafka(2.1.4.RELEASE)和Kafka(1.0.0),但是当我添加事务时,处理速度降低了很多。 代码: spring.kafka.consumer.max-pol
我尝试在安全模式下使用kafka(0.9.1)。我会使用 Spark 读取数据,因此我必须将 JAAS conf 文件传递给 JVM。我使用这个 cmd 来开始我的工作: /opt/spa
目标:读取主题中的所有消息,然后终止进程。 我能够连续阅读以下消息: props.put("bootstrap.servers", kafkaBootstrapSrv); props.put("gro
我写了一个非常简单的 Flink 流作业,它使用 FlinkKafkaConsumer082 从 Kafka 获取数据。 protected DataStream getKafkaStream(Str
我使用的是kafka 2.10-0.9.0.1当我通过命令删除主题时,主题被标记为删除。 bin/kafka-topics.sh --zookeeper localhost:2181 --delete
当我实例化一个 Kafka 消费者时 KafkaConsumer consumer = new KafkaConsumer(props); 我收到这条消息 SLF4J: Failed to load
我有一个用例,我需要 100% 的可靠性、幂等性(无重复消息)以及我的 Kafka 分区中的顺序保留。我正在尝试使用事务 API 来设置概念证明来实现这一点。有一个名为“isolation.level
我们有一个 3 主机的 Kafka 集群。我们有 136 个主题,每个主题有 100 个分区,复制因子为 3。这使得我们的集群中有 13,600 个分区。 这是我们主题的合理配置吗? 最佳答案 太多了
我是一名优秀的程序员,十分优秀!