gpt4 book ai didi

scala - 如何从 UDF 创建自定义 Transformer?

转载 作者:行者123 更新时间:2023-12-04 03:14:58 35 4
gpt4 key购买 nike

我试图创建和保存 Pipeline带有自定义阶段。我需要添加一个 column给我的DataFrame通过使用 UDF .因此,我想知道是否可以转换 UDF或类似的操作进入 Transformer ?

我的定制UDF看起来像这样,我想学习如何使用 UDF作为定制Transformer .

def getFeatures(n: String) = {
val NUMBER_FEATURES = 4
val name = n.split(" +")(0).toLowerCase
((1 to NUMBER_FEATURES)
.filter(size => size <= name.length)
.map(size => name.substring(name.length - size)))
}

val tokenizeUDF = sqlContext.udf.register("tokenize", (name: String) => getFeatures(name))

最佳答案

它不是一个功能齐全的解决方案,但您可以从以下内容开始:

import org.apache.spark.ml.{UnaryTransformer}
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.types.{ArrayType, DataType, StringType}

class NGramTokenizer(override val uid: String)
extends UnaryTransformer[String, Seq[String], NGramTokenizer] {

def this() = this(Identifiable.randomUID("ngramtokenizer"))

override protected def createTransformFunc: String => Seq[String] = {
getFeatures _
}

override protected def validateInputType(inputType: DataType): Unit = {
require(inputType == StringType)
}

override protected def outputDataType: DataType = {
new ArrayType(StringType, true)
}
}

快速检查:
val df = Seq((1L, "abcdef"), (2L, "foobar")).toDF("k", "v")
val transformer = new NGramTokenizer().setInputCol("v").setOutputCol("vs")

transformer.transform(df).show
// +---+------+------------------+
// | k| v| vs|
// +---+------+------------------+
// | 1|abcdef|[f, ef, def, cdef]|
// | 2|foobar|[r, ar, bar, obar]|
// +---+------+------------------+

您甚至可以尝试将其概括为以下内容:
import org.apache.spark.sql.catalyst.ScalaReflection.schemaFor
import scala.reflect.runtime.universe._

class UnaryUDFTransformer[T : TypeTag, U : TypeTag](
override val uid: String,
f: T => U
) extends UnaryTransformer[T, U, UnaryUDFTransformer[T, U]] {

override protected def createTransformFunc: T => U = f

override protected def validateInputType(inputType: DataType): Unit =
require(inputType == schemaFor[T].dataType)

override protected def outputDataType: DataType = schemaFor[U].dataType
}

val transformer = new UnaryUDFTransformer("featurize", getFeatures)
.setInputCol("v")
.setOutputCol("vs")

如果您想使用 UDF 而不是包装函数,则必须扩展 Transformer直接覆盖 transform方法。不幸的是,大多数有用的类都是私有(private)的,所以它可能相当棘手。

或者,您可以注册 UDF:
spark.udf.register("getFeatures", getFeatures _)

并使用 SQLTransformer
import org.apache.spark.ml.feature.SQLTransformer

val transformer = new SQLTransformer()
.setStatement("SELECT *, getFeatures(v) AS vs FROM __THIS__")

transformer.transform(df).show
// +---+------+------------------+
// | k| v| vs|
// +---+------+------------------+
// | 1|abcdef|[f, ef, def, cdef]|
// | 2|foobar|[r, ar, bar, obar]|
// +---+------+------------------+

关于scala - 如何从 UDF 创建自定义 Transformer?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35180527/

35 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com