gpt4 book ai didi

scala - Spark流+卡夫卡: how to check name of topic from kafka message

转载 作者:行者123 更新时间:2023-12-02 13:03:32 25 4
gpt4 key购买 nike

我正在使用 Spark Streaming 从 Kafka 主题列表中读取内容。我正在关注官方API link 。我使用的方法是:

val kafkaParams = Map("metadata.broker.list" -> configuration.getKafkaBrokersList(), "auto.offset.reset" -> "largest")
val topics = Set(configuration.getKafkaInputTopic())
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topics)

我想知道执行者将如何从主题列表中读取消息?他们的政策是什么?他们会阅读一个主题,然后在完成后将消息传递到其他主题吗?

最重要的是,调用此方法后,如何检查 RDD 中消息的主题是什么?

stream.foreachRDD(rdd => rdd.map(t => {
val key = t._1
val json = t._2
val topic = ???
})

最佳答案

I am wondering how will the executor read from the message from the list of topics ? What will be their policy? Will they read a topic and then when they finish the messages pass to the other topics?

在直接流式传输方法中,驱动程序负责将偏移量读取到您想要使用的 Kafka 主题中。它的作用是在主题、分区和需要读取的偏移量之间创建映射。之后,驱动程序会为每个工作人员分配一个范围来读取特定的 Kafka 主题。这意味着,如果单个工作线程可以同时运行 2 个任务(仅出于示例目的,它通常可以运行更多任务),那么它就有可能同时从 Kafka 的两个单独主题中读取数据。

how can I, after calling this method, check what is the topic of a message in the RDD?

您可以使用 createDirectStream 的重载,它采用 MessageHandler[K, V]:

val topicsToPartitions: Map[TopicAndPartition, Long] = ???

val stream: DStream[(String, String)] =
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc,
kafkaParams,
topicsToPartitions,
mam: MessageAndMetadata[String, String]) => (mam.topic(), mam.message())

关于scala - Spark流+卡夫卡: how to check name of topic from kafka message,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43226139/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com