
scala - Changing the value of a nested column in a DataFrame

Reposted · Author: 行者123 · Updated: 2023-12-01 07:29:06

I have a DataFrame with two levels of nested fields:

root
 |-- request: struct (nullable = true)
 |    |-- dummyID: string (nullable = true)
 |    |-- data: struct (nullable = true)
 |    |    |-- fooID: string (nullable = true)
 |    |    |-- barID: string (nullable = true)

I want to update the value of the fooID column here. I am able to update values at the first level (e.g. the dummyID column) using this question as a reference: How to add a nested column to a DataFrame
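The first-level update referenced above might look like the following sketch (an assumption, not code from the question; it rebuilds the request struct manually, so field order must match the schema and the new literal value "new_id" is hypothetical):

```scala
import org.apache.spark.sql.functions.{col, lit, struct}

// Replace the first-level field `dummyID` by rebuilding the
// `request` struct and copying the remaining fields through.
val updated = df.withColumn(
  "request",
  struct(
    lit("new_id").as("dummyID"),
    col("request.data").as("data")
  )
)
```

Extending this manual rebuild one level deeper for fooID is possible, but quickly becomes verbose and loses nullability/metadata on the rebuilt structs.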

Input data:

{
  "request": {
    "dummyID": "test_id",
    "data": {
      "fooID": "abc",
      "barID": "1485351"
    }
  }
}

Output data:

{
  "request": {
    "dummyID": "test_id",
    "data": {
      "fooID": "def",
      "barID": "1485351"
    }
  }
}

How can I do this in Scala?

Best answer

Here is a general solution to this problem, which can update any number of nested values at any level based on an arbitrary function applied in a recursive traversal:

import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{col, struct}
import org.apache.spark.sql.types.StructType

def mutate(df: DataFrame, fn: Column => Column): DataFrame = {
  // Get a projection with fields mutated by `fn` and select it
  // out of the original frame with the schema reassigned to the original
  // frame (explained later)
  df.sqlContext.createDataFrame(df.select(traverse(df.schema, fn): _*).rdd, df.schema)
}

def traverse(schema: StructType, fn: Column => Column, path: String = ""): Array[Column] = {
  schema.fields.map(f => {
    f.dataType match {
      case s: StructType => struct(traverse(s, fn, path + f.name + "."): _*)
      case _ => fn(col(path + f.name))
    }
  })
}

This is effectively equivalent to the usual "redefine the whole struct as a projection" solutions, but it automates re-nesting the fields using the original structure and preserves nullability/metadata (which are lost when you redefine the structs manually). Annoyingly, preserving those properties does not seem possible (AFAICT) when creating the projection, so the code above redefines the schema manually.

An example application:

case class Organ(name: String, count: Int)
case class Disease(id: Int, name: String, organ: Organ)
case class Drug(id: Int, name: String, alt: Array[String])

val df = Seq(
  (1, Drug(1, "drug1", Array("x", "y")), Disease(1, "disease1", Organ("heart", 2))),
  (2, Drug(2, "drug2", Array("a")), Disease(2, "disease2", Organ("eye", 3)))
).toDF("id", "drug", "disease")

df.show(false)

+---+------------------+-------------------------+
|id |drug |disease |
+---+------------------+-------------------------+
|1 |[1, drug1, [x, y]]|[1, disease1, [heart, 2]]|
|2 |[2, drug2, [a]] |[2, disease2, [eye, 3]] |
+---+------------------+-------------------------+

// Update the integer field ("count") at the lowest level:
val df2 = mutate(df, c => if (c.toString == "disease.organ.count") c - 1 else c)
df2.show(false)

+---+------------------+-------------------------+
|id |drug |disease |
+---+------------------+-------------------------+
|1 |[1, drug1, [x, y]]|[1, disease1, [heart, 1]]|
|2 |[2, drug2, [a]] |[2, disease2, [eye, 2]] |
+---+------------------+-------------------------+

// This will NOT necessarily be equal unless the metadata and nullability
// of all fields is preserved (as the code above does)
assertResult(df.schema.toString)(df2.schema.toString)

One limitation of this is that it cannot add new fields, only update existing ones (though the map can be changed to a flatMap and fn to a function returning Array[Column], if you don't care about preserving nullability/metadata).
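The flatMap variant mentioned above might be sketched as follows (an assumption, not code from the answer; the names traverseFlat and mutateFlat are hypothetical, and since the output schema no longer matches the input, the result is selected directly instead of having the original schema reassigned, so nullability/metadata are not preserved):

```scala
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{col, struct}
import org.apache.spark.sql.types.StructType

// `fn` may now return zero, one, or several columns per leaf field,
// so fields can be dropped or added, not just updated.
def traverseFlat(schema: StructType, fn: Column => Array[Column], path: String = ""): Array[Column] = {
  schema.fields.flatMap(f => {
    f.dataType match {
      // Unlike the original, struct columns must be aliased explicitly here,
      // because there is no schema reassignment to restore the field names.
      case s: StructType =>
        Array(struct(traverseFlat(s, fn, path + f.name + "."): _*).as(f.name))
      case _ => fn(col(path + f.name))
    }
  })
}

def mutateFlat(df: DataFrame, fn: Column => Array[Column]): DataFrame =
  df.select(traverseFlat(df.schema, fn): _*)
```

For example, `c => Array(c, (c * 2).as(c.toString + "_x2"))` would emit each leaf alongside a new doubled sibling field.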

Additionally, here is a more generic version for Dataset[T]:

case class Record(id: Int, drug: Drug, disease: Disease)

def mutateDS[T](df: Dataset[T], fn: Column => Column)(implicit enc: Encoder[T]): Dataset[T] = {
  df.sqlContext.createDataFrame(df.select(traverse(df.schema, fn): _*).rdd, enc.schema).as[T]
}

// To call as typed dataset:
val fn: Column => Column = c => if (c.toString == "disease.organ.count") c - 1 else c
mutateDS(df.as[Record], fn).show(false)

// To call as untyped dataset:
implicit val encoder: ExpressionEncoder[Row] = RowEncoder(df.schema) // This is necessary regardless of sparkSession.implicits._ imports
mutateDS(df, fn).show(false)
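As an aside for newer Spark versions: since Spark 3.1, Column.withField can replace a nested field in place while keeping the rest of the struct intact, which addresses the original question directly without any manual traversal (a sketch, assuming the df from the question):

```scala
import org.apache.spark.sql.functions.{col, lit}

// Spark 3.1+: replace request.data.fooID without rebuilding the structs.
val df3 = df.withColumn("request", col("request").withField("data.fooID", lit("def")))
```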

Regarding "scala - Changing the value of a nested column in a DataFrame", a similar question was found on Stack Overflow: https://stackoverflow.com/questions/50123771/
