gpt4 book ai didi

scala - Kafka 消费者不返回任何事件

转载 作者:行者123 更新时间:2023-12-02 20:09:41 24 4
gpt4 key购买 nike

下面的 Scala kafka 消费者没有从 poll 调用中返回任何事件。

但是,主题是正确的,我可以使用控制台消费者看到事件被发送到主题:

/opt/kafka_2.11-0.10.1.0/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic my_topic --from-beginning

当我使用调试器单步执行它并调用 kafkaConsumer.listTopics()

时,我还在下面的 Scala 代码示例中看到了该主题

另外,这是从单个单元测试中调用的,所以我只创建了这个特征和消费者的一个实例(即另一个消费者实例不能消费消息)。我还使用了一个随机的 group_id。

下面的代码/配置有什么问题吗?

import java.util.Properties

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}

import scala.util.Random

trait KafkaTest {

val kafkaConsumerProperties = new Properties()

kafkaConsumerProperties.put("bootstrap.servers", "kafka:9092")

kafkaConsumerProperties.put("group.id", Random.alphanumeric.take(10).mkString)

kafkaConsumerProperties.put("key.deserializer", classOf[ByteArrayDeserializer])

kafkaConsumerProperties.put("value.deserializer", classOf[StringDeserializer])

val kafkaConsumer = new KafkaConsumer[String, String](kafkaConsumerProperties)

kafkaConsumer.subscribe(java.util.Collections.singletonList("my_topic"))

def checkKafkaHasReceivedEvent(): Assertion = {

val kafkaEvents = kafkaConsumer.poll(2000) // Always returns 0 events?
...
}
}

增加轮询超时也无济于事。

最佳答案

要从头开始读取 AUTO_OFFSET_RESET_CONFIG 属性必须设置为最早,默认情况下为“最新”

kafkaConsumerProperties.put(
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
OffsetResetStrategy.EARLIEST.toString().toLowerCase())

关于scala - Kafka 消费者不返回任何事件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53867775/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com