scala - Unflatten a DataFrame into a specific structure

Reposted. Author: 行者123. Updated: 2023-12-01 11:08:48

I have a flat DataFrame (df) with the following structure:

root
|-- first_name: string (nullable = true)
|-- middle_name: string (nullable = true)
|-- last_name: string (nullable = true)
|-- title: string (nullable = true)
|-- start_date: string (nullable = true)
|-- end_Date: string (nullable = true)
|-- city: string (nullable = true)
|-- zip_code: string (nullable = true)
|-- state: string (nullable = true)
|-- country: string (nullable = true)
|-- email_name: string (nullable = true)
|-- company: struct (nullable = true)
|-- org_name: string (nullable = true)
|-- company_phone: string (nullable = true)
|-- partition_column: string (nullable = true)

I need to convert this DataFrame into a structure like the following (because my downstream data will use this format):
root
|-- firstName: string (nullable = true)
|-- middleName: string (nullable = true)
|-- lastName: string (nullable = true)
|-- currentPosition: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- title: string (nullable = true)
| | |-- startDate: string (nullable = true)
| | |-- endDate: string (nullable = true)
| | |-- address: struct (nullable = true)
| | | |-- city: string (nullable = true)
| | | |-- zipCode: string (nullable = true)
| | | |-- state: string (nullable = true)
| | | |-- country: string (nullable = true)
| | |-- emailName: string (nullable = true)
| | |-- company: struct (nullable = true)
| | | |-- orgName: string (nullable = true)
| | | |-- companyPhone: string (nullable = true)
|-- partitionColumn: string (nullable = true)

This is what I have so far:
case class IndividualCompany(orgName: String,
                             companyPhone: String)

case class IndividualAddress(city: String,
                             zipCode: String,
                             state: String,
                             country: String)

case class IndividualPosition(title: String,
                              startDate: String,
                              endDate: String,
                              address: IndividualAddress,
                              emailName: String,
                              company: IndividualCompany)

case class Individual(firstName: String,
                      middleName: String,
                      lastName: String,
                      currentPosition: Seq[IndividualPosition],
                      partitionColumn: String)


import org.apache.spark.sql.functions.{col, udf}

val makeCompany = udf((orgName: String, companyPhone: String) =>
  IndividualCompany(orgName, companyPhone))
val makeAddress = udf((city: String, zipCode: String, state: String, country: String) =>
  IndividualAddress(city, zipCode, state, country))

// Takes the two struct columns as case classes; this is the udf named in the error below.
val makePosition = udf((title: String, startDate: String, endDate: String,
                        address: IndividualAddress, emailName: String, company: IndividualCompany) =>
  List(IndividualPosition(title, startDate, endDate, address, emailName, company)))


// .as[Individual] needs spark.implicits._ in scope.
val selectData = df.select(
  col("first_name").as("firstName"),
  col("middle_name").as("middleName"),
  col("last_name").as("lastName"),
  makePosition(col("title"), // the schema above lists this column as `title`
               col("start_date"),
               col("end_Date"),
               makeAddress(col("city"),
                           col("zip_code"),
                           col("state"),
                           col("country")),
               col("email_name"),
               makeCompany(col("org_name"),
                           col("company_phone"))).as("currentPosition"),
  col("partition_column").as("partitionColumn")
).as[Individual]

selectData.printSchema()
selectData.show(10)

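I can see that the correct schema is generated for selectData, but the last line, where I try to fetch some actual data, fails. The error message says that the user defined function could not be executed.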
 org.apache.spark.SparkException: Failed to execute user defined function(anonfun$4: (string, string, string, struct<city:string,zipCode:string,state:string,country:string>, string, struct<orgName:string,companyPhone:string>) => array<struct<title:string,startDate:string,endDate:string,address:struct<city:string,zipCode:string,state:string,country:string>,emailName:string,company:struct<orgName:string,companyPhone:string>>>)

Is there a better way to achieve this?

Best answer

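The problem here is that a udf cannot take IndividualAddress and IndividualCompany directly as input. Spark represents these as structs, and the correct input type for a struct inside a udf is Row. This means you need to change the declaration of makePosition to: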

val makePosition = udf((title: String,
                        startDate: String,
                        endDate: String,
                        address: Row,
                        emailName: String,
                        company: Row)

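Inside the udf you now need to use, for example, address.getAs[String]("city") to access the fields of the case class, and to use the class as a whole you need to construct it again.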
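The Row-based variant described above could look roughly like the following. This is a minimal sketch, not the answer author's code: the object name RowUdfSketch, the demo method, the local SparkSession settings, the sample row, and the use of the built-in struct function to build the two struct arguments (in place of the question's makeAddress and makeCompany udfs) are all assumptions made for illustration.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions.{col, struct, udf}

case class IndividualCompany(orgName: String, companyPhone: String)
case class IndividualAddress(city: String, zipCode: String, state: String, country: String)
case class IndividualPosition(title: String, startDate: String, endDate: String,
                              address: IndividualAddress, emailName: String,
                              company: IndividualCompany)

// Hypothetical wrapper object, only here to make the sketch self-contained.
object RowUdfSketch {
  // Struct columns arrive in the udf as Row, so the case classes are rebuilt field by field.
  val makePosition = udf((title: String, startDate: String, endDate: String,
                          address: Row, emailName: String, company: Row) =>
    Seq(IndividualPosition(
      title, startDate, endDate,
      IndividualAddress(
        address.getAs[String]("city"),
        address.getAs[String]("zipCode"),
        address.getAs[String]("state"),
        address.getAs[String]("country")),
      emailName,
      IndividualCompany(
        company.getAs[String]("orgName"),
        company.getAs[String]("companyPhone")))))

  // Runs the udf on one made-up row and returns the nested city value.
  def demo(): String = {
    val spark = SparkSession.builder().master("local[1]").appName("row-udf-sketch").getOrCreate()
    import spark.implicits._
    val df = Seq(("Engineer", "2018-01-01", "2018-12-31", "Paris", "75001", "IDF", "FR",
                  "ada@example.com", "Acme", "555-0100"))
      .toDF("title", "start_date", "end_Date", "city", "zip_code", "state", "country",
            "email_name", "org_name", "company_phone")
    val out = df.select(makePosition(
      col("title"), col("start_date"), col("end_Date"),
      // Field names inside each struct must match the names read with getAs above.
      struct(col("city"), col("zip_code").as("zipCode"), col("state"), col("country")),
      col("email_name"),
      struct(col("org_name").as("orgName"), col("company_phone").as("companyPhone"))
    ).as("currentPosition"))
    val city = out
      .select(col("currentPosition").getItem(0).getField("address").getField("city"))
      .first().getString(0)
    spark.stop()
    city
  }
}
```

The lookups by name only work because the struct fields are aliased to zipCode, orgName and companyPhone before they reach the udf; a mismatch in those names would surface as a runtime error rather than a compile error.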

A simpler and better option is to do everything in a single udf, as follows:
val makePosition = udf((title: String,
                        startDate: String,
                        endDate: String,
                        city: String,
                        zipCode: String,
                        state: String,
                        country: String,
                        emailName: String,
                        orgName: String,
                        companyPhone: String) =>
  Seq(
    IndividualPosition(
      title,
      startDate,
      endDate,
      IndividualAddress(city, zipCode, state, country),
      emailName,
      IndividualCompany(orgName, companyPhone)
    )
  )
)
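The answer does not show the call site for the single udf, so here is one way it might be wired into the select from the question. Treat it as a sketch under assumptions: the object name FlatUdfSketch, the demo method, the local SparkSession, and the one-row sample data are invented for illustration, and the column names follow the flat schema printed in the question.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

case class IndividualCompany(orgName: String, companyPhone: String)
case class IndividualAddress(city: String, zipCode: String, state: String, country: String)
case class IndividualPosition(title: String, startDate: String, endDate: String,
                              address: IndividualAddress, emailName: String,
                              company: IndividualCompany)
case class Individual(firstName: String, middleName: String, lastName: String,
                      currentPosition: Seq[IndividualPosition], partitionColumn: String)

// Hypothetical wrapper object, only here to make the sketch self-contained.
object FlatUdfSketch {
  // Only plain string columns go in, so no Row handling is needed inside the udf.
  val makePosition = udf((title: String, startDate: String, endDate: String,
                          city: String, zipCode: String, state: String, country: String,
                          emailName: String, orgName: String, companyPhone: String) =>
    Seq(IndividualPosition(title, startDate, endDate,
      IndividualAddress(city, zipCode, state, country),
      emailName,
      IndividualCompany(orgName, companyPhone))))

  // Builds one made-up flat row, nests it, and returns it as a typed Individual.
  def demo(): Individual = {
    val spark = SparkSession.builder().master("local[1]").appName("flat-udf-sketch").getOrCreate()
    import spark.implicits._ // provides toDF and the encoder used by .as[Individual]
    val df = Seq(("Ada", "M", "Lovelace", "Engineer", "2018-01-01", "2018-12-31",
                  "Paris", "75001", "IDF", "FR", "ada@example.com", "Acme", "555-0100", "p1"))
      .toDF("first_name", "middle_name", "last_name", "title", "start_date", "end_Date",
            "city", "zip_code", "state", "country", "email_name", "org_name",
            "company_phone", "partition_column")
    val result = df.select(
      col("first_name").as("firstName"),
      col("middle_name").as("middleName"),
      col("last_name").as("lastName"),
      makePosition(col("title"), col("start_date"), col("end_Date"),
        col("city"), col("zip_code"), col("state"), col("country"),
        col("email_name"), col("org_name"), col("company_phone")).as("currentPosition"),
      col("partition_column").as("partitionColumn")
    ).as[Individual].head()
    spark.stop()
    result
  }
}
```

One thing to keep in mind with this approach is that the Scala udf helper is overloaded only up to ten arguments, which this function uses in full; adding another flat column would mean grouping inputs some other way.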

This post on scala - Unflatten a DataFrame into a specific structure is based on a similar question found on Stack Overflow: https://stackoverflow.com/questions/53640130/
