- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我尝试使用zipWithIndex
添加一个具有行号的列,如下所示在spark中
val df = sc.parallelize(Seq((1.0, 2.0), (0.0, -1.0), (3.0, 4.0), (6.0, -2.3))).toDF("x", "y")
val rddzip = df.rdd.zipWithIndex;
val newSchema = StructType(df.schema.fields ++ Array(StructField("rowid", LongType, false)))
val dfZippedWithId = spark.createDataFrame(rddzip.map{ case (row, index) => Row.fromSeq(row.toSeq ++ Array(index))}, newSchema)
但是我在 JAVA 中尝试做的事情如下
JavaRDD<Row> rdd = (JavaRDD) df.toJavaRDD().zipWithIndex().map(t -> {
Row r = t._1;
Long index = t._2 + 1;
ArrayList<Object> list = new ArrayList<>();
for(Object item: JavaConverters.seqAsJavaListConverter(r.toSeq()).asJava()) {
list.add(item);
}
return RowFactory.create(JavaConverters.seqAsJavaListConverter(t._1.toSeq()).asJava().add(t._2));
});
StructType newSchema = df.schema()
.add(new StructField(name, DataTypes.LongType, true, Metadata.empty()));
return df.sparkSession().createDataFrame(rdd, newSchema);
我收到以下错误
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 23.0 failed 1 times, most recent failure: Lost task 0.0 in stage 23.0 (T
ID 523, localhost, executor driver): java.lang.UnsupportedOperationException
at java.util.AbstractList.add(Unknown Source)
at java.util.AbstractList.add(Unknown Source)
at com.nielsen.media.mediaView.adintel.pivot.datareader.AIReportViewerProcessor.lambda$zipWithIndex$7245ab51$1(AIReportViewerProcessor.java:3071)
at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1040)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
有什么帮助吗?
最佳答案
在scala版本中,您传递给spark.createDataFrame
RDD[Row]
,在java中,您传递JavaPairRDD
,您应该映射它到JavaRDD[Row]
。
Dataset<Row> df = ss.range(10).toDF();
df.show();
JavaPairRDD<Row, Long> rddzip = df.toJavaRDD().zipWithIndex();
JavaRDD<Row> rdd = rddzip.map(s->{
Row r = s._1;
Object[] arr = new Object[r.size()+1];
for (int i = 0; i < arr.length-1; i++) {
arr[i] = r.get(i);
}
arr[arr.length-1] = s._2;
return RowFactory.create(arr);
});
StructType newSchema = df.schema().add(new StructField("rowid",
DataTypes.LongType, false, Metadata.empty()));
Dataset<Row> df2 = ss.createDataFrame(rdd,newSchema);
df2.show();
+---+
| id|
+---+
| 0|
| 1|
| 2|
| 3|
| 4|
| 5|
| 6|
| 7|
| 8|
| 9|
+---+
+---+-----+
| id|rowid|
+---+-----+
| 0| 0|
| 1| 1|
| 2| 2|
| 3| 3|
| 4| 4|
| 5| 5|
| 6| 6|
| 7| 7|
| 8| 8|
| 9| 9|
+---+-----+
关于java - 在 java Spark 中尝试 zipWithIndex 时出错,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58799176/
下面两段代码如何等效? (案例如何运作) list.zipWithIndex.flatMap{ rowAndIndex => rowAndIndex._1.zipWithInde
下面两段代码如何等效? (案例如何运作) list.zipWithIndex.flatMap{ rowAndIndex => rowAndIndex._1.zipWithInde
如果我有一个文件,并且我每行做了一个 RDD zipWithIndex, ([row1, id1001, name, address], 0) ([row2, id1001, name, addres
我正在尝试解决向数据集添加序列号的古老问题。我正在使用 DataFrame,并且似乎没有与 RDD.zipWithIndex 等效的 DataFrame。另一方面,以下内容或多或少按照我想要的方式工作
我有一个需要根据索引分组的列表。分组是用第 i 个项目和第 (i+6) 个项目完成的(列表大小是 6 的倍数)。 val list = List("a" ,"b" ,"c" ,"d" ,"e" ,"f
有一些嵌套的集合: val xs = List( List("a","b"), List("c"), List("d", "e", "
在 HList 上写算法, 我需要一个 zipWithIndex功能。它现在不在无形库中,所以我决定实现它。 很明显,它可以实现为 hlist.zip(indexes) 哪里indexes是 HLis
我想为我的输入的每一行分配一个 id - 这应该是来自 0 的数字至 N - 1 ,其中 N是输入中的行数。 粗略地说,我希望能够执行以下操作: val data = sc.textFile(text
假设我有以下数据框: dummy_data = [('a',1),('b',25),('c',3),('d',8),('e',1)] df = sc.parallelize(dummy_data).t
说如果我这样做,如下所示。 val rdd = sc.textFile("someFile.txt") val rddWithLines = rdd.zipWithIndex zipWithIndex
如果我有一个列表和 zipWithIndex val list = List('a', 'b', 'c') val ziplist = List.zipWithIndex // List[(int,
本质上,当 zipWithIndex 应用于数组时,它应该生成另一个数组,其中键作为值,值作为数组元素(反之亦然)。 最佳答案 更新 根据 OP 的评论,返回值应该是一个对象数组,每个对象都包含一个属
我尝试使用zipWithIndex添加一个具有行号的列,如下所示在spark中 val df = sc.parallelize(Seq((1.0, 2.0), (0.0, -1.0), (3.0,
背景 如 this question 中所述,我正在使用 Scalaz 7 iteratees 来处理常量堆空间中的大型(即无界)数据流。 我的代码如下所示: type ErrorOrT[M[+_],
我是一名优秀的程序员,十分优秀!