gpt4 book ai didi

scala - 从字符串文字推断 Spark DataType

转载 作者:行者123 更新时间:2023-12-04 04:54:43 25 4
gpt4 key购买 nike

我正在尝试编写一个可以推断 Spark DataTypes 的 Scala 函数基于提供的输入字符串:

/**
* Example:
* ========
* toSparkType("string") => StringType
* toSparkType("boolean") => BooleanType
* toSparkType("date") => DateType
* etc.
*/
def toSparkType(inputType : String) : DataType = {
var dt : DataType = null

if(matchesStringRegex(inputType)) {
dt = StringType
} else if(matchesBooleanRegex(inputType)) {
dt = BooleanType
} else if(matchesDateRegex(inputType)) {
dt = DateType
} else if(...) {
...
}

dt
}

我的目标是支持可用的 DataTypes 中的一个大子集,如果不是全部的话。 .当我开始实现这个功能时,我开始思考:“Spark/Scala 可能已经有一个 helper/util 方法可以为我做这件事。”毕竟,我知道我可以做类似的事情:
var structType = new StructType()

structType.add("some_new_string_col", "string", true, Metadata.empty)
structType.add("some_new_boolean_col", "boolean", true, Metadata.empty)
structType.add("some_new_date_col", "date", true, Metadata.empty)

Scala 和/或 Spark 将隐式转换我的 "string" StringType 的参数等所以我问: 我可以用 Spark 或 Scala 做些什么来帮助我实现转换器方法?

最佳答案

Spark/Scala probably already have a helper/util method that will do this for me.



你是对的。 Spark 已经有自己的模式和数据类型推断代码,用于从底层数据源(csv、json 等)推断模式。所以你可以查看它来实现你自己的(实际的实现标记为 Spark 私有(private),并且是绑定(bind)到 RDD 和内部类,所以它不能直接从 Spark 之外的代码中使用,但应该让您对如何去做有一个好主意。)

鉴于 csv 是平面类型(并且 json 可以具有嵌套结构),csv 模式推断相对更直接,应该可以帮助您完成上述任务。所以我将解释 csv 推理是如何工作的(json 推理只需要考虑可能的嵌套结构,但数据类型推理非常相似)。

有了这个序幕,你想看的是 CSVInferSchema目的。特别是看 infer采用 RDD[Array[String]] 的方法并推断整个 RDD 中数组的每个元素的数据类型。它的做法是——将每个字段标记为 NullType。开始,然后迭代 Array[String] 中的下一行值( RDD )它更新了已经推断的 DataType到一个新的 DataType如果新的 DataType更具体。这正在发生 here :
val rootTypes: Array[DataType] =
tokenRdd.aggregate(startType)(inferRowType(options), mergeRowTypes)

现在 inferRowType calls inferField对于行中的每个字段。 inferField implementation可能是您要查找的内容 - 它采用迄今为止为特定字段推断的类型和当前行的字段的字符串值作为参数。然后它返回现有的推断类型,或者如果推断的新类型比新类型更具体。

相关部分代码如下:
typeSoFar match {
case NullType => tryParseInteger(field, options)
case IntegerType => tryParseInteger(field, options)
case LongType => tryParseLong(field, options)
case _: DecimalType => tryParseDecimal(field, options)
case DoubleType => tryParseDouble(field, options)
case TimestampType => tryParseTimestamp(field, options)
case BooleanType => tryParseBoolean(field, options)
case StringType => StringType
case other: DataType =>
throw new UnsupportedOperationException(s"Unexpected data type $other")
}

请注意,如果 typeSoFar是 NullType 然后它首先尝试将其解析为 Integer但是 tryParseInteger call 是对较低类型解析的调用链。因此,如果它不能将值解析为整数,那么它将调用 tryParseLong失败时将调用 tryParseDecimal失败时将调用 tryParseDouble w.o.f.w.i. tryParseTimestamp w.o.f.w.i tryParseBoolean w.o.f.w.i.终于 stringType .

因此,您可以使用几乎类似的逻辑来实现您的用例。 (如果您不需要跨行合并,那么您只需逐字实现所有 tryParse* 方法并简单地调用 tryParseInteger 。无需编写自己的正则表达式。)

希望这可以帮助。

关于scala - 从字符串文字推断 Spark DataType,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39777648/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com