gpt4 book ai didi

apache-spark - Kafka spark directStream 无法获取数据

转载 作者:行者123 更新时间:2023-12-04 04:41:51 25 4
gpt4 key购买 nike

我正在使用 spark directStream api 从 Kafka 读取数据。我的代码如下:

val sparkConf = new SparkConf().setAppName("testdirectStreaming")
val sc = new SparkContext(sparkConf)
val ssc = new StreamingContext(sc, Seconds(2))

val kafkaParams = Map[String, String](
"auto.offset.reset" -> "smallest",
"metadata.broker.list"->"10.0.0.11:9092",
"spark.streaming.kafka.maxRatePerPartition"->"100"
)
//I set all of the 3 partitions fromOffset are 0
var fromOffsets:Map[TopicAndPartition, Long] = Map(TopicAndPartition("mytopic",0) -> 0)
fromOffsets+=(TopicAndPartition("mytopic",1) -> 0)
fromOffsets+=(TopicAndPartition("mytopic",2) -> 0)

val kafkaData = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, MessageAndMetadata[String, String]](
ssc, kafkaParams, fromOffsets,(mmd: MessageAndMetadata[String, String]) => mmd)

var offsetRanges = Array[OffsetRange]()
kafkaData.transform { rdd =>
offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd
}.map {
_.message()
}.foreachRDD { rdd =>
for (o <- offsetRanges) {
println(s"---${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
}
rdd.foreachPartition{ partitionOfRecords =>
partitionOfRecords.foreach { line =>
println("===============value:"+line)
}
}
}

我确定 kafka 集群中有数据,但是我的代码无法获取任何数据。提前致谢。

最佳答案

我找到了原因:kafka中的旧消息因为保留期到期已经被删除了。因此,当我将 fromOffset 设置为 0 时,它会导致 OutOfOffSet 异常。该异常导致 Spark 使用最新的偏移量重置偏移量。因此我无法收到任何消息。解决方案是我需要设置适当的 fromOffset 以避免异常。

关于apache-spark - Kafka spark directStream 无法获取数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31978998/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com