
azure - Pyspark - Create a JSON structure with all combinations based on a dataframe


I have a pyspark dataframe with 3 columns:

  • databricksPath
  • countryPartition
  • yearPartition

I am building this dataframe from widget values passed in by Data Factory (widget screenshot: /image/8zIuO.png).

The pyspark dataframe (screenshot): /image/ZcjZO.png

Using this dataframe, I want to create an output that contains all combinations, that is, return a JSON structure to ADF with a command like dbutils.notebook.exit({'message': 'Success', 'databricksPath': databricksPath, 'yearPartition': yearPartition, 'countryPartition': countryPartition}) so it can be consumed by a ForEach activity.

Example output:

"output": {
"value": [
{
"country": "PT",
"year": "2022",
"databricksPath": "/notebooks/1.Project/Notebook_1"
},
{
"country": "ES",
"year": "2022",
"databricksPath": "/notebooks/1.Project/Notebook_1"
},
{
"country": "IT",
"year": "2022",
"databricksPath": "/notebooks/1.Project/Notebook_1"
},
{
"country": "BE",
"year": "2022",
"databricksPath": "/notebooks/1.Project/Notebook_1"
},
{
"country": "PT",
"year": "2022",
"databricksPath": "/notebooks/1.Project/Notebook_2"
},
{
"country": "ES",
"year": "2022",
"databricksPath": "/notebooks/1.Project/Notebook_2"
},
{
"country": "IT",
"year": "2022",
"databricksPath": "/notebooks/1.Project/Notebook_2"
},
{
"country": "BE",
"year": "2022",
"databricksPath": "/notebooks/1.Project/Notebook_2"
}
]
}
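
For illustration (this sketch is not part of the original question): the desired payload is just the Cartesian product of the three lists. A minimal plain-Python version, using the widget default values from the notebook below, produces the same eight objects:

from itertools import product

# List values taken from the widget defaults in the notebook below
paths = ['/notebooks/1.Project/Notebook_1', '/notebooks/1.Project/Notebook_2']
countries = ['PT', 'ES', 'IT', 'BE']
years = ['2022']

# One dict per (path, year, country) combination, in the same order
# as the "value" array in the example output above
value = [
    {'country': c, 'year': y, 'databricksPath': p}
    for p, y, c in product(paths, years, countries)
]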

The notebook I am using:

# Databricks notebook source
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType
from pyspark.sql.types import StringType
from datetime import datetime, timedelta
from pyspark.sql.functions import col, lit, row_number, instr, expr, when, current_date, months_between, coalesce, concat_ws, sum as Sum, first, round, monotonically_increasing_id, date_format, concat, substring, count
from pyspark.sql.window import Window
from pathlib import Path
from functools import reduce
from pyspark.sql import DataFrame
import traceback
import pyodbc
import uuid
import sys


# COMMAND ----------

dbutils.widgets.text("databricksPath", "['/notebooks/1.Project/Notebook_1','/notebooks/1.Project/Notebook_2']", "databricksPath")
dbutils.widgets.text("countryPartition", "['PT','ES','IT','BE']", "countryPartition")
dbutils.widgets.text("yearPartition", "['2022']", "yearPartition")


databricksPath = dbutils.widgets.get('databricksPath')
countryPartition = dbutils.widgets.get('countryPartition')
yearPartition = dbutils.widgets.get('yearPartition')

# COMMAND ----------

from pyspark.sql.types import StructType, StructField, StringType
schema = StructType([
    StructField('databricksPath', StringType(), True),
    StructField('countryPartition', StringType(), True),
    StructField('yearPartition', StringType(), True)
])

data2 = [(databricksPath, countryPartition, yearPartition)]
df = spark.createDataFrame(data=data2, schema=schema)

df2 = df.withColumn("databricksPath", concat_ws(",",col("databricksPath")))

display(df2)

# COMMAND ----------

dbutils.notebook.exit({'message': 'Success', 'databricksPath': databricksPath,'yearPartition': yearPartition,'countryPartition': countryPartition})
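# Note: because data2 wraps the three raw widget strings in a single tuple,
# df/df2 contain exactly one row whose columns each hold a whole list as one
# string; the per-combination rows still have to be generated from those
# strings, which is what the accepted answer below does with cross joins.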

Can anyone help me achieve this?

Thanks!

Best Answer

You can use the following code to achieve this:

dbutils.widgets.text("databricksPath", "['/notebooks/1.Project/Notebook_1','/notebooks/1.Project/Notebook_2']", "databricksPath")
dbutils.widgets.text("countryPartition", "['PT','ES','IT','BE']", "countryPartition")
dbutils.widgets.text("yearPartition", "['2022']", "yearPartition")
#dbutils.widgets.text("partitionColumn", "['dbo.table1|country', 'dbo.table2|country_year']", "partitionColumn")

databricksPath = dbutils.widgets.get('databricksPath')
countryPartition = dbutils.widgets.get('countryPartition')
yearPartition = dbutils.widgets.get('yearPartition')
#partitionColumn = dbutils.widgets.get('partitionColumn')

#creating a separate dataframe for each of the above widget values.
path_df = spark.createDataFrame(data=[[i] for i in eval(dbutils.widgets.get('databricksPath'))],schema=['path'])
cp_df = spark.createDataFrame(data=[[i] for i in eval(dbutils.widgets.get('countryPartition'))],schema=['country'])
y_df = spark.createDataFrame(data=[[i] for i in eval(dbutils.widgets.get('yearPartition'))],schema=['year'])
#p_df = spark.createDataFrame(data=[[i] for i in eval(dbutils.widgets.get('partitionColumn'))],schema=['partition_col'])


#applying cross join to get all combination results.
from pyspark.sql.functions import broadcast
final_df= broadcast(broadcast(cp_df).crossJoin(y_df)).crossJoin(path_df)
#final_df= broadcast(broadcast(broadcast(cp_df).crossJoin(y_df)).crossJoin(path_df)).crossJoin(p_df)

#from pyspark.sql.functions import split
#fdf = final_df.select('country','year','path',split(final_df['partition_col'],'[|]').getItem(0).alias('table'),split(final_df['partition_col'],'[|]').getItem(1).alias('partition'))

#from pyspark.sql.functions import array
#fdf = fdf.withColumn('countryYear', array(col('country'),col('year')))

#get the result dataframe as a list of dictionaries
output = [eval(i) for i in final_df.toJSON().collect()]
#output = [eval(i) for i in fdf.toJSON().collect()]

#returning the above output dictionary/JSON to data factory
import json
dbutils.notebook.exit(json.dumps(output))
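
A side note on the code above (this observation is not part of the original answer): each element of final_df.toJSON().collect() is a JSON string, so json.loads is a safer drop-in replacement for the eval call:

import json

# Equivalent to the eval-based line above, but without executing
# arbitrary Python code from the row strings
output = [json.loads(row) for row in final_df.toJSON().collect()]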
  • With this code, the value of output will be an array of objects (as in the example output).


  • When I run this notebook from a Notebook activity in Azure Data Factory, it gives the following result:

(screenshot: output of the Notebook activity in ADF)

UPDATE: an output image illustrating the updated requirement accompanies the original answer.
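
As a usage note (the activity name here is an assumption, not from the answer): since the notebook exits with json.dumps(output), ADF receives the array as a JSON string in the activity's runOutput, so a ForEach activity's Items field can parse it with an expression such as @json(activity('Notebook1').output.runOutput), where 'Notebook1' stands for whatever the Notebook activity is named.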

Regarding "azure - Pyspark - Create a JSON structure with all combinations based on a dataframe", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/73719094/
