gpt4 book ai didi

python - 如何在 AWS Glue 中正确重命名动态数据帧的列?

转载 作者:行者123 更新时间:2023-12-04 10:57:10 25 4
gpt4 key购买 nike

我加载 JSON 数据并在动态数据帧上使用关系化方法来展平原本嵌套的 JSON 对象并将其保存为 Parquet 格式。问题是,一旦为更快的 Athena 查询保存为 parquet 格式,列名包含点,这违反了 Athena SQL 查询语法,因此我无法进行特定于列的查询。

为了解决这个问题,我还重命名了 Glue 作业中的列名称以排除点并使用下划线代替。我的问题是两者中的哪种方法会更好,为什么? (效率-内存?节点上的执行速度?等)。

还考虑到可怕的 aws 胶水文档,我无法提出仅动态框架的解决方案。我在以动态方式获取列名时遇到问题,因此我正在使用 toDF()。

1) 第一种方法是从动态 df 中提取的 df 中获取列名

relationalize1 = Relationalize.apply(frame=datasource0, transformation_ctx="relationalize1").select("roottable")
df_relationalize1 = relationalize1.toDF()
for field in df_relationalize1.schema.fields:
relationalize1 = RenameField.apply(frame = relationalize1, old_name = "`"+field.name+"`", new_name = field.name.replace(".","_"), transformation_ctx = "renamefield_" + field.name)

2)第二种方法是从动态df中提取df并在pyspark df(而不是动态df)上执行重命名字段,然后转换回动态df并将其保存为 Parquet 格式。

有没有更好的方法?爬虫可以重命名列吗? .fromDF() 方法有多快?是否有比 pdf 开发人员指南更好的函数和方法文档?

最佳答案

该问题专门询问重命名:
(a) 转换为 DataFrame .
(b) 创建 new_columns具有所需列名的数组,其顺序与 old_columns 相同.
(c) 覆盖并持久化 new_columns使用 functools.reduce()pyspark.withColumnRenamed() .
(d) 转换回 DynamicFrame .

from awsglue.job import Job
from awsglue.context import GlueContext
from pyspark.context import SparkContext
from functools import reduce

JOB_NAME = "csv_to_parquet"
sc = SparkContext()
glue_context = GlueContext(sc)
job = Job(glue_context)
job.init(JOB_NAME)

# Create DynamicFrame
datasource = glue_context.create_dynamic_frame_from_options(
connection_type="s3",
format="csv",
connection_options={"paths": ["s3://path/to/source/file.csv"]},
format_options={"withHeader": True, "separator": chr(44)}, # comma delimited
)

# (a) Convert to DataFrame
df = datasource.toDF()

# (b) Create array with desired columns
old_columns = df.schema.names
new_columns = [
field.lower().replace(" ", "_").replace(".", "_") for field in old_columns
]

# (c) Overwrite and persist `new_columns`
df = reduce(
lambda df, idx: df.withColumnRenamed(old_columns[idx], new_columns[idx]),
range(len(old_columns)),
df,
)

# (d) Convert back to DynamicFrame
datasource = datasource.fromDF(df, glue_context, "datasource")

# Write DynamicFrame as Parquet
datasink = glue_context.write_dynamic_frame_from_options(
frame=datasource,
connection_type="s3",
connection_options={"path": "s3://path/to/target/prefix/"},
format="parquet",
)

Blockquote

关于python - 如何在 AWS Glue 中正确重命名动态数据帧的列?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59103659/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com