gpt4 book ai didi

apache-spark - Spark 结构化流 kafka 转换 JSON 没有模式(推断模式)

转载 作者:行者123 更新时间:2023-12-04 03:00:26 25 4
gpt4 key购买 nike

我读过 Spark Structured Streaming 不支持将 Kafka 消息读取为 JSON 的模式推断。有没有办法像 Spark Streaming 一样检索架构:

val dataFrame = spark.read.json(rdd.map(_.value()))
dataFrame.printschema

最佳答案

这是一种可能的方法:

  • 在开始流式传输之前,从 Kafka 获取一小批数据
  • 从小批量
  • 推断模式
  • 使用提取的模式开始流式传输数据。

  • 下面的伪代码说明了这种方法。

    第1步:

    从 Kafka 中提取一小部分(两条记录),
    val smallBatch = spark.read.format("kafka")
    .option("kafka.bootstrap.servers", "node:9092")
    .option("subscribe", "topicName")
    .option("startingOffsets", "earliest")
    .option("endingOffsets", """{"topicName":{"0":2}}""")
    .load()
    .selectExpr("CAST(value AS STRING) as STRING").as[String].toDF()

    第2步:
    将小批量写入文件:
    smallBatch.write.mode("overwrite").format("text").save("/batch")

    此命令将小批量写入 hdfs 目录/batch。它创建的文件的名称是 part-xyz*。因此,您首先需要使用 hadoop FileSystem 命令重命名文件(请参阅 org.apache.hadoop.fs._ 和 org.apache.hadoop.conf.Configuration,这是一个示例 https://stackoverflow.com/a/41990859),然后将文件读取为 json:
    val smallBatchSchema = spark.read.json("/batch/batchName.txt").schema

    其中,batchName.txt 是文件的新名称,smallBatchSchema 包含从小批量推断的架构。

    最后,您可以按如下方式流式传输数据(步骤 3):
    val inputDf = spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "node:9092")
    .option("subscribe", "topicName")
    .option("startingOffsets", "earliest")
    .load()

    val dataDf = inputDf.selectExpr("CAST(value AS STRING) as json")
    .select( from_json($"json", schema=smallBatchSchema).as("data"))
    .select("data.*")

    希望这可以帮助!

    关于apache-spark - Spark 结构化流 kafka 转换 JSON 没有模式(推断模式),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48361177/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com