gpt4 book ai didi

apache-spark - 是否可以在 Kafka+Spark Streaming 中获取特定的消息偏移量?

转载 作者:行者123 更新时间:2023-12-04 14:41:35 25 4
gpt4 key购买 nike

我正在尝试使用 Spark Direct Stream 获取并存储 Kafka 中特定消息的偏移量。
查看 Spark 文档很容易获取每个分区的范围偏移量,但我需要的是在完全扫描队列后存储主题的每条消息的起始偏移量。

最佳答案

是的,您可以使用 MessageAndMetadata createDirectStream 的版本它允许您访问 message metadata .

您可以在此处找到返回 tuple3 的 Dstream 的示例。 .

val ssc = new StreamingContext(sparkConf, Seconds(10))

val kafkaParams = Map[String, String]("metadata.broker.list" -> (kafkaBroker))
var fromOffsets = Map[TopicAndPartition, Long]()
val topicAndPartition: TopicAndPartition = new TopicAndPartition(kafkaTopic.trim, 0)
val topicAndPartition1: TopicAndPartition = new TopicAndPartition(kafkaTopic1.trim, 0)
fromOffsets += (topicAndPartition -> inputOffset)
fromOffsets += (topicAndPartition1 -> inputOffset1)

val messagesDStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, Tuple3[String, Long, String]](ssc, kafkaParams, fromOffsets, (mmd: MessageAndMetadata[String, String]) => {
(mmd.topic ,mmd.offset, mmd.message().toString)
})

在上面的例子中 tuple3._1将有 topic , tuple3._2将有 offsettuple3._3将有 message .

希望这可以帮助!

关于apache-spark - 是否可以在 Kafka+Spark Streaming 中获取特定的消息偏移量?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37572619/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com