gpt4 book ai didi

apache-spark - 如何在 AWS Glue PySpark 中运行并行线程?

转载 作者:行者123 更新时间:2023-12-04 16:40:05 27 4
gpt4 key购买 nike

我有一个 Spark 作业,它只会从具有相同转换的多个表中提取数据。基本上是一个遍历表列表、查询目录表、添加时间戳、然后推送到 Redshift 的 for 循环(下面的示例)。
完成这项工作大约需要 30 分钟。有没有办法在相同的 Spark /胶水环境下并行运行这些?如果可以避免的话,我不想创建单独的胶水作业。

import datetime
import os
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.dynamicframe import DynamicFrame
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.context import SparkContext
from pyspark.sql.functions import *


# query the runtime arguments
args = getResolvedOptions(
sys.argv,
["JOB_NAME", "redshift_catalog_connection", "target_database", "target_schema"],
)

# build the job session and context
glueContext = GlueContext(SparkContext.getOrCreate())
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# set the job execution timestamp
job_execution_timestamp = datetime.datetime.utcnow()

tables = []

for table in tables:
catalog_table = glueContext.create_dynamic_frame.from_catalog(
database="test", table_name=table, transformation_ctx=table
)
data_set = catalog_table.toDF().withColumn(
"batchLoadTimestamp", lit(job_execution_timestamp)
)

# covert back to glue dynamic frame
export_frame = DynamicFrame.fromDF(data_set, glueContext, "export_frame")

# remove null rows from dynamic frame
non_null_records = DropNullFields.apply(
frame=export_frame, transformation_ctx="non_null_records"
)

temp_dir = os.path.join(args["TempDir"], redshift_table_name)

stores_redshiftSink = glueContext.write_dynamic_frame.from_jdbc_conf(
frame=non_null_records,
catalog_connection=args["redshift_catalog_connection"],
connection_options={
"dbtable": f"{args['target_schema']}.{redshift_table_name}",
"database": args["target_database"],
"preactions": f"truncate table {args['target_schema']}.{redshift_table_name};",
},
redshift_tmp_dir=temp_dir,
transformation_ctx="stores_redshiftSink",
) ```

最佳答案

您可以执行以下操作来加快此过程

  • 启用作业的并发执行。
  • 分配足够数量的 DPU。
  • 将表列表作为参数传递
  • 使用 Glue 工作流程或步骤函数并行执行作业。

  • 现在假设您有 100 个表要摄取,您可以将列表分成 10 个表,并同时运行该作业 10 次。
    由于您的数据将并行加载,因此 Glue 作业运行的时间将减少,因此会产生更少的成本。
    更快的替代方法是直接使用 redshift 实用程序。
  • 在 redshift 中创建表并将 batchLoadTimestamp 列保留为默认为 current_timestamp。
  • 现在创建复制命令并将数据直接从 s3 加载到表中。
  • 使用 Glue python shell 作业利用 pg8000 运行复制命令。

  • 为什么这种方法会更快?
    因为 spark redshift jdbc 连接器首先将 spark 数据帧卸载到 s3,然后准备复制命令到 redshift 表。在直接运行 copy 命令时,您将消除运行 unload 命令并将数据读入 spark df 的开销。

    关于apache-spark - 如何在 AWS Glue PySpark 中运行并行线程?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62719781/

    27 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com