gpt4 book ai didi

apache-spark - __HIVE_DEFAULT_PARTITION__ 作为胶水 ETL 作业中的分区值

转载 作者:行者123 更新时间:2023-12-04 13:01:50 28 4
gpt4 key购买 nike

我有通过胶水爬虫爬取的 CSV 数据,最终出现在一张表中。

我正在尝试运行 ETL 作业以将磁盘上的数据重新分区为日期列的某些组件。然后将 CSV 转换为 Parquet 。

即我的数据中有一个名为“date”的列,并且想将数据分区为 s3 上的年、月、日分区。

我能够转换为 Parquet 并让它在序列号值(不同的列)上正确分区,但它会将值“__HIVE_DEFAULT_PARTITION__”放入日期相关分区的所有值年、月和日。

我能够在其他列(如序列号)上进行分区,但年/月/日不在原始数据集中,因此我的方法是将日期列中的值创建为数据中的新列设置并告诉 write_dynamic_frame 函数按列进行分区,但这不起作用。

我一般不熟悉 spark/pyspark 和胶水,所以很有可能我错过了一些简单的东西。

感谢任何提供帮助的人。

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.sql import functions as F
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job


args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "my_database", table_name = "my_table", transformation_ctx = "datasource0")
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("date", "date", "date", "date"), ("serial-number", "string", "serial-number", "string")], transformation_ctx = "applymapping1")
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")

to_spark_df4 = dropnullfields3.toDF()

with_file_name_df5 = to_spark_df4.withColumn("input_file_name", F.input_file_name()).withColumn('year', F.year(F.col("date").cast("date"))).withColumn('month', F.month(F.col("date").cast("date"))).withColumn('day', F.dayofmonth(F.col("date").cast("date")))

back_to_glue_df8 = DynamicFrame.fromDF(with_file_name_df5, glueContext, "back_to_glue_df8")


datasink4 = glueContext.write_dynamic_frame.from_options(frame = back_to_glue_df8, connection_type = "s3", connection_options = {"path": "s3://output/path","partitionKeys": ["serial-number","year", "month","day"]}, format = "parquet", transformation_ctx = "datasink4")
job.commit()

结果是我在 s3 中的键最终看起来像这样:
serial-number=1234567890/year=__HIVE_DEFAULT_PARTITION__/month=__HIVE_DEFAULT_PARTITION__/day=__HIVE_DEFAULT_PARTITION__/part-01571-273027e4-72ba-45ff-ac15-c0bb2f342e58.c000.snappy.parquet

更新:编辑格式

最佳答案

我从事的工作与您的非常相似。我希望你现在设法解决它,但无论如何,这是解决你困境的方法:

基本解决方案:

from pyspark.sql.functions import year, month, dayofmonth

###### rest of your code until ApplyMapping included ######

# add year, month & day columns, non zero-padded
df = df.toDF()
df = df.withColumn('year', year(df.date))\
.withColumn('month', month(df.date))\
.withColumn('day', dayofmonth(df.date))

补充说明:

如果您需要在想要选择日期范围的 Athena 上运行查询,我建议您避免使用嵌套分区(因此年 -> 月 -> 日),而是使用平面分区模式。这样做的原因是查询变得更容易编写。这是获取平面模式的python代码:

from pyspark.sql.functions import date_format

###### rest of your code until ApplyMapping included ######

df = df.toDF()
df = df.withColumn('date_2', date_format(df.date, 'yyyy-MM-dd'))

# date_2 is because column "date" already exists,
# but we want the partitioning one to be in a string format.
# You can later drop the original column if you wish.

假设现在您要查询从 3 月 15 日到 2020 年 4 月 3 日的数据。

这是基于您选择的分区模式的 SQL 查询。

嵌套模式

SELECT item_1, item_2
FROM my_table
WHERE year = 2020
AND (
(month = 3 AND day >= 15)
OR (month = 4 AND day <= 3)
)

平面模式

SELECT item_1, item_2
FROM my_table
WHERE date BETWEEN '2020-03-15' AND '2020-04-3'

此外,鉴于您的“日期”列存储为字符串,您将能够使用 LIKE 运行查询。运算符(operator)。

例如,如果要查询数据库中每个 4 月的所有数据,可以执行以下操作:

SELECT item_1, item_2
FROM my_table
WHERE date LIKE '%-04-%'

关于apache-spark - __HIVE_DEFAULT_PARTITION__ 作为胶水 ETL 作业中的分区值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55246871/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com