gpt4 book ai didi

scala - 使用 Pipeline 基于分区的 DataFrame 创建许多 Spark MLlib 模型

转载 作者:行者123 更新时间:2023-12-03 22:16:55 25 4
gpt4 key购买 nike

scala> spark.version
res8: String = 2.2.0

我正在使用包含列 locationID 的 spark 数据框.我创建了一个 MLlib 管道来构建线性回归模型,当我为单个 locationID 提供数据时,它可以工作。 .我现在想为每个“locationID”创建许多模型(生产中可能有几千个 locationID)。我想保存每个模型的模型系数。

我不确定如何在 Scala 中做到这一点。

我的管道定义如下:

import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer}
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.regression.LinearRegressionModel
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.sql


// Load the regression input data
val mydata = spark.read.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load("./inputdata.csv")

// Crate month one hot encoding
val monthIndexer = new StringIndexer()
.setInputCol("month")
.setOutputCol("monthIndex").fit(mydata)
val monthEncoder = new OneHotEncoder()
.setInputCol(monthIndexer.getOutputCol)
.setOutputCol("monthVec")
val assembler = new VectorAssembler()
.setInputCols(Array("monthVec","tran_adr"))
.setOutputCol("features")
val lr = new LinearRegression()
.setMaxIter(10)
.setRegParam(0.3)
.setElasticNetParam(0.8)
val pipeline = new Pipeline()
.setStages(Array(monthIndexer, monthEncoder, assembler, lr))


// Fit using the model pipeline
val myPipelineModel = pipeline.fit(mydata)

然后我可以像这样提取模型细节:
val modelExtract = myPipelineModel.stages(3).asInstanceOf[LinearRegressionModel]

println(s"Coefficients: ${modelExtract.coefficients} Intercept: ${modelExtract.intercept}")
// Summarize the model over the training set and print out some metrics
val trainingSummary = modelExtract.summary
println(s"numIterations: ${trainingSummary.totalIterations}")
println(s"objectiveHistory: [${trainingSummary.objectiveHistory.mkString(",")}]")
trainingSummary.residuals.show()
println(s"RMSE: ${trainingSummary.rootMeanSquaredError}")
println(s"r2: ${trainingSummary.r2}")

现在我想对列进行分组 locationIDmydata 中找到并在数据的每个分区上运行管道。

我试过使用 groupby 但我只能聚合。
val grouped = mydata.groupBy("locationID")

我也试过拉独特的 locationID作为列表并循环遍历它:
val locationList = mydata.select(mydata("prop_code")).distinct

locationList.foreach { printLn }

我知道 spark 不适合创建许多较小的模型,而最适合在大量数据上创建一个模型,但我的任务是将其作为概念证明。

在 Spark 中做这样的事情的正确方法是什么?

最佳答案

What is the correct approach for doing something like this in spark?



我会冒着声称根本没有好的方法的风险。有许多可以处理核心数据处理的高级工具和许多可用于编排独立学习任务的任务调度库。 Spark 根本不提供任何东西。

它的调度能力很一般,ML/MLlib 工具也很一般,当每个任务都是独立的时候,扩展和容错也没有用。

您可以使用 Spark 进行通用调度(如果您不介意使用 Python,这个想法是通过 sklearn keyed models 实现的),仅此而已。

关于scala - 使用 Pipeline 基于分区的 DataFrame 创建许多 Spark MLlib 模型,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49803989/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com