
amazon-web-services - Adding a column to an AWS Glue dynamic data frame


I am very new to AWS Glue. I am working on a small project where the requirement is to read a file from an S3 bucket, transpose it, and load it into a MySQL table. The source data in the S3 bucket looks like this:

+----+----+-------+-----+---+--+--------+
|cost|data|minutes|name |sms|id|category|
+----+----+-------+-----+---+--+--------+
| 5  |1000| 200   |prod1|500|p1|service |
+----+----+-------+-----+---+--+--------+
The target table structure is:
Product_id, parameter, value
I expect the target table to hold the following values:
p1, cost, 5
p1, data, 1000
I am able to load the target table with the id and the value, but I cannot populate the parameter column. That column does not exist in the input data, and I want to fill it with a string that depends on which column's value is being loaded.
Here is the code I am using for the cost column.
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

## @type: DataSource
## @args: [database = "mainclouddb", table_name = "s3product", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "mainclouddb", table_name = "s3product", transformation_ctx = "datasource0")

## @type: ApplyMapping
## @args: [mapping = [("cost", "long", "value", "int"), ("id", "string", "product_id", "string")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("cost", "long", "value", "int"), ("id", "string", "product_id", "string")], transformation_ctx = "applymapping1")

## @type: SelectFields
## @args: [paths = ["product_id", "parameter", "value"], transformation_ctx = "selectfields2"]
## @return: selectfields2
## @inputs: [frame = applymapping1]
selectfields2 = SelectFields.apply(frame = applymapping1, paths = ["product_id", "parameter", "value"], transformation_ctx = "selectfields2")

## @type: ResolveChoice
## @args: [choice = "MATCH_CATALOG", database = "mainclouddb", table_name = "mysqlmaincloud_product_parameter_mapping", transformation_ctx = "resolvechoice3"]
## @return: resolvechoice3
## @inputs: [frame = selectfields2]
resolvechoice3 = ResolveChoice.apply(frame = selectfields2, choice = "MATCH_CATALOG", database = "mainclouddb", table_name = "mysqlmaincloud_product_parameter_mapping", transformation_ctx = "resolvechoice3")

## @type: ResolveChoice
## @args: [choice = "make_cols", transformation_ctx = "resolvechoice4"]
## @return: resolvechoice4
## @inputs: [frame = resolvechoice3]
resolvechoice4 = ResolveChoice.apply(frame = resolvechoice3, choice = "make_cols", transformation_ctx = "resolvechoice4")

## @type: DataSink
## @args: [database = "mainclouddb", table_name = "mysqlmaincloud_product_parameter_mapping", transformation_ctx = "datasink5"]
## @return: datasink5
## @inputs: [frame = resolvechoice4]
datasink5 = glueContext.write_dynamic_frame.from_catalog(frame = resolvechoice4, database = "mainclouddb", table_name = "mysqlmaincloud_product_parameter_mapping", transformation_ctx = "datasink5")

job.commit()
Can someone help me add this new column to my dynamic frame so that it can be used in the table?
Thanks

Best Answer

For a smaller data frame you can do the following:

  • Convert the dynamic frame to a Spark data frame
  • Add the column
  • Convert back to a dynamic frame

  • Step 1
    datasource0 = datasource0.toDF()
    Step 2
    from pyspark.sql.functions import udf, col
    getNewValues = udf(lambda val: val + 1)  # you can do what you need to do here instead of val + 1

    datasource0 = datasource0.withColumn('New_Col_Name', getNewValues(col('some_existing_col')))
    Step 3
    from awsglue.dynamicframe import DynamicFrame
    datasource0 = DynamicFrame.fromDF(datasource0, glueContext, "datasource0")
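
    Applied to the question's own job, those three steps might look like the sketch below. This is a minimal, untested sketch: it reuses applymapping1 and glueContext from the script above, the names cost_df and cost_dyf are made up, and it uses lit() to fill the parameter column with the constant string "cost" (the same pattern would be repeated for "data", "minutes", and so on).

    from awsglue.dynamicframe import DynamicFrame
    from pyspark.sql.functions import lit

    # Step 1: DynamicFrame -> Spark DataFrame
    cost_df = applymapping1.toDF()

    # Step 2: add the missing column as a constant string ("cost" for this pass)
    cost_df = cost_df.withColumn("parameter", lit("cost"))

    # Step 3: Spark DataFrame -> DynamicFrame so the rest of the Glue job can write it
    cost_dyf = DynamicFrame.fromDF(cost_df, glueContext, "cost_dyf")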
    The problem is that when you have a large dataset, the toDF() operation is very expensive!
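
    If toDF() is too expensive, one alternative (not part of the answer above) is AWS Glue's built-in Map transform, which adds the field without converting to a Spark data frame. The sketch below is only an illustration of that option: add_parameter and mapped_dyf are made-up names, and the constant "cost" again assumes one pass per source column.

    from awsglue.transforms import Map

    # Map.apply calls the function once per record; the record behaves like a dict,
    # so the new field can simply be assigned before returning it.
    def add_parameter(record):
        record["parameter"] = "cost"
        return record

    mapped_dyf = Map.apply(frame = applymapping1, f = add_parameter, transformation_ctx = "mapped_dyf")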

    Regarding "amazon-web-services - Adding a column to an AWS Glue dynamic data frame", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/58807940/
