gpt4 book ai didi

scala - 在 zeppelin notebook 中保存 Spark 流消耗的 kafka 消息

转载 作者:行者123 更新时间:2023-12-01 03:35:52 26 4
gpt4 key购买 nike

我在 zeppelin notebook 中保存 Spark 流消耗的 kafka 消息时遇到问题。

我的代码是:

case class Message(id: Long, message: String, timestamp: Long) extends Serializable

val ssc = new StreamingContext(sc, Seconds(2))

val messagesStream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc,
Map("zookeeper.connect" -> "localhost:2181", "group.id" -> "test-consumer-group"),
Map("test" -> 4),
StorageLevel.MEMORY_ONLY)
.map { case (k, v) => implicit val formats = DefaultFormats; JsonMethods.parse(v).extract[Message] }
.filter(_.id % 2 == 0)

val mes = messagesStream.window(Seconds(10))

mes
.map(m => Message(m.id, m.message, m.timestamp))
.foreachRDD( rdd => rdd.toDF.registerTempTable("messages"))

ssc.start()

当我运行时 %sql select * from messages它不显示任何数据,但已定义表。如果我在 Cassandra 上将保存更改为 tempTable,它会正确保存并显示数据。不明白为什么会这样。

感谢帮助。

最佳答案

好的,这是问题所在。让我们首先回顾一下 foreachRDD 运算符定义:
foreachRDD没有按照预期的方式使用。它是最通用的输出运算符,它将函数 func 应用于从流生成的每个 RDD。该函数应该将每个 RDD 中的数据推送到 外部系统,例如将 RDD 保存到文件中,或者通过网络将其写入数据库。请注意,函数 func 在运行流应用程序的驱动程序进程中执行,并且通常会在其中包含 RDD 操作,这些操作将强制计算流 RDD。

因此,您的代码实际发生的情况如下:

由于 DStreams 被输出操作惰性执行,就像 RDDs 被 RDD 操作惰性执行一样。具体来说,DStream 输出操作中的 RDD 操作会强制处理接收到的数据。因此,如果您的应用程序没有任何输出操作,而您没有,或者有像 dstream.foreachRDD() 这样的输出操作,而其中没有任何 RDD 操作,则不会执行任何操作。系统将简单地接收数据并丢弃它。

所以每次执行 registerTempTable 都会丢弃你的 RDD 数据所以你的 SQL 查询给出了一个空的结果。

要解决您的问题,您需要将数据保存在某处(Cassandra 是一个不错的选择),然后对其进行查询。

关于scala - 在 zeppelin notebook 中保存 Spark 流消耗的 kafka 消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34979794/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com