
python - Renaming nested fields in a Spark dataframe


I have a dataframe df in Spark:

 |-- array_field: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- a: string (nullable = true)
 |    |    |-- b: long (nullable = true)
 |    |    |-- c: long (nullable = true)

How can I rename the field array_field.a to array_field.a_renamed?

[Update]:

.withColumnRenamed() does not work with nested fields, so I tried this hacky and unsafe method:

# First alter the schema:
schema = df.schema
schema['array_field'].dataType.elementType['a'].name = 'a_renamed'

ind = schema['array_field'].dataType.elementType.names.index('a')
schema['array_field'].dataType.elementType.names[ind] = 'a_renamed'

# Then set dataframe's schema with altered schema
df._schema = schema

I know that setting a private attribute is not good practice, but I don't know any other way to set the schema of df.

I think I am on the right track, but df.printSchema() still shows the old name for array_field.a, even though df.schema == schema is True.

Best Answer

Python

It is not possible to modify a single nested field. You have to recreate the whole structure. In this particular case the simplest solution is to use cast.

First a bunch of imports:

from collections import namedtuple
from pyspark.sql.functions import col
from pyspark.sql.types import (
    ArrayType, LongType, StringType, StructField, StructType)

and example data:

Record = namedtuple("Record", ["a", "b", "c"])

df = sc.parallelize([([Record("foo", 1, 3)], )]).toDF(["array_field"])
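If you have a SparkSession at hand rather than a SparkContext, the same sample frame can be built with createDataFrame (just a convenience sketch, not part of the original answer):

# Equivalent construction via the SparkSession
df = spark.createDataFrame([([Record("foo", 1, 3)],)], ["array_field"])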

Let's confirm that the schema is the same as in your case:

df.printSchema()
root
 |-- array_field: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- a: string (nullable = true)
 |    |    |-- b: long (nullable = true)
 |    |    |-- c: long (nullable = true)

You can define the new schema, for example as a string:

str_schema = "array<struct<a_renamed:string,b:bigint,c:bigint>>"

df.select(col("array_field").cast(str_schema)).printSchema()
root
 |-- array_field: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- a_renamed: string (nullable = true)
 |    |    |-- b: long (nullable = true)
 |    |    |-- c: long (nullable = true)
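If you don't want to write the string by hand, one option (my own addition, not part of the original answer) is to start from the column's existing DDL-style string and only edit the field name:

# simpleString() returns the current DDL-style type,
# here "array<struct<a:string,b:bigint,c:bigint>>"
current = df.schema["array_field"].dataType.simpleString()

# Rename the field in the string and reuse it as the cast target
str_schema = current.replace("a:string", "a_renamed:string")
df.select(col("array_field").cast(str_schema)).printSchema()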

You can also use a DataType:

struct_schema = ArrayType(StructType([
    StructField("a_renamed", StringType()),
    StructField("b", LongType()),
    StructField("c", LongType())
]))

df.select(col("array_field").cast(struct_schema)).printSchema()
root
 |-- array_field: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- a_renamed: string (nullable = true)
 |    |    |-- b: long (nullable = true)
 |    |    |-- c: long (nullable = true)
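Because the cast only changes the field name, the row data is untouched. A quick check, assuming the sample df above (the alias merely keeps the original column name):

renamed = df.select(
    col("array_field").cast(struct_schema).alias("array_field"))

# The single sample row [foo, 1, 3] should come back unchanged,
# only with the field now called a_renamed
renamed.show(truncate=False)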

Scala

The same technique can be used in Scala:

case class Record(a: String, b: Long, c: Long)

val df = Seq(Tuple1(Seq(Record("foo", 1, 3)))).toDF("array_field")

val strSchema = "array<struct<a_renamed:string,b:bigint,c:bigint>>"

df.select($"array_field".cast(strSchema))

import org.apache.spark.sql.types._

val structSchema = ArrayType(StructType(Seq(
  StructField("a_renamed", StringType),
  StructField("b", LongType),
  StructField("c", LongType)
)))

df.select($"array_field".cast(structSchema))

Possible improvements:

If you use an expressive data manipulation or JSON processing library, it can be easier to dump the data type to a dict or JSON string and take it from there (Python / toolz):

from toolz.curried import pipe, assoc_in, update_in, map
from operator import attrgetter

# Rename the field to "a_renamed" if its name is "a"
rename_field = update_in(
    keys=["name"], func=lambda x: "a_renamed" if x == "a" else x)

updated_schema = pipe(
    # Get schema of the field as a dict
    df.schema["array_field"].jsonValue(),
    # Update fields with rename
    update_in(
        keys=["type", "elementType", "fields"],
        func=lambda x: pipe(x, map(rename_field), list)),
    # Load schema from dict
    StructField.fromJson,
    # Get data type
    attrgetter("dataType"))

df.select(col("array_field").cast(updated_schema)).printSchema()

Regarding python - renaming nested fields in a Spark dataframe, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/43004849/
