gpt4 book ai didi

scala - 从 RDD 访问 KafkaOffset 时出现异常

转载 作者:行者123 更新时间:2023-12-04 23:40:19 24 4
gpt4 key购买 nike

我有一个来自 Kafka 的 Spark 消费者。
我正在尝试管理恰好一次语义的偏移量。

但是,在访问偏移量时,它会引发以下异常:

"java.lang.ClassCastException: org.apache.spark.rdd.MapPartitionsRDD cannot be cast to org.apache.spark.streaming.kafka.HasOffsetRanges"



执行此操作的代码部分如下:
var offsetRanges = Array[OffsetRange]()
dataStream
.transform {
rdd =>
offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd
}
.foreachRDD(rdd => { })

这里 dataStream 是使用 KafkaUtils API 创建的直接流(DStream[String]),例如:
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set(source_schema+"_"+t)).map(_._2)

如果有人可以帮助我理解我在这里做错了什么。
转换是官方文档中提到的对数据流执行的方法链中的第一个方法

谢谢。

最佳答案

你的问题是:

.map(._2)

这创建了一个 MapPartitionedDStream而不是 DirectKafkaInputDStream创建者 KafkaUtils.createKafkaStream .

您需要 map之后 transform :
val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set(source_schema+""+t))

kafkaStream
.transform {
rdd =>
offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd
}
.map(_._2)
.foreachRDD(rdd => // stuff)

关于scala - 从 RDD 访问 KafkaOffset 时出现异常,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39409237/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com