
scala - Reference is ambiguous with SparkSQL CSV


I am trying to read a bunch of CSV files in SparkSQL 2.10 with a custom schema that is partly double and partly string, like this:

// Build the schema
val schemaStringS = "col1 col2"
val schemaStringD = "col3 col4 col5 col6"
val schemaStringS2 = "col7 col8"
val fieldsString = schemaStringS.split(" ")
.map(fieldName => StructField(fieldName, StringType, nullable = true))
val fieldsString2 = schemaStringS2.split(" ")
.map(fieldName => StructField(fieldName, StringType, nullable = true))
val fieldsDouble = schemaStringS.split(" ")
.map(fieldName => StructField(fieldName, DoubleType, nullable = true))
val schema = StructType(fieldsString ++ fieldsDouble ++ fieldsString2)

// Read DataFrame
val input = sqlContext.read.schema(schema)
.option("header", true)
.csv("/files/*.csv")
.toJavaRDD

This results in:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Reference 'col6' is ambiguous, could be: col6#0, col6#5.;
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolve(LogicalPlan.scala:264)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolve(LogicalPlan.scala:158)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolve$1.apply(LogicalPlan.scala:130)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolve$1.apply(LogicalPlan.scala:129)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at org.apache.spark.sql.types.StructType.foreach(StructType.scala:96)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at org.apache.spark.sql.types.StructType.map(StructType.scala:96)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolve(LogicalPlan.scala:129)
at org.apache.spark.sql.execution.datasources.FileSourceStrategy$.apply(FileSourceStrategy.scala:83)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:62)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:62)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:92)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:77)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:74)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:144)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:144)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:144)
at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1157)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:74)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:66)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:92)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:77)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:74)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:144)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:144)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:144)
at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1157)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:74)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:66)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:92)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:79)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:75)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:84)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:84)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:87)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:87)
at org.apache.spark.sql.Dataset.rdd$lzycompute(Dataset.scala:2547)
at org.apache.spark.sql.Dataset.rdd(Dataset.scala:2544)
at org.apache.spark.sql.Dataset.toJavaRDD(Dataset.scala:2557)
at com.otterinasuit.spark.sensorlog.main.Main$.main(Main.scala:39)
at com.otterinasuit.spark.sensorlog.main.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)

I tried merging the files with cat (just for a PoC) and avoiding the CSV library (thinking this might be a bug in the new Spark version), but to no avail.

val input = sc.textFile("/csv/*.csv")
.map(line => line.split(",")).filter(row => !row.contains("col1")).map(x => Row(x))
val input2 = sqlContext.createDataFrame(input, schema)

I have run into this problem before with regular DataFrames and joins; iirc it can be solved by specifying column names, dropping specific columns, or using a different kind of join. However, in this case I don't have that option.
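
(For reference, a minimal sketch of those join-time remedies, using hypothetical DataFrames dfA and dfB that share an id column; these names are not part of the question's code:)

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Hypothetical helpers: both frames are assumed to share an `id` column.
// Joining with a Seq key keeps a single `id` column in the output,
// so later references to `id` are unambiguous.
def joinOnSharedKey(dfA: DataFrame, dfB: DataFrame): DataFrame =
  dfA.join(dfB, Seq("id"))

// Alternative: alias both sides, qualify every reference, then drop the duplicate key.
def joinAndDropDuplicate(dfA: DataFrame, dfB: DataFrame): DataFrame =
  dfA.alias("a")
    .join(dfB.alias("b"), col("a.id") === col("b.id"))
    .drop(col("b.id"))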

All headers in all files are identical, as verified with head -1 *.csv. I don't understand why this happens.

Best Answer

Both fieldsString and fieldsDouble refer to schemaStringS:

val fieldsString = schemaStringS.split(" ")
.map(fieldName => StructField(fieldName, StringType, nullable = true))

// Change schemaStringS to schemaStringD
val fieldsDouble = schemaStringD.split(" ")
.map(fieldName => StructField(fieldName, DoubleType, nullable = true))

So when you combine them:

val schema = StructType(fieldsString ++ fieldsDouble ++ fieldsString2)

it throws the 'col6' is ambiguous error: the combined schema ends up with duplicate column names, so Spark cannot tell which occurrence a reference points to.
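
Putting the fix together, here is a minimal sketch of the corrected schema build (column names taken from the question), with an optional guard that fails fast if any name appears twice:

import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}

val schemaStringS  = "col1 col2"
val schemaStringD  = "col3 col4 col5 col6"
val schemaStringS2 = "col7 col8"

// Each group of fields is built from its own name list
val fieldsString = schemaStringS.split(" ")
  .map(name => StructField(name, StringType, nullable = true))
val fieldsDouble = schemaStringD.split(" ")   // schemaStringD, not schemaStringS
  .map(name => StructField(name, DoubleType, nullable = true))
val fieldsString2 = schemaStringS2.split(" ")
  .map(name => StructField(name, StringType, nullable = true))

val schema = StructType(fieldsString ++ fieldsDouble ++ fieldsString2)

// Optional guard (not in the original answer): fail fast on duplicate column names
val duplicates = schema.fieldNames.groupBy(identity).collect { case (n, occ) if occ.length > 1 => n }
require(duplicates.isEmpty, s"Duplicate columns in schema: ${duplicates.mkString(", ")}")

With every column name appearing exactly once, the read should no longer hit the ambiguity error.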

Regarding scala - Reference is ambiguous with SparkSQL CSV, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/41905709/
