gpt4 book ai didi

scala - Kafka scala Consumer代码——打印消费记录

转载 作者:行者123 更新时间:2023-12-05 09:18:47 26 4
gpt4 key购买 nike

因为我正在使用 url 创建如下简单的 kafka 消费者:https://gist.github.com/akhil/6dfda8a04e33eff91a20 .

在该链接中,为了打印消耗的记录,使用了一个未识别的词“asScala”。请告诉我如何迭代返回类型:ConsumerRecord[String,String],它是 poll() 方法的返回类型。

import java.util
import java.util.Properties

import org.apache.kafka.clients.consumer.{ConsumerRecords, KafkaConsumer}


object KafkaConsumerEx extends App {

val topic_name = "newtopic55"
val consumer_group = "KafkaConsumerBatch"

val prot = new Properties()
prot.put("bootstrap.servers","localhost:9092")
prot.put("group.id",consumer_group)
prot.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
prot.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer")

val kfk_consumer = new KafkaConsumer[String,String](prot)
kfk_consumer.subscribe(util.Collections.singleton(topic_name))
println("here")

while(true){
val consumer_record : ConsumerRecords[String, String] = kfk_consumer.poll(100)
println("records count : " + consumer_record.count())
println("records partitions: " + consumer_record.partitions())
consumer_record.iterator().


}

}

感谢副词。

最佳答案

你可以轻松做到这一点

for (record <- consumer_record.iterator()) {
println(s"Here's your $record")
}

记得添加这个导入:

import scala.collection.JavaConversions._

关于scala - Kafka scala Consumer代码——打印消费记录,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43528941/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com