gpt4 book ai didi

csv - 如何在数据框中指定缺失值

转载 作者:行者123 更新时间:2023-12-04 15:10:05 26 4
gpt4 key购买 nike

我正在尝试使用 Apache Zeppelin 笔记本将 CSV 文件加载到带有 spark-csv [1] 的 Spark 数据帧中,并且在加载没有值的数字字段时,解析器对该行失败并且该行被跳过。

我本来希望加载该行,并且数据框中的值加载该行并将值设置为 NULL,以便聚合忽略该值。

%dep
z.reset()
z.addRepo("my-nexus").url("<my_local_nexus_repo_that_is_a_proxy_of_public_repos>")
z.load("com.databricks:spark-csv_2.10:1.1.0")


%spark
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types._
import com.databricks.spark.csv._
import org.apache.spark.sql.functions._

val schema = StructType(
StructField("identifier", StringType, true) ::
StructField("name", StringType, true) ::
StructField("height", DoubleType, true) ::
Nil)

val sqlContext = new SQLContext(sc)
val df = sqlContext.read.format("com.databricks.spark.csv")
.schema(schema)
.option("header", "true")
.load("file:///home/spark_user/data.csv")

df.describe("height").show()

这是数据文件的内容:/home/spark_user/data.csv
identifier,name,height
1,sam,184
2,cath,180
3,santa, <-- note that there is not height recorded for Santa !

这是输出:
+-------+------+
|summary|height|
+-------+------+
| count| 2| <- 2 of 3 lines loaded, ie. sam and cath
| mean| 182.0|
| stddev| 2.0|
| min| 180.0|
| max| 184.0|
+-------+------+

在 zeppelin 的日志中,我可以在解析圣诞老人的行时看到以下错误:
ERROR [2015-07-21 16:42:09,940] ({Executor task launch worker-45} CsvRelation.scala[apply]:209) - Exception while parsing line: 3,santa,.
java.lang.NumberFormatException: empty String
at sun.misc.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:1842)
at sun.misc.FloatingDecimal.parseDouble(FloatingDecimal.java:110)
at java.lang.Double.parseDouble(Double.java:538)
at scala.collection.immutable.StringLike$class.toDouble(StringLike.scala:232)
at scala.collection.immutable.StringOps.toDouble(StringOps.scala:31)
at com.databricks.spark.csv.util.TypeCast$.castTo(TypeCast.scala:42)
at com.databricks.spark.csv.CsvRelation$$anonfun$com$databricks$spark$csv$CsvRelation$$parseCSV$1.apply(CsvRelation.scala:198)
at com.databricks.spark.csv.CsvRelation$$anonfun$com$databricks$spark$csv$CsvRelation$$parseCSV$1.apply(CsvRelation.scala:180)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at org.apache.spark.sql.execution.Aggregate$$anonfun$doExecute$1$$anonfun$6.apply(Aggregate.scala:129)
at org.apache.spark.sql.execution.Aggregate$$anonfun$doExecute$1$$anonfun$6.apply(Aggregate.scala:126)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

所以你可能会告诉我到目前为止很好......而且你是对的;)

现在我想添加一个额外的列,比如年龄,我总是在该字段中有数据。
identifier,name,height,age
1,sam,184,30
2,cath,180,32
3,santa,,70

现在礼貌地询问一些关于年龄的统计数据:
%spark
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types._
import com.databricks.spark.csv._
import org.apache.spark.sql.functions._

val schema = StructType(
StructField("identifier", StringType, true) ::
StructField("name", StringType, true) ::
StructField("height", DoubleType, true) ::
StructField("age", DoubleType, true) ::
Nil)

val sqlContext = new SQLContext(sc)
val df = sqlContext.read.format("com.databricks.spark.csv")
.schema(schema)
.option("header", "true")
.load("file:///home/spark_user/data2.csv")

df.describe("age").show()

结果
+-------+----+
|summary| age|
+-------+----+
| count| 2|
| mean|31.0|
| stddev| 1.0|
| min|30.0|
| max|32.0|
+-------+----+

全错了!由于圣诞老人的高度未知,整条线都丢失了,年龄的计算仅基于 Sam 和 Cath,而圣诞老人的年龄完全有效。

我的问题是我需要什么值来插入圣诞老人的高度才能加载 CSV。我试图将架构设置为所有 StringType 但后来

下一个问题是更多关于

我在 API 中发现可以使用 spark 处理 N/A 值。所以我想也许我可以加载所有列设置为 StringType 的数据,然后做一些清理,然后只正确设置架构,如下所示:
%spark
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types._
import com.databricks.spark.csv._
import org.apache.spark.sql.functions._

val schema = StructType(
StructField("identifier", StringType, true) ::
StructField("name", StringType, true) ::
StructField("height", StringType, true) ::
StructField("age", StringType, true) ::
Nil)

val sqlContext = new SQLContext(sc)
val df = sqlContext.read.format("com.databricks.spark.csv").schema(schema).option("header", "true").load("file:///home/spark_user/data.csv")

// eg. for each column of my dataframe, replace empty string by null
df.na.replace( "*", Map("" -> null) )

val toDouble = udf[Double, String]( _.toDouble)
df2 = df.withColumn("age", toDouble(df("age")))

df2.describe("age").show()

但是 df.na.replace() 抛出异常并停止:
java.lang.IllegalArgumentException: Unsupported value type java.lang.String ().
at org.apache.spark.sql.DataFrameNaFunctions.org$apache$spark$sql$DataFrameNaFunctions$$convertToDouble(DataFrameNaFunctions.scala:417)
at org.apache.spark.sql.DataFrameNaFunctions$$anonfun$4.apply(DataFrameNaFunctions.scala:337)
at org.apache.spark.sql.DataFrameNaFunctions$$anonfun$4.apply(DataFrameNaFunctions.scala:337)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.sql.DataFrameNaFunctions.replace0(DataFrameNaFunctions.scala:337)
at org.apache.spark.sql.DataFrameNaFunctions.replace(DataFrameNaFunctions.scala:304)

非常感谢任何帮助和提示!

[1] https://github.com/databricks/spark-csv

最佳答案

Spark-csv 缺少此选项。它has been fixed在主分支。我想您应该使用它或等待下一个稳定版本。

关于csv - 如何在数据框中指定缺失值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31542970/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com