gpt4 book ai didi

python - PySpark/Aws Glue 中的性能问题

转载 作者:行者123 更新时间:2023-12-01 08:16:38 25 4
gpt4 key购买 nike

我有一个数据框。我需要将每条记录转换为 JSON,然后使用 JSON 负载调用 API 将数据插入 postgress。我的数据框中有 14000 条记录,调用 api 并获取响应需要 5 小时。有什么办法可以提高性能。下面是我的代码片段。

df_insert = spark.read \
.format(SNOWFLAKE_SOURCE_NAME) \
.options(**sfOptions) \
.option("dbtable", "source_table_name") \
.load()

json_insert = df_insert.toJSON().collect()

for row in json_insert:
line = json.loads(row)
headers = {
'Authorization': authorization,
'content-type': "application/json",
'cache-control': "no-cache",
}
response = requests.request("POST", url_insert, data=payload, headers=headers)
print(response.text)
res = response.text
response_result = json.loads(res)
#print(response_result["httpStatus"])
if response_result["message"] == 'success':
print ("INFO : Record inserted successfully")
else:
print ("ERROR : Error in the record")
status_code = response_result["status"]
error_message = response_result["error"]
my_list = [(status_code,error_message,row)]
df = sc.createDataFrame(my_list, ['status', 'error', 'json data'])
df.write.format(SNOWFLAKE_SOURCE_NAME) \
.options(**sfOptions) \
.option("dbtable", "error_table") \
.option("header", "true") \
.option("truncate_table", "on") \
.mode("append") \
.save()

注意:我知道通过执行“json_insert = df_insert.toJSON().collect()”我失去了数据帧的优势。有没有更好的方法来实现。

最佳答案

df_insert.toJSON() 返回一个可以flatMapRDD 1

source_rdd = df_insert.toJSON()

对此 RDD 执行 flatMap 并返回仅包含错误的 RDD。

headers = {
'Authorization': authorization,
'content-type': "application/json",
'cache-control': "no-cache"
}

def post_service_error(row):
# requests package may not be available in the node
# see about adding files to the spark context
response = requests.request("POST", url_insert, data=row, headers=headers)
response_result = response.json()
if response_result['message'] == 'success':
print ("INFO : Record inserted successfully")
return []
print ("ERROR : Error in the record")
status_code = response_result["status"]
error_message = response_result["error"]
return [(status_code, error_message, row)]

errors_rdd = source_rdd.flatMap(post_service_error)

将错误 RDD 转换为 Spark DataFrame 并将其保存到表中。

errors_df = sc.createDataFrame(errors_rdd, ['status', 'error', 'json data'])
(errors_df.write.format(SNOWFLAKE_SOURCE_NAME)
.options(**sfOptions)
.option("dbtable", "error_table")
.option("header", "true")
.option("truncate_table", "on")
.mode("append")
.save())

如果您拥有正在执行请求的 API,我建议探索一种接受一批这些对象/数组的实现。这样,您可以在将每个分区映射到批处理请求之前对 RDD 进行分区,并在之后处理错误。

关于python - PySpark/Aws Glue 中的性能问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54949911/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com