
java - Adding a new column to a DataFrame in Spark SQL using the Java API and JavaRDD<Row>

Reposted. Author: 行者123. Last updated: 2023-12-02 13:19:22

I am trying to create a new DataFrame (in Spark SQL 1.6.2) after applying a mapPartitions function, as follows:

FlatMapFunction<Iterator<Row>, Row> mapPartitonstoTTF = rows -> {
    List<Row> mappedRows = new ArrayList<Row>();
    while (rows.hasNext()) {
        Row row = rows.next();
        Row mappedRow = RowFactory.create(
            row.getDouble(0), row.getString(1), row.getLong(2), row.getDouble(3),
            row.getInt(4), row.getString(5), row.getString(6), row.getInt(7),
            row.getInt(8), row.getString(9), 0L);
        mappedRows.add(mappedRow);
    }
    return mappedRows;
};


JavaRDD<Row> sensorDataDoubleRDD=oldsensorDataDoubleDF.toJavaRDD().mapPartitions(mapPartitonstoTTF);

StructType oldSchema=oldsensorDataDoubleDF.schema();
StructType newSchema =oldSchema.add("TTF",DataTypes.LongType,false);

System.out.println("The new schema is: ");
newSchema.printTreeString();

System.out.println("The old schema is: ");
oldSchema.printTreeString();

DataFrame sensorDataDoubleDF=hc.createDataFrame(sensorDataDoubleRDD, newSchema);
sensorDataDoubleDF.show();

As shown above, I use RowFactory.create() to add a new LongType column with the value 0 to the RDD.

However, the line that calls sensorDataDoubleDF.show(); throws the following exception:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 117 in stage 26.0 failed 4 times, most recent failure: Lost task 117.3 in stage 26.0 (TID 3249, AUPER01-01-20-08-0.prod.vroc.com.au): scala.MatchError: 1435766400001 (of class java.lang.Long)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:295)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:294)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:260)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:250)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:401)
at org.apache.spark.sql.SQLContext$$anonfun$6.apply(SQLContext.scala:492)
at org.apache.spark.sql.SQLContext$$anonfun$6.apply(SQLContext.scala:492)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1882)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1882)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

The old schema is:

root
|-- data_quality: double (nullable = false)
|-- data_sensor: string (nullable = true)
|-- data_timestamp: long (nullable = false)
|-- data_valueDouble: double (nullable = false)
|-- day: integer (nullable = false)
|-- dpnode: string (nullable = true)
|-- dsnode: string (nullable = true)
|-- month: integer (nullable = false)
|-- year: integer (nullable = false)
|-- nodeid: string (nullable = true)
|-- nodename: string (nullable = true)

The new schema is the same as the one above, with a TTF column of LongType added:

root
|-- data_quality: double (nullable = false)
|-- data_sensor: string (nullable = true)
|-- data_timestamp: long (nullable = false)
|-- data_valueDouble: double (nullable = false)
|-- day: integer (nullable = false)
|-- dpnode: string (nullable = true)
|-- dsnode: string (nullable = true)
|-- month: integer (nullable = false)
|-- year: integer (nullable = false)
|-- nodeid: string (nullable = true)
|-- nodename: string (nullable = true)
|-- TTF: long (nullable = false)

I would appreciate help finding where I went wrong.

Best answer

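The old schema has 11 columns, but only 10 of them are mapped: nodename (index 10) is missing. The mapped row therefore does not line up with the new 12-field schema, and a Long value reaches the converter for a string column, which is the scala.MatchError raised in StringConverter in the stack trace. Add row.getString(10) to the RowFactory.create call: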

Row mappedRow= RowFactory.create(row.getDouble(0),row.getString(1),row.getLong(2),row.getDouble(3),row.getInt(4),row.getString(5),
row.getString(6),row.getInt(7),row.getInt(8),row.getString(9),row.getString(10),0L);
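
Listing every getter by hand is what allowed one column to be skipped. The following is a minimal plain-Java sketch, not Spark code, of two ideas that may help avoid that: copying all existing values generically before appending the new one, and checking values against the expected types by position. The class name RowPadding, both helper methods, and the sample values are hypothetical. In Spark the input values would presumably come from row.get(i) for each index up to row.length(), and the resulting array would be passed to RowFactory.create(Object...).

```java
import java.util.Arrays;

// Plain-Java stand-in for the row handling; names here are hypothetical.
class RowPadding {

    // Copies every existing value and appends one extra value at the end,
    // so no column can be skipped by a hand-written list of getters.
    static Object[] appendValue(Object[] existing, Object extra) {
        Object[] result = Arrays.copyOf(existing, existing.length + 1);
        result[existing.length] = extra;
        return result;
    }

    // Returns the index of the first non-null value whose runtime class
    // differs from the expected class at that position. If all shared
    // positions match but the lengths differ, returns the shorter length.
    // Returns -1 when the lengths are equal and every position matches.
    static int firstMismatch(Class<?>[] expected, Object[] values) {
        int n = Math.min(expected.length, values.length);
        for (int i = 0; i < n; i++) {
            if (values[i] != null && !expected[i].isInstance(values[i])) {
                return i;
            }
        }
        return expected.length == values.length ? -1 : n;
    }

    public static void main(String[] args) {
        // Java classes corresponding to the 12 fields of the new schema.
        Class<?>[] expected = {
            Double.class, String.class, Long.class, Double.class,
            Integer.class, String.class, String.class, Integer.class,
            Integer.class, String.class, String.class, Long.class
        };
        // Made-up sample values for the 11 columns of the old schema.
        Object[] oldRow = {
            1.0, "sensor", 1435766400001L, 2.5, 1, "dp", "ds", 7, 2015, "id", "name"
        };

        // Mimics the question's code: only the first 10 values, then 0L.
        Object[] buggy = appendValue(Arrays.copyOf(oldRow, 10), 0L);
        // Copies all 11 values, then appends 0L.
        Object[] fixed = appendValue(oldRow, 0L);

        System.out.println("buggy mismatch index: " + firstMismatch(expected, buggy));
        System.out.println("fixed mismatch index: " + firstMismatch(expected, fixed));
    }
}
```

Running such a check on a few rows before calling createDataFrame would point at the misaligned position directly, instead of leaving it to surface as a MatchError inside a Spark task.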

This question, "java - Adding a new column to a DataFrame in Spark SQL using the Java API and JavaRDD<Row>", is based on a similar question found on Stack Overflow: https://stackoverflow.com/questions/43628723/
