gpt4 book ai didi

scala - 创建一个映射来为 Spark Dataframe 的每一行调用 POJO

转载 作者:行者123 更新时间:2023-12-05 04:09:01 28 4
gpt4 key购买 nike

我在 R 中构建了一个 H2O 模型并保存了 POJO 代码。我想使用 POJO 在 hdfs 中对 Parquet 文件进行评分,但我不确定如何去做。我计划将 Parquet 文件读入 spark (scala/SparkR/PySpark) 并在那里对它们进行评分。以下是我在 H2O's documentation page. 上找到的摘录

"How do I run a POJO on a Spark Cluster?

The POJO provides just the math logic to do predictions, so you won’t find any Spark (or even H2O) specific code there. If you want to use the POJO to make predictions on a dataset in Spark, create a map to call the POJO for each row and save the result to a new column, row-by-row"

有没有人有我如何做到这一点的一些示例代码?我将不胜感激任何帮助。我主要使用 R 和 SparkR 编写代码,但不确定如何将 POJO“映射”到每一行。

提前致谢。

最佳答案

我刚刚发布了一个solution实际使用 DataFrame/Dataset。该帖子使用 Star Wars 数据集在 R 中构建模型,然后在 Spark 中的测试集上对 MOJO 进行评分。我将在此处粘贴唯一相关的部分:

使用 Spark(和 Scala)评分

您可以使用 spark-submit 或 spark-shell。如果使用 spark-submit,h2o-genmodel.jar 需要放在 spark 应用根目录下的 lib 文件夹下,以便在编译时添加为依赖项。以下代码假定您正在运行 spark-shell。为了使用 h2o-genmodel.jar,您需要在启动 spark-shell 时通过提供 --jar 标志附加 jar 文件。例如:

/usr/lib/spark/bin/spark-shell \
--conf spark.serializer="org.apache.spark.serializer.KryoSerializer" \
--conf spark.driver.memory="3g" \
--conf spark.executor.memory="10g" \
--conf spark.executor.instances=10 \
--conf spark.executor.cores=4 \
--jars /path/to/h2o-genmodel.jar

现在在 Spark shell 中,导入依赖项

import _root_.hex.genmodel.easy.{EasyPredictModelWrapper, RowData}
import _root_.hex.genmodel.MojoModel

使用数据框

val modelPath = "/path/to/zip/file"
val dataPath = "/path/to/test/data"

// Import data
val dfStarWars = spark.read.option("header", "true").csv(dataPath)
// Import MOJO model
val mojo = MojoModel.load(modelPath)
val easyModel = new EasyPredictModelWrapper(mojo)

// score
val dfScore = dfStarWars.map {
x =>
val r = new RowData
r.put("height", x.getAs[String](1))
r.put("mass", x.getAs[String](2))
val score = easyModel.predictBinomial(r).classProbabilities
(x.getAs[String](0), score(1))
}.toDF("name", "isHumanScore")

变量 score 是级别 0 和 1 的两个分数的列表。score(1) 是级别 1 的分数,即“人类”。默认情况下,map 函数返回具有未指定列名“_1”、“_2”等的 DataFrame。您可以通过调用 toDF 重命名列。

使用数据集

要使用数据集 API,我们只需创建两个案例类,一个用于输入数据,一个用于输出。

case class StarWars (
name: String,
height: String,
mass: String,
is_human: String
)

case class Score (
name: String,
isHumanScore: Double
)


// Dataset
val dtStarWars = dfStarWars.as[StarWars]
val dtScore = dtStarWars.map {
x =>
val r = new RowData
r.put("height", x.height)
r.put("mass", x.mass)
val score = easyModel.predictBinomial(r).classProbabilities
Score(x.name, score(1))
}

使用数据集,您可以通过直接调用 x.columnName 来获取列的值。请注意,列值的类型必须是字符串,因此如果它们是案例类中定义的其他类型,您可能需要手动转换它们。

关于scala - 创建一个映射来为 Spark Dataframe 的每一行调用 POJO,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46849368/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com