gpt4 book ai didi

java - 无法让 KafkaProducer/KafkaConsumer 在 Scala 中工作

转载 作者:行者123 更新时间:2023-11-30 07:05:16 26 4
gpt4 key购买 nike

我正在尝试创建一个简单的 KafkaProducer 和 KafkaConsumer,以便我可以将数据发送到代理上的主题,然后验证是否已收到数据。下面是我用来定义消费者和生产者的两种方法,以及如何发送消息。 send 方法至少需要 20 秒才能完成,据我所知,consumer.poll 方法实际上从未完成,但我最长的时间是 10 分钟。

有人对我做错了什么有建议吗?生产者/消费者是否有一些我没有正确设置的属性?这些属性是直接从文档复制的,所以我不明白为什么它们不起作用。

KafkaProducer docs
KafkaConsumer docs

"verify we can send to producer" in {
val consumer = createKafkaConsumer("address:9002")
val producer = createKafkaProducer("address:9002")

val message = "I am a message"
val record = new ProducerRecord[String, String]("myTopic", message)

producer.send(record)
TimeUnit.SECONDS.sleep(5)
val records = consumer.poll(5000)
println("records: "+records)
consumer1.close()
}

def createKafkaProducer(kafka: String): KafkaProducer[String,String] = {
val props = new Properties()
props.put("bootstrap.servers", kafka)
props.put("acks", "all")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

new KafkaProducer[String,String](props)
}

def createKafkaConsumer(kafka: String): KafkaConsumer[String, String] = {
val props = new Properties()
props.put("bootstrap.servers", kafka)
props.put("group.id", "test")
props.put("enable.auto.commit", "true")
props.put("auto.commit.interval.ms", "1000")
props.put("session.timeout.ms", "30000")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
val consumer = new KafkaConsumer[String, String](props)
consumer.subscribe(Collections.singletonList("myTopic"))
consumer
}

编辑:我已经更新了我的代码,以便现在从发送方法获得响应,并且似乎超时了 org.apache.kafka.common.errors.TimeoutException: 无法更新元数据60000 毫秒后。

最佳答案

事实证明,我遇到了 DNS 问题,这意味着我实际上并未连接到代理。修复此问题后,消息即可通过,配置没有任何问题。

关于java - 无法让 KafkaProducer/KafkaConsumer 在 Scala 中工作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40251976/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com