gpt4 book ai didi

python - 从任务中调用 Java/Scala 函数

转载 作者:IT老高 更新时间:2023-10-28 21:47:46 30 4
gpt4 key购买 nike

背景

我最初的问题是 为什么在 map 函数中使用 DecisionTreeModel.predict 会引发异常? 并且与 How to generate tuples of (original lable, predicted label) on Spark with MLlib? 相关

当我们使用 Scala API a recommended way使用 DecisionTreeModelRDD[LabeledPoint] 进行预测的方法是简单地映射到 RDD:

val labelAndPreds = testData.map { point =>
val prediction = model.predict(point.features)
(point.label, prediction)
}

不幸的是,PySpark 中的类似方法效果不佳:

labelsAndPredictions = testData.map(
lambda lp: (lp.label, model.predict(lp.features))
labelsAndPredictions.first()

Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transforamtion. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.

而不是 official documentation推荐这样的东西:

predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)

那么这里发生了什么?这里没有广播变量和Scala API predict 定义如下:

/**
* Predict values for a single data point using the model trained.
*
* @param features array representing a single data point
* @return Double prediction from the trained model
*/
def predict(features: Vector): Double = {
topNode.predict(features)
}

/**
* Predict values for the given data set using the model trained.
*
* @param features RDD representing data points to be predicted
* @return RDD of predictions for each of the given data points
*/
def predict(features: RDD[Vector]): RDD[Double] = {
features.map(x => predict(x))
}

所以至少乍一看,从 Action 或转换调用不是问题,因为预测似乎是一种本地操作。

说明

经过一番挖掘,我发现问题的根源是 JavaModelWrapper.callDecisionTreeModel.predict 调用的方法.它access SparkContext是调用Java函数所需要的:

callJavaFunc(self._sc, getattr(self._java_model, name), *a)

问题

DecisionTreeModel.predict 的情况下,有一个推荐的解决方法,并且所有必需的代码已经是 Scala API 的一部分,但是一般来说有什么优雅的方法来处理这样的问题吗?

目前只有我能想到的比较重量级的解决方案:

  • 通过隐式转换扩展 Spark 类或添加某种包装器,将所有内容推送到 JVM
  • 直接使用 Py4j 网关

最佳答案

使用默认 Py4J 网关进行通信是不可能的。要了解为什么我们必须查看 PySpark 内部文档 [1] 中的下图:

enter image description here

由于 Py4J 网关在驱动程序上运行,因此通过套接字与 JVM 工作人员通信的 Python 解释器无法访问它(参见示例 PythonRDD/rdd.py )。

理论上可以为每个工作人员创建一个单独的 Py4J 网关,但实际上它不太可能有用。忽略可靠性等问题 Py4J 根本不是为执行数据密集型任务而设计的。

有什么解决方法吗?

  1. 使用 Spark SQL Data Sources API包装 JVM 代码。

    优点:受支持,高级别的,不需要访问内部 PySpark API

    缺点:相对冗长且没有很好的文档记录,主要限于输入数据

  2. 使用 Scala UDF 对 DataFrame 进行操作。

    优点:易于实现(参见 Spark: How to map Python with Scala or Java User Defined Functions?),如果数据已存储在 DataFrame 中,则无需在 Python 和 Scala 之间进行数据转换,对 Py4J 的访问最少

    缺点:需要访问 Py4J 网关和内部方法,仅限于 Spark SQL,难以调试,不支持

  3. 以类似于在 MLlib 中完成的方式创建高级 Scala 接口(interface)。

    优点:灵活,能够执行任意复杂代码。它可以直接在 RDD 上使用(例如见 MLlib model wrappers )或与 DataFrames 一起使用(见 How to use a Scala class inside Pyspark )。后一种解决方案似乎更友好,因为所有 ser-de 细节都已由现有 API 处理。

    缺点:低级,需要数据转换,和UDF一样需要访问Py4J和内部API,不支持

    一些基本的例子可以在 Transforming PySpark RDD with Scala 中找到。

  4. 使用外部工作流管理工具在 Python 和 Scala/Java 作业之间切换并将数据传递到 DFS。

    优点:易于实现,对代码本身的更改最少

    缺点:读取/写入数据的成本(Alluxio?)

  5. 使用共享的 SQLContext(参见例如 Apache ZeppelinLivy )使用已注册的临时表在 guest 语言之间传递数据。

    优点:非常适合交互式分析

    缺点:对于批处理作业(Zeppelin)来说不是很多,或者可能需要额外的编排(Livy)


  1. 约书亚·罗森。 (2014 年 8 月 4 日)PySpark Internals .取自 https://cwiki.apache.org/confluence/display/SPARK/PySpark+Internals

关于python - 从任务中调用 Java/Scala 函数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31684842/

30 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com