gpt4 book ai didi

apache-kafka - 从 Spark 流中的kafka消息中提取时间戳?

转载 作者:行者123 更新时间:2023-12-04 03:16:25 25 4
gpt4 key购买 nike

试图从 kafka 源中读取。我想从收到的消息中提取时间戳以进行结构化 Spark 流。
卡夫卡(版本 0.10.0.0)
Spark 流(2.0.1版)

最佳答案

我建议几件事:

  • 假设您通过最新的 Kafka Streaming Api (0.10 Kafka) 创建了一个流

    例如。您使用依赖项:"org.apache.spark" %% "spark-streaming-kafka-0-10" % 2.0.1
    根据上面的文档,比您创建一个流:
     val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> "broker1:9092,broker2:9092",
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[ByteArrayDeserializer],
    "group.id" -> "spark-streaming-test",
    "auto.offset.reset" -> "latest",
    "enable.auto.commit" -> (false: java.lang.Boolean))

    val sparkConf = new SparkConf()
    // suppose you have 60 second window
    val ssc = new StreamingContext(sparkConf, Seconds(60))
    ssc.checkpoint("checkpoint")

    val stream = KafkaUtils.createDirectStream(ssc, PreferConsistent,
    Subscribe[String, Array[Byte]](topics, kafkaParams))
  • 您的流将是 ConsumerRecord[String,Array[Byte]] 的 DStream,您可以像这样简单地获得时间戳和键值:
    stream.map { record => (record.timestamp(), record.key(), record.value())  }

  • 希望有帮助。

    关于apache-kafka - 从 Spark 流中的kafka消息中提取时间戳?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40586663/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com