
scala - Iterate over each row in a DataFrame, store it in a val and pass it as a parameter to a Spark SQL query


I am trying to fetch the rows from a lookup table (3 rows and 3 columns), iterate over them row by row, and pass the values in each row as parameters to Spark SQL.

DB | TBL   | COL
---|-------|----
db | txn   | ID
db | sales | ID
db | fee   | ID

I tried a single row in the spark shell and it worked, but I am finding it hard to iterate over the rows.

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val db_name:String = "db"

val tbl_name:String = "transaction"

val unique_col:String = "transaction_number"

val dupDf = sqlContext.sql(s"select count(*), transaction_number from $db_name.$tbl_name group by $unique_col having count(*)>1")

Could you please tell me how to iterate over the rows and pass them as parameters?

Best Answer

The two approaches above may be right in general, but I don't like collecting the data to the driver, for performance reasons, especially if the data is huge.
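For context, the collect-based version looks roughly like the sketch below (assuming a small lookup DataFrame named lookup with columns DB, TBL and COL, and an active SparkSession named spark, as in the implementation further down). It is fine for a 3-row lookup table, but it pulls every row to the driver:

// Minimal sketch of the collect()-based alternative (names are assumptions taken
// from the question and the implementation below). Works for a tiny lookup table,
// but collect() brings all rows to the driver, which does not scale to large data.
val queries: Array[String] = lookup.collect().map { row =>
  s"select count(*), transaction_number from ${row.getAs[String]("DB")}.${row.getAs[String]("TBL")} " +
    s"group by ${row.getAs[String]("COL")} having count(*)>1"
}
queries.foreach(q => spark.sql(q).show())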

org.apache.spark.util.CollectionAccumulator is the right candidate for this kind of requirement; see the docs.

Also, if the data is large, then foreachPartition is again the right candidate here, for performance reasons.

Below is the implementation:

package examples

import org.apache.log4j.Level
import org.apache.spark.sql.SparkSession
import org.apache.spark.util.CollectionAccumulator

import scala.collection.JavaConversions._
import scala.collection.mutable

object TableTest extends App {

  val logger = org.apache.log4j.Logger.getLogger("org")
  logger.setLevel(Level.WARN)

  val spark = SparkSession.builder.appName(getClass.getName)
    .master("local[*]").getOrCreate

  import spark.implicits._

  // Lookup table: one row per (database, table, unique column)
  val lookup = Seq(
    ("db", "txn", "ID"),
    ("db", "sales", "ID"),
    ("db", "fee", "ID")
  ).toDF("DB", "TBL", "COL")

  // Accumulator collects the generated SQL strings from the executors
  val collAcc: CollectionAccumulator[String] =
    spark.sparkContext.collectionAccumulator[String]("mySQL Accumulator")

  // Build one query per lookup row without collecting the DataFrame to the driver
  lookup.foreachPartition { partition =>
    partition.foreach { record =>
      val selectString =
        s"select count(*), transaction_number from ${record.getAs[String]("DB")}.${record.getAs[String]("TBL")} " +
          s"group by ${record.getAs[String]("COL")} having count(*)>1"
      collAcc.add(selectString)
      println(selectString)
    }
  }

  // Read the accumulated queries back on the driver, run each one and union the results
  val mycollectionOfSelects: mutable.Seq[String] = asScalaBuffer(collAcc.value)
  val finaldf = mycollectionOfSelects.map(x => spark.sql(x)).reduce(_ union _)
  finaldf.show

}

Sample output:

[2019-08-13 12:11:16,458] WARN Unable to load native-hadoop library for your platform... using builtin-java classes where applicable (org.apache.hadoop.util.NativeCodeLoader:62)
[Stage 0:> (0 + 0) / 2]

select count(*), transaction_number from db.txn group by ID having count(*)>1

select count(*), transaction_number from db.sales group by ID having count(*)>1

select count(*), transaction_number from db.fee group by ID having count(*)>1


Note: since these are pseudo tables, I am not showing the resulting DataFrame.

Regarding "scala - Iterate over each row in a DataFrame, store it in a val and pass it as a parameter to a Spark SQL query", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/57466411/
