
scala - How to save the output of multiple queries to a single JSON file in append mode using Spark Scala


I have 5 queries as below:

select * from table1  
select * from table2
select * from table3
select * from table4
select * from table5
Now, what I want is to execute these queries sequentially and keep appending the output to a single JSON file. I wrote the code below, but it stores the output of each query in different part files instead of one.
Below is my code:
def store(jobEntity: JobDetails, jobRunId: Int): Unit = {
  UDFUtil.registerUdfFunctions()
  var outputTableName: String = null
  // Order the queries by sequenceId so they are executed sequentially.
  val jobQueryMap = jobEntity.jobQueryList.map(jobQuery => (jobQuery.sequenceId, jobQuery))
  val sortedQueries = scala.collection.immutable.TreeMap(jobQueryMap.toSeq: _*).toMap
  LOGGER.debug("sortedQueries ===>" + sortedQueries)
  try {
    outputTableName = jobEntity.destinationEntity
    var resultDF: DataFrame = null
    sortedQueries.values.foreach(jobQuery => {
      LOGGER.debug(s"jobQuery.query ===> ${jobQuery.query}")
      resultDF = SparkSession.builder.getOrCreate.sqlContext.sql(jobQuery.query)

      // Optional repartitioning on the configured partition columns.
      if (jobQuery.partitionColumn != null && !jobQuery.partitionColumn.trim.isEmpty) {
        resultDF = resultDF.repartition(jobQuery.partitionColumn.split(",").map(col): _*)
      }
      if (jobQuery.isKeepInMemory) {
        resultDF = resultDF.persist(StorageLevel.MEMORY_AND_DISK_SER)
      }
      // Optional checkpointing to a configured directory.
      if (jobQuery.isCheckpointEnabled) {
        val checkpointDir = ApplicationConfig.getAppConfig(JobConstants.CHECKPOINT_DIR)
        val fs = FileSystem.get(new Storage(JsonUtil.toMap[String](jobEntity.sourceConnection)).asHadoopConfig())
        val path = new Path(checkpointDir)
        if (!fs.exists(path)) {
          fs.mkdirs(path)
        }
        resultDF.explain(true)
        SparkSession.builder.getOrCreate.sparkContext.setCheckpointDir(checkpointDir)
        resultDF = resultDF.checkpoint
      }
      resultDF = {
        if (jobQuery.isBroadCast) {
          import org.apache.spark.sql.functions.broadcast
          broadcast(resultDF)
        } else
          resultDF
      }
      // Expose the result as a temp view so later queries can reference it.
      tempViewsList += jobQuery.queryAliasName
      resultDF.createOrReplaceTempView(jobQuery.queryAliasName)
      // resultDF.explain(true)
      val map: Map[String, String] = JsonUtil.toMap[String](jobEntity.sinkConnection)
      LOGGER.debug("sink details :: " + map)
      if (resultDF != null && !resultDF.take(1).isEmpty) {
        resultDF.show(false)
        val sinkDetails = new Storage(JsonUtil.toMap[String](jobEntity.sinkConnection))
        val path = sinkDetails.basePath + File.separator + jobEntity.destinationEntity
        println("path::: " + path)
        // Write each query's result to the same path in append mode.
        resultDF.repartition(1).write.mode(SaveMode.Append).json(path)
      }
    })
Just ignore the other things (Checkpointing, Logging, Auditing) that I am doing in this method along with the reading and writing.
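
To make the symptom concrete, here is a stripped-down, self-contained sketch (with made-up data and an illustrative output path) of the write pattern above:

import org.apache.spark.sql.{SaveMode, SparkSession}

object AppendDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local[*]").appName("append-demo").getOrCreate()
    import spark.implicits._

    // Stand-ins for the outputs of two of the queries (illustrative data only).
    val results = Seq(
      Seq(("a", 1)).toDF("name", "value"),
      Seq(("b", 2)).toDF("name", "value")
    )

    // Every Append write adds its own part-*.json file under the target path,
    // so even with repartition(1) the loop ends up with one part file per
    // query instead of a single JSON file.
    results.foreach(df => df.repartition(1).write.mode(SaveMode.Append).json("/tmp/single_json_out"))

    spark.stop()
  }
}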

Best Answer

Use the following example as a reference for your problem.
I have three tables with JSON data (each with a different schema) as follows:

  • table1 --> Personal details table
  • table2 --> Company details table
  • table3 --> Salary details table

  • As per your requirement, I am reading these three tables one by one in sequential mode and doing a small transformation on the data (exploding the JSON array column) with the help of the List TableColList, which holds the array column name corresponding to each table, separated by a colon (":"). OutDFList is the list of all the transformed DataFrames.
    Finally, I reduce all the DataFrames from OutDFList into a single DataFrame and write it out as one JSON file.

    Note: I have used join to reduce all the DataFrames; you can also use union (if they have the same columns, see the sketch just below) or whatever fits your requirement.
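
    A minimal sketch of the union variant, assuming every DataFrame in OutDFList had the same columns (not the case for the three example tables here, so this is purely illustrative):

    // Only valid when all DataFrames share the same columns;
    // unionByName matches columns by name rather than by position.
    val FinalOutDF = OutDFList.reduce((df1, df2) => df1.unionByName(df2))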


    Check the following code:
    scala> spark.sql("select * from table1").printSchema
    root
     |-- Personal: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |    |-- DOB: string (nullable = true)
     |    |    |-- EmpID: string (nullable = true)
     |    |    |-- Name: string (nullable = true)


    scala> spark.sql("select * from table2").printSchema
    root
     |-- Company: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |    |-- EmpID: string (nullable = true)
     |    |    |-- JoinDate: string (nullable = true)
     |    |    |-- Project: string (nullable = true)


    scala> spark.sql("select * from table3").printSchema
    root
     |-- Salary: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |    |-- EmpID: string (nullable = true)
     |    |    |-- Monthly: string (nullable = true)
     |    |    |-- Yearly: string (nullable = true)

    scala> val TableColList = List("table1:Personal", "table2:Company", "table3:Salary")
    TableColList: List[String] = List(table1:Personal, table2:Company, table3:Salary)


    scala> val OutDFList = TableColList.map{ X =>
    | val table = X.split(":")(0)
    | val arrayColumn = X.split(":")(1)
    | val df = spark.sql(s"""SELECT * FROM """ + table).select(explode(col(arrayColumn)) as "data").select("data.*")
    | df}
    OutDFList: List[org.apache.spark.sql.DataFrame] = List([DOB: string, EmpID: string ... 1 more field], [EmpID: string, JoinDate: string ... 1 more field], [EmpID: string, Monthly: string ... 1 more field])

    scala> val FinalOutDF = OutDFList.reduce((df1, df2) => df1.join(df2, "EmpID"))
    FinalOutDF: org.apache.spark.sql.DataFrame = [EmpID: string, DOB: string ... 5 more fields]

    scala> FinalOutDF.printSchema
    root
     |-- EmpID: string (nullable = true)
     |-- DOB: string (nullable = true)
     |-- Name: string (nullable = true)
     |-- JoinDate: string (nullable = true)
     |-- Project: string (nullable = true)
     |-- Monthly: string (nullable = true)
     |-- Yearly: string (nullable = true)


    scala> FinalOutDF.write.json("/FinalJsonOut")
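
    Keep in mind that write.json produces a directory of part files rather than one literal file. If a single part file is strictly required, one option (a sketch; coalesce(1) funnels all data through a single task, so it only suits outputs that fit comfortably on one executor) is:

    // Collapse to one partition so the output directory contains a single part-*.json file.
    FinalOutDF.coalesce(1).write.mode("append").json("/FinalJsonOut")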

    Regarding "scala - How to save the output of multiple queries to a single JSON file in append mode using Spark Scala", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/62638534/
