
python - Pyspark: writing a JSON column to Postgres using AWS Glue

Reprinted · Author: 行者123 · Updated: 2023-12-04 15:08:11

I have the following dataframe (df4):

+------------------------------------+------------------------------------------+--------------------------+--------------------------+
|id |exclusion_reason |created_at |updated_at |
+------------------------------------+------------------------------------------+--------------------------+--------------------------+
|4c01d951-2ec5-4ba4-bfe2-8ba9c3029962|{"ship_reason": "A1", "bill_reason": "A1"}|2021-01-12 16:04:19.673197|2021-01-12 16:04:19.694706|
|dac14ca3-bf44-4e3c-80e8-0e2d6d2ff576|{"ship_reason": "A1", "bill_reason": "A1"}|2021-01-12 16:04:19.673197|2021-01-12 16:04:19.694706|
|6d277012-ff6c-4202-bbd7-64cbd467ca28|{"ship_reason": "A1", "bill_reason": "A1"}|2021-01-12 16:04:19.673197|2021-01-12 16:04:19.694706|
|0388163e-2614-4b71-b707-623337d58387|{"ship_reason": "A1", "bill_reason": "A1"}|2021-01-12 16:04:19.673197|2021-01-12 16:04:19.694706|
|01daec52-408c-44e3-965a-b87daa334a1a|{"ship_reason": "A1", "bill_reason": "A1"}|2021-01-12 16:04:19.673197|2021-01-12 16:04:19.694706|
+------------------------------------+------------------------------------------+--------------------------+--------------------------+

I need to write it to a Postgres database. I'm using AWS Glue, and the Postgres database is in a VPC, so I have to use a Glue connection and the glueContext.write_dynamic_frame.from_jdbc_conf method. The problem is that I keep getting the error ERROR: column "matchback_exclusion_reason" is of type jsonb but expression is of type character. The column's type in the dataframe is string, while the type in the database is JSONB.

I've seen suggestions that I just need to add stringtype: "unspecified" to my write statement, but the following produces the same error:

from awsglue.dynamicframe import DynamicFrame

datasource2 = DynamicFrame.fromDF(df4, glueContext, "ParquetToWrite")

output = glueContext.write_dynamic_frame.from_jdbc_conf(
    frame=datasource2,
    catalog_connection="MPtest",
    connection_options={"stringtype": "unspecified", "database": "app", "dbtable": "orders"})

Can I convert this column to JSON somehow? I've tried creating a struct type to parse the elements, but that didn't work either (code below):

from pyspark.sql import functions as f
from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([
    StructField("ship_reason", StringType()),
    StructField("bill_reason", StringType()),
])
df4Test.select(f.from_json(df4.exclusion_reason, schema).alias("exclusion_reason"))

df4Test = df4Test.withColumn("exclusion_reason", f.from_json(df4.exclusion_reason, schema).alias("exclusion_reason"))

Is it possible to change the column's type to JSONB? Ideally, I basically just want to "json.load" the exclusion_reason column so I can write it to Postgres.
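As a quick sanity check of the "json.load" idea outside of Spark: the exclusion_reason values are plain JSON strings, so Python's json module parses them directly. A minimal sketch, using one sample value from the table above:

```python
import json

# One value from the exclusion_reason column shown above.
raw = '{"ship_reason": "A1", "bill_reason": "A1"}'

parsed = json.loads(raw)            # dict with the two reason fields
round_tripped = json.dumps(parsed)  # back to a JSON string a jsonb column would accept
```

If json.loads raises an error here, the column contains malformed JSON and no amount of type casting on the write side will fix it.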

Best answer

Using ResolveChoice to cast exclusion_reason to json might help:

datasource3 = datasource2.resolveChoice(specs = [('exclusion_reason','cast:json')])

From the AWS Glue Developer Guide:

Use ResolveChoice to specify how a column should be handled when it contains values of multiple types. You can choose to either cast the column to a single data type, discard one or more of the types, or retain all types in either separate columns or a structure. You can select a different resolution policy for each column or specify a global policy that is applied to all columns.
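Conceptually, a "cast" resolution policy coerces every value in a column to a single target type. A very loose pure-Python analogue (a hypothetical helper for illustration only, not Glue's actual implementation):

```python
import json

def cast_column_to_json(values):
    """Coerce a column of mixed str/dict values to parsed JSON objects,
    loosely mimicking a 'cast:json' resolution policy on a choice column."""
    return [v if isinstance(v, dict) else json.loads(v) for v in values]

# A column whose values arrived as a mix of raw JSON strings and parsed dicts.
mixed = ['{"ship_reason": "A1", "bill_reason": "A1"}',
         {"ship_reason": "A2", "bill_reason": "A2"}]
resolved = cast_column_to_json(mixed)  # every value is now a dict
```

The point of the policy is that, after resolution, downstream writers see exactly one type per column instead of a choice type.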

Regarding "python - Pyspark: writing a JSON column to Postgres using AWS Glue", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/65689664/
