
java - Can produce to Kafka but cannot consume

Reposted. Author: 行者123. Updated: 2023-12-02 10:56:38

I am using Kafka JDK client version 0.10.2.1. I can produce simple messages to Kafka for a "heartbeat" test, but I cannot consume messages from the same topic using the SDK. When I go into the Kafka CLI I am able to consume the message, so I have confirmed that the message is there. Here is the function I use to consume from my Kafka server, along with the props. I only pass the produced message's topic to this function after I have confirmed that produce() succeeded; I can post that function later if needed:

private def consumeFromKafka(topic: String, expectedMessage: String): Boolean = {
  val props: Properties = initProps("consumer")
  val consumer = new KafkaConsumer[String, String](props)
  consumer.subscribe(List(topic).asJava)
  var readExpectedRecord = false
  try {
    val records = {
      val firstPollRecs = consumer.poll(MAX_POLLTIME_MS)
      // increase timeout and try again if nothing comes back the first time in case system is busy
      if (firstPollRecs.count() == 0) firstPollRecs else {
        logger.info("KafkaHeartBeat: First poll had 0 records- trying again - doubling timeout to "
          + (MAX_POLLTIME_MS * 2) / 1000 + " sec.")
        consumer.poll(MAX_POLLTIME_MS * 2)
      }
    }
    records.forEach(rec => {
      if (rec.value() == expectedMessage) readExpectedRecord = true
    })
  } catch {
    case e: Throwable => // log error
  } finally {
    consumer.close()
  }
  readExpectedRecord
}

private def initProps(propsType: String): Properties = {
  val prop = new Properties()
  prop.put("bootstrap.servers", kafkaServer + ":" + kafkaPort)

  propsType match {
    case "producer" => {
      prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      prop.put("acks", "1")
      prop.put("producer.type", "sync")
      prop.put("retries", "3")
      prop.put("linger.ms", "5")
    }
    case "consumer" => {
      prop.put("group.id", groupId)
      prop.put("enable.auto.commit", "false")
      prop.put("auto.commit.interval.ms", "1000")
      prop.put("session.timeout.ms", "30000")
      prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
      prop.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
      prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
      // poll just once, should only be one record for the heartbeat
      prop.put("max.poll.records", "1")
    }
  }
  prop
}
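For reference, the props-building above can be exercised without a broker. The following is a minimal standalone sketch of the "consumer" branch; the function name `consumerProps` and the parameter values are assumptions for illustration, since `groupId`, `kafkaServer`, and `kafkaPort` come from class state in the original:

```scala
import java.util.Properties

// Standalone sketch of the "consumer" branch of initProps above.
// consumerProps is a hypothetical name; bootstrap/groupId stand in for
// the kafkaServer/kafkaPort/groupId fields the original reads from state.
def consumerProps(bootstrap: String, groupId: String): Properties = {
  val p = new Properties()
  p.put("bootstrap.servers", bootstrap)
  p.put("group.id", groupId)
  p.put("enable.auto.commit", "false")
  p.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  p.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  p.put("auto.offset.reset", "earliest") // string key behind ConsumerConfig.AUTO_OFFSET_RESET_CONFIG
  p.put("max.poll.records", "1")         // heartbeat test expects a single record
  p
}
```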

Now, when I run the code, it prints the following to the console:

13:04:21 - Discovered coordinator serverName:9092 (id: 2147483647 rack: null) for group 0b8947e1-eb68-4af3-ac7b-be3f7c02e76e.
13:04:23 INFO o.a.k.c.c.i.ConsumerCoordinator - Revoking previously assigned partitions [] for group 0b8947e1-eb68-4af3-ac7b-be3f7c02e76e
13:04:24 INFO o.a.k.c.c.i.AbstractCoordinator - (Re-)joining group 0b8947e1-eb68-4af3-ac7b-be3f7c02e76e
13:04:25 INFO o.a.k.c.c.i.AbstractCoordinator - Successfully joined group 0b8947e1-eb68-4af3-ac7b-be3f7c02e76e with generation 1
13:04:26 INFO o.a.k.c.c.i.ConsumerCoordinator - Setting newly assigned partitions [HeartBeat_Topic.Service_5.2018-08-03.13_04_10.377-0] for group 0b8947e1-eb68-4af3-ac7b-be3f7c02e76e
13:04:27 INFO c.p.p.l.util.KafkaHeartBeatUtil - KafkaHeartBeat: First poll had 0 records- trying again - doubling timeout to 60 sec.

And then nothing else happens; no errors are thrown, so no records are polled. Does anyone know what is preventing the consume from happening? The subscription appears to succeed, since I can call listTopics and list partitions without any problem.

Best Answer

There is a bug in your code. It appears that your line:

 if (firstPollRecs.count() == 0) 

should instead read:

 if (firstPollRecs.count() > 0) 

Otherwise, you pass along the empty firstPollRecs and then iterate over it, which obviously returns nothing.
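The corrected control flow can be illustrated with a broker-free sketch. Here `pollWithRetry` is a hypothetical stand-in for the `consumer.poll` logic: keep the first result when it is non-empty, and only retry with a doubled timeout when the first poll comes back empty.

```scala
// Broker-free sketch of the corrected retry logic. pollWithRetry and the
// poll parameter are hypothetical stand-ins for consumer.poll; Seq stands
// in for ConsumerRecords.
def pollWithRetry[A](poll: Long => Seq[A], timeoutMs: Long): Seq[A] = {
  val firstPollRecs = poll(timeoutMs)
  // corrected condition: keep the first result if it has records (count > 0)
  if (firstPollRecs.nonEmpty) firstPollRecs
  else poll(timeoutMs * 2) // retry with doubled timeout only when empty
}
```

With the original `== 0` condition, a non-empty first poll is discarded and an empty first poll is kept, which is exactly backwards.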

Regarding "java - Can produce to Kafka but cannot consume", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/51678093/
