gpt4 book ai didi

scala - 如何将 csv 直接加载到 Spark 数据集中?

转载 作者:行者123 更新时间:2023-12-04 23:39:20 25 4
gpt4 key购买 nike

我有一个 csv 文件 [1],我想直接将其加载到数据集中。问题是我总是收到类似的错误

org.apache.spark.sql.AnalysisException: Cannot up cast `probability` from string to float as it may truncate
The type path of the target object is:
- field (class: "scala.Float", name: "probability")
- root class: "TFPredictionFormat"
You can either add an explicit cast to the input data or choose a higher precision type of the field in the target object;

而且,专门针对 phrases字段(检查案例类 [2])它得到
org.apache.spark.sql.AnalysisException: cannot resolve '`phrases`' due to data type mismatch: cannot cast StringType to ArrayType(StringType,true);

如果我将案例类 [2] 中的所有字段定义为 String 类型,那么一切正常,但这不是我想要的。有没有简单的方法来做到这一点[3]?

引用文献

[1] 示例行
B017NX63A2,Merrell,"['merrell_for_men', 'merrell_mens_shoes', 'merrel']",merrell_shoes,0.0806054356579781

[2]我的代码片段如下
import spark.implicits._

val INPUT_TF = "<SOME_URI>/my_file.csv"

final case class TFFormat (
doc_id: String,
brand: String,
phrases: Seq[String],
prediction: String,
probability: Float
)

val ds = sqlContext.read
.option("header", "true")
.option("charset", "UTF8")
.csv(INPUT_TF)
.as[TFFormat]

ds.take(1).map(println)

[3] 我找到了通过首先在 DataFrame 级别上定义列并将事物转换为 Dataset(例如 hereherehere )来找到方法,但我几乎可以肯定这不是应该的方式要做。我也很确定编码器可能是答案,但我不知道如何

最佳答案

TL;博士 csv输入转换标准 DataFrame运营才是王道。如果你想避免,你应该使用具有表现力的输入格式(Parquet 甚至 JSON)。

一般来说,要转换为静态类型数据集的数据必须已经是正确的类型。最有效的方法是提供 schema csv 的论据读者:

val schema: StructType = ???
val ds = spark.read
.option("header", "true")
.schema(schema)
.csv(path)
.as[T]

哪里 schema可以通过反射推断:
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.types.StructType

val schema = ScalaReflection.schemaFor[T].dataType.asInstanceOf[StructType]

不幸的是,它不适用于您的数据和类,因为 csv阅读器不支持 ArrayType (但它适用于像 FloatType 这样的原子类型)所以你必须使用困难的方式。一个简单的解决方案可以表示如下:
import org.apache.spark.sql.functions._

val df: DataFrame = ??? // Raw data

df
.withColumn("probability", $"probability".cast("float"))
.withColumn("phrases",
split(regexp_replace($"phrases", "[\\['\\]]", ""), ","))
.as[TFFormat]

但根据 phrases 的内容,您可能需要更复杂的东西。 .

关于scala - 如何将 csv 直接加载到 Spark 数据集中?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42678563/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com