
java - Kafka consumer in Java not consuming messages


I am trying to get a Kafka consumer in Java to fetch the messages that a producer generates and publishes to a topic. My consumer is as follows.

consumer.java

import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.MessageAndOffset;

public class KafkaConsumer extends Thread {

    final static String clientId = "SimpleConsumerDemoClient";
    final static String TOPIC = "AATest";
    ConsumerConnector consumerConnector;

    public static void main(String[] argv) throws UnsupportedEncodingException {
        KafkaConsumer kafkaConsumer = new KafkaConsumer();
        kafkaConsumer.start();
    }

    public KafkaConsumer() {
        Properties properties = new Properties();
        properties.put("zookeeper.connect", "10.200.208.59:2181");
        properties.put("group.id", "test-group");
        ConsumerConfig consumerConfig = new ConsumerConfig(properties);
        consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
    }

    @Override
    public void run() {
        // Ask the connector for a single stream on the topic.
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(TOPIC, new Integer(1));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
                consumerConnector.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = consumerMap.get(TOPIC).get(0);
        System.out.println(stream);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        // Braces added: without them only the first println was inside the
        // loop and it.next() was never called, so no message was ever read.
        while (it.hasNext()) {
            System.out.println("from it");
            System.out.println(new String(it.next().message()));
        }
    }

    private static void printMessages(ByteBufferMessageSet messageSet) throws UnsupportedEncodingException {
        for (MessageAndOffset messageAndOffset : messageSet) {
            ByteBuffer payload = messageAndOffset.message().payload();
            byte[] bytes = new byte[payload.limit()];
            payload.get(bytes);
            System.out.println(new String(bytes, "UTF-8"));
        }
    }
}

When I run the code above I get nothing in the console, even though a Java producer program behind the scenes is continuously publishing data to the "AATest" topic. Also, in the ZooKeeper console I see the following lines when I try to run the above consumer.java:

[2015-04-30 15:57:31,284] INFO Accepted socket connection from /10.200.208.59:51780 (org.apache.zookeeper.server.NIOServerCnxnFactory)
[2015-04-30 15:57:31,284] INFO Client attempting to establish new session at /10.200.208.59:51780 (org.apache.zookeeper.server.ZooKeeperServer)
[2015-04-30 15:57:31,315] INFO Established session 0x14d09cebce30007 with negotiated timeout 6000 for client /10.200.208.59:51780 (org.apache.zookeeper.server.ZooKeeperServer)

Furthermore, when I run a separate console consumer pointing at the AATest topic, I do get all the data the producer has published to that topic.

The consumer and the broker are on the same machine, whereas the producer is on a different machine. This is actually similar to this question, but going through it did not help me. Please help me.

Best Answer

A different answer, but in my case it turned out to be the consumer's initial offset setting (auto.offset.reset). Setting auto.offset.reset=earliest fixed the problem in my scenario, because I was publishing the events first and only then starting the consumer.

By default, a consumer only consumes events published after it starts, because the default is auto.offset.reset=latest.

e.g. consumer.properties

bootstrap.servers=localhost:9092
enable.auto.commit=true
auto.commit.interval.ms=1000
session.timeout.ms=30000
auto.offset.reset=earliest
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
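
For reference, here is a minimal Java sketch of the same fix using the Kafka 0.10 consumer API referenced by this answer. The broker address localhost:9092 is taken from the properties above and the topic name AATest from the question; adjust both for your setup. The decisive line is auto.offset.reset=earliest.

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class EarliestOffsetConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumption: local broker
        props.put("group.id", "test-group");
        // Without this, a group with no committed offsets starts at the *end*
        // of the log ("latest") and never sees previously published messages.
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        try {
            consumer.subscribe(Collections.singletonList("AATest"));
            while (true) {
                // poll(long) is the 0.10-era signature used by this answer
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
                }
            }
        } finally {
            consumer.close();
        }
    }
}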

Test

import java.util.{Collections, Date, Properties}

import net.manub.embeddedkafka.EmbeddedKafka // assumes the embedded-kafka library
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.scalatest.FunSuite

import scala.collection.JavaConverters._

class KafkaEventConsumerSpecs extends FunSuite {

  // BaseEvent is a marker trait from the author's codebase.
  case class TestEvent(eventOffset: Long, hashValue: Long, created: Date, testField: String) extends BaseEvent

  test("given an event in the event-store, consumes an event") {

    EmbeddedKafka.start()

    // PRODUCE: publish the event first, before any consumer exists.
    val event = TestEvent(0L, 0L, new Date(), "data")
    val config = new Properties() {
      {
        load(this.getClass.getResourceAsStream("/producer.properties"))
      }
    }
    val producer = new KafkaProducer[String, String](config)

    val persistedEvent = producer.send(new ProducerRecord(event.getClass.getSimpleName, event.toString))

    assert(persistedEvent.get().offset() == 0)
    assert(persistedEvent.get().checksum() != 0)

    // CONSUME: the consumer starts after the event was published, so it only
    // sees the event because consumer.properties sets auto.offset.reset=earliest.
    val consumerConfig = new Properties() {
      {
        load(this.getClass.getResourceAsStream("/consumer.properties"))
        put("group.id", "consumers_testEventsGroup")
        put("client.id", "testEventConsumer")
      }
    }

    assert(consumerConfig.getProperty("group.id") == "consumers_testEventsGroup")

    val kafkaConsumer = new KafkaConsumer[String, String](consumerConfig)

    assert(kafkaConsumer.listTopics().asScala.map(_._1).toList == List("TestEvent"))

    kafkaConsumer.subscribe(Collections.singletonList("TestEvent"))

    val events = kafkaConsumer.poll(1000)
    assert(events.count() == 1)

    EmbeddedKafka.stop()
  }
}

But if the consumer is started first and the events are published afterwards, the consumer should be able to consume them without setting auto.offset.reset to earliest. (auto.offset.reset only decides where a group with no committed offsets begins reading; a consumer that is already subscribed sits at the end of the log and receives each new event as it arrives.)
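
To illustrate that ordering, here is a hedged Java sketch: the broker address, topic, and group name are made up for the example. It subscribes before producing and still receives the record under the default auto.offset.reset=latest.

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SubscribeThenPublish {

    public static void main(String[] args) {
        Properties cProps = new Properties();
        cProps.put("bootstrap.servers", "localhost:9092"); // assumption: local broker
        cProps.put("group.id", "subscribe-first-group");   // hypothetical group id
        cProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        cProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Note: no auto.offset.reset override; the default "latest" applies.

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(cProps);
        consumer.subscribe(Collections.singletonList("AATest"));
        // Poll until partitions are assigned, so the consumer's position is
        // fixed at the log end *before* anything new is published.
        while (consumer.assignment().isEmpty()) {
            consumer.poll(100);
        }

        Properties pProps = new Properties();
        pProps.put("bootstrap.servers", "localhost:9092");
        pProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        pProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        try (KafkaProducer<String, String> producer = new KafkaProducer<String, String>(pProps)) {
            producer.send(new ProducerRecord<String, String>("AATest", "published after subscribe"));
        }

        // The record arrives even though the group started at "latest".
        ConsumerRecords<String, String> records = consumer.poll(5000);
        System.out.println("received " + records.count() + " record(s)");
        consumer.close();
    }
}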

Kafka 0.10 references

https://kafka.apache.org/documentation/#consumerconfigs

Regarding java - Kafka consumer in Java not consuming messages, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/29965678/
