gpt4 book ai didi

apache-spark - 我可以在 Glue 中将 RDD 转换为 DataFrame 吗?

转载 作者:行者123 更新时间:2023-12-05 05:44:40 28 4
gpt4 key购买 nike

我的 lambda 函数通过 boto3 glue.start_job_run 触发胶水作业

这是我的胶水作业脚本

from awsglue.utils import getResolvedOptions
import sys
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from operator import add
from pyspark.sql.functions import col, regexp_extract, max

conf = SparkConf().setAppName("pyspark-etl")
sc = SparkContext.getOrCreate(conf=conf)

args = getResolvedOptions(sys.argv,['s3_target_path_key','s3_target_path_bucket'])
bucket = args['s3_target_path_bucket']
fileName = args['s3_target_path_key']

inputFilePath = f"s3a://{bucket}/{fileName}"
finalFilePath = f"s3a://glu-job-final-juiceb"

print(bucket, fileName)

rdd = sc.textFile(inputFilePath)
rdd = rdd.flatMap(lambda x: x.split(" ")).map(lambda x : (x.split(" ")[0], 1)).reduceByKey(add)
df = rdd.toDF(schema=('rawEntities string, Count int'))
df = df.withColumn("Entities", regexp_extract(col("rawEntities"),'[^!".?@:,\'*…_()]+',0))
df = df.filter(col("Entities") != "")
df = df.select("Entities","Count").groupBy("Entities").agg(max("Count").alias("Count"))
df.write.mode("append").options(header='True').parquet(finalFilePath)

Glue 作业错误消息是“AttributeError: 'PipelinedRDD' object has no attribute 'toDF'

谷歌搜索后,我注意到在 glue 中“toDF”表示 DynamicFrame 到 DataFrame。

不是RDD到DataFrame的意思。

如何在胶水中将 RDD 转换为 DataFrame?

最佳答案

您不能使用 toDF() 定义模式类型。通过使用 toDF() 方法,我们无法控制模式自定义。话虽如此,使用 createDataFrame() 方法我们可以完全控制模式自定义。

看下面的逻辑-

from pyspark.sql.types import *

schema = StructType([ StructField('rawEntities', StringType()), StructField('Count' , IntegerType())])

df = spark.createDataFrame(data=<your rdd>, schema = schema)

关于apache-spark - 我可以在 Glue 中将 RDD 转换为 DataFrame 吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/71547278/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com