gpt4 book ai didi

arrays - 如何使用 pyspark 在 aws glue 中的嵌套 json 中展平数组?

转载 作者:行者123 更新时间:2023-12-05 03:58:04 25 4
gpt4 key购买 nike

我正在尝试扁平化一个 JSON 文件,以便能够在 AWS Glue 中将其加载到 PostgreSQL 中。我正在使用 PySpark。我使用爬虫爬取 S3 JSON 并生成一个表。然后我使用 ETL Glue 脚本来:

  • 读取爬取的表
  • 使用'Relationalize' 函数来扁平化文件
  • 将动态帧转换为数据帧
  • 尝试“分解”request.data 字段

到目前为止的脚本:

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = glue_source_database, table_name = glue_source_table, transformation_ctx = "datasource0")

df0 = Relationalize.apply(frame = datasource0, staging_path = glue_temp_storage, name = dfc_root_table_name, transformation_ctx = "dfc")

df1 = df0.select(dfc_root_table_name)

df2 = df1.toDF()

df2 = df1.select(explode(col('`request.data`')).alias("request_data"))

<then i write df1 to a PostgreSQL database which works fine>

我遇到的问题:

“Relationalize”功能运行良好,但 request.data 字段变为 bigint,因此“explode”不起作用。

由于数据的结构,如果不首先对 JSON 使用“Relationalize”,就无法完成分解。具体错误是:“org.apache.spark.sql.AnalysisException:由于数据类型不匹配,无法解析'explode(request.data)':函数explode的输入应该是数组或映射类型,不是 bigint”

如果我首先尝试将动态帧设为数据帧,则会遇到此问题:“py4j.protocol.Py4JJavaError:调用 o72.jdbc 时出错。: java.lang.IllegalArgumentException: 无法获取结构的 JDBC 类型...”

我还尝试上传一个分类器,以便数据在抓取过程中变平,但 AWS 确认这行不通。

原始文件的JSON格式如下,我正在尝试归一化:

- field1
- field2
- {}
- field3
- {}
- field4
- field5
- []
- {}
- field6
- {}
- field7
- field8
- {}
- field9
- {}
- field10

最佳答案

# Flatten nested df  
def flatten_df(nested_df):
for col in nested_df.columns:


array_cols = [c[0] for c in nested_df.dtypes if c[1][:5] == 'array']
for col in array_cols:
nested_df =nested_df.withColumn(col, F.explode_outer(nested_df[col]))

nested_cols = [c[0] for c in nested_df.dtypes if c[1][:6] == 'struct']
if len(nested_cols) == 0:
return nested_df

flat_cols = [c[0] for c in nested_df.dtypes if c[1][:6] != 'struct']

flat_df = nested_df.select(flat_cols +
[F.col(nc+'.'+c).alias(nc+'_'+c)
for nc in nested_cols
for c in nested_df.select(nc+'.*').columns])

return flatten_df(flat_df)

df=flatten_df(df)

它将用下划线替换所有点。请注意,它使用 explode_outer 而不是 explode 来包含 Null 值,以防数组本身为 null。此功能仅在 spark v2.4+ 中可用。

另请记住,展开数组会添加更多重复项,并且整体行大小会增加。展平结构将增加列大小。简而言之,你原来的df会水平和垂直爆炸。稍后可能会减慢处理数据的速度。

因此,我的建议是识别与特征相关的数据,并仅将这些数据存储在 postgresql 中,并将原始 json 文件存储在 s3 中。

关于arrays - 如何使用 pyspark 在 aws glue 中的嵌套 json 中展平数组?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58239693/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com