gpt4 book ai didi

apache-spark - 如果在 S3 的 Parquet 文件中发现同一列的不同数据类型,AWS Glue 作业会在 Redshift 中创建新列

转载 作者:行者123 更新时间:2023-12-05 03:55:06 25 4
gpt4 key购买 nike

我正在尝试使用 Glue Job 将 S3 中的 Parquet 文件加载到 Redshift 中。当我第一次运行 Glue Job 时,它正在创建表并加载数据,但是当通过更改 1 列的数据类型第二次运行时,作业并没有失败,而是在 Redshift 中创建新列并附加数据。

例如:在这里,我正在更改整数的数据类型

FileName **abc**
Code,Name,Amount
'A','XYZ',200.00

FileName **xyz**
Code,Name,Amount
'A','XYZ',200.00

在 Redshift 中

Output after processing both the above file:
Code Name Amount Amount_String
A XYZ 200.00
A XYZ 200.00

代码

import os
import sys
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql import SQLContext
from datetime import date

from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame

## @params: [TempDir, JOB_NAME]
args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])

spark = SparkSession.builder.getOrCreate()
glueContext = GlueContext(SparkContext.getOrCreate())

spark.conf.set('spark.sql.session.timeZone', 'Europe/London')

#sc = SparkContext()

data_source = "s3://bucket/folder/data/"
#read delta and source dataset
employee = spark.read.parquet(data_source)


sq_datasource0 = DynamicFrame.fromDF(employee, glueContext, "new_dynamic_frame")

datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = sq_datasource0, catalog_connection = "redshiftDB", connection_options = {"dbtable": "employee", "database": "dbname"}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink4")

如果数据类型不匹配问题来自文件,我想使 Glue Job 失败。如果您能提供解决此问题的任何指导,我将不胜感激。

最佳答案

上述问题是由于 Glue dynamicFrame 使用的 Redshift writer 引起的。如果输入数据的特定列中存在空记录,这将使用 alter table query 为 Redshfit 中的表创建新列。

为避免此行为,将 Glue dynamicFrame 转换为 Spark 数据帧并写入 redshift。

val amDf = am.toDF()
amDf.write.format("com.databricks.spark.redshift")
.mode(SaveMode.Overwrite)
.option("url", JDBC_URL)
.option("dbtable", TABLE_NAME)
.option("user", USER)
.option("password", PASSWORD)
.option("aws_iam_role", IAM_ROLE)
.option("tempdir", args("TempDir"))
.save()

关于apache-spark - 如果在 S3 的 Parquet 文件中发现同一列的不同数据类型,AWS Glue 作业会在 Redshift 中创建新列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60482835/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com