gpt4 book ai didi

dataframe - 如何使用 Spark DataFrames 查询 JSON 数据列?

转载 作者:行者123 更新时间:2023-12-03 07:37:54 26 4
gpt4 key购买 nike

我有一个 Cassandra 表,为简单起见,它看起来像:

key: text
jsonData: text
blobData: blob

我可以使用spark和spark-cassandra-connector为此创建一个基本数据框架:

val df = sqlContext.read
.format("org.apache.spark.sql.cassandra")
.options(Map("table" -> "mytable", "keyspace" -> "ks1"))
.load()

我正在努力将 JSON 数据扩展到其底层结构。我最终希望能够根据 json 字符串中的属性进行过滤并返回 blob 数据。类似于 jsonData.foo = "bar"并返回 blobData。目前可能吗?

最佳答案

Spark >= 2.4

如果需要,可以使用 schema_of_json 确定架构函数(请注意,这假设任意行是架构的有效代表)。

import org.apache.spark.sql.functions.{lit, schema_of_json, from_json}
import collection.JavaConverters._

val schema = schema_of_json(lit(df.select($"jsonData").as[String].first))
df.withColumn("jsonData", from_json($"jsonData", schema, Map[String, String]().asJava))

Spark >= 2.1

您可以使用from_json功能:

import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._

val schema = StructType(Seq(
StructField("k", StringType, true), StructField("v", DoubleType, true)
))

df.withColumn("jsonData", from_json($"jsonData", schema))

Spark >= 1.6

您可以使用 get_json_object 来获取列和路径:

import org.apache.spark.sql.functions.get_json_object

val exprs = Seq("k", "v").map(
c => get_json_object($"jsonData", s"$$.$c").alias(c))

df.select($"*" +: exprs: _*)

并将字段提取为单个字符串,这些字符串可以进一步转换为预期类型。

path 参数使用点语法表示,前导 $. 表示文档根(因为上面的代码使用字符串插值 $ 具有被转义,因此 $$.)。

Spark <= 1.5:

Is this currently possible?

据我所知,这不可能直接实现。您可以尝试类似的操作:

val df = sc.parallelize(Seq(
("1", """{"k": "foo", "v": 1.0}""", "some_other_field_1"),
("2", """{"k": "bar", "v": 3.0}""", "some_other_field_2")
)).toDF("key", "jsonData", "blobData")

我假设 blob 字段无法用 JSON 表示。否则,您可以省略拆分和连接:

import org.apache.spark.sql.Row

val blobs = df.drop("jsonData").withColumnRenamed("key", "bkey")
val jsons = sqlContext.read.json(df.drop("blobData").map{
case Row(key: String, json: String) =>
s"""{"key": "$key", "jsonData": $json}"""
})

val parsed = jsons.join(blobs, $"key" === $"bkey").drop("bkey")
parsed.printSchema

// root
// |-- jsonData: struct (nullable = true)
// | |-- k: string (nullable = true)
// | |-- v: double (nullable = true)
// |-- key: long (nullable = true)
// |-- blobData: string (nullable = true)

另一种(更便宜,但更复杂)方法是使用 UDF 解析 JSON 并输出 structmap 列。例如这样的事情:

import net.liftweb.json.parse

case class KV(k: String, v: Int)

val parseJson = udf((s: String) => {
implicit val formats = net.liftweb.json.DefaultFormats
parse(s).extract[KV]
})

val parsed = df.withColumn("parsedJSON", parseJson($"jsonData"))
parsed.show

// +---+--------------------+------------------+----------+
// |key| jsonData| blobData|parsedJSON|
// +---+--------------------+------------------+----------+
// | 1|{"k": "foo", "v":...|some_other_field_1| [foo,1]|
// | 2|{"k": "bar", "v":...|some_other_field_2| [bar,3]|
// +---+--------------------+------------------+----------+

parsed.printSchema

// root
// |-- key: string (nullable = true)
// |-- jsonData: string (nullable = true)
// |-- blobData: string (nullable = true)
// |-- parsedJSON: struct (nullable = true)
// | |-- k: string (nullable = true)
// | |-- v: integer (nullable = false)

关于dataframe - 如何使用 Spark DataFrames 查询 JSON 数据列?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34069282/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com