- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我是Spark和Scala的新手。我正在尝试将函数称为Spark UDF,但遇到了我似乎无法解决的错误。
我了解在Scala中,Array和Seq并不相同。 WrappedArray是Seq的子类型,并且WrappedArray和Array之间存在隐式转换,但是我不确定为什么在UDF中不会发生这种情况。
非常感谢您能帮助我理解和解决此问题的任何指示。
这是代码片段
def filterMapKeysWithSet(m: Map[Int, Int], a: Array[Int]): Map[Int, Int] = {
val seqToArray = a.toArray
val s = seqToArray.toSet
m filterKeys s
}
val myUDF = udf((m: Map[Int, Int], a: Array[Int]) => filterMapKeysWithSet(m, a))
case class myType(id: Int, m: Map[Int, Int])
val mapRDD = Seq(myType(1, Map(1 -> 100, 2 -> 200)), myType(2, Map(1 -> 100, 2 -> 200)), myType(3, Map(3 -> 300, 4 -> 400)))
val mapDF = mapRDD.toDF
mapDF: org.apache.spark.sql.DataFrame = [id: int, m: map<int,int>]
root
|-- id: integer (nullable = false)
|-- m: map (nullable = true)
| |-- key: integer
| |-- value: integer (valueContainsNull = false)
case class myType2(id: Int, a: Array[Int])
val idRDD = Seq(myType2(1, Array(1,2,100,200)), myType2(2, Array(100,200)), myType2(3, Array(1,2)) )
val idDF = idRDD.toDF
idDF: org.apache.spark.sql.DataFrame = [id: int, a: array<int>]
root
|-- id: integer (nullable = false)
|-- a: array (nullable = true)
| |-- element: integer (containsNull = false)
import sqlContext.implicits._
/* Hive context is exposed as sqlContext */
val j = mapDF.join(idDF, idDF("id") === mapDF("id")).drop(idDF("id"))
val k = j.withColumn("filteredMap",myUDF(j("m"), j("a")))
k.show
j: org.apache.spark.sql.DataFrame = [id: int, m: map<int,int>, a: array<int>]
root
|-- id: integer (nullable = false)
|-- m: map (nullable = true)
| |-- key: integer
| |-- value: integer (valueContainsNull = false)
|-- a: array (nullable = true)
| |-- element: integer (containsNull = false)
k: org.apache.spark.sql.DataFrame = [id: int, m: map<int,int>, a: array<int>, filteredMap: map<int,int>]
root
|-- id: integer (nullable = false)
|-- m: map (nullable = true)
| |-- key: integer
| |-- value: integer (valueContainsNull = false)
|-- a: array (nullable = true)
| |-- element: integer (containsNull = false)
|-- filteredMap: map (nullable = true)
| |-- key: integer
| |-- value: integer (valueContainsNull = false)
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 6, ip-100-74-42-194.ec2.internal): java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to [I
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:60)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:51)
at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:49)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1865)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1865)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
最佳答案
在函数filterMapKeysWithSet中将数据类型从Array [Int]更改为Seq [Int]似乎可以解决上述问题。
def filterMapKeysWithSet(m: Map[Int, Int], a: Seq[Int]): Map[Int, Int] = {
val seqToArray = a.toArray
val s = seqToArray.toSet
m filterKeys s
}
val myUDF = udf((m: Map[Int, Int], a: Seq[Int]) => filterMapKeysWithSet(m, a))
k: org.apache.spark.sql.DataFrame = [id: int, m: map<int,int>, a: array<int>, filteredMap: map<int,int>]
root
|-- id: integer (nullable = false)
|-- m: map (nullable = true)
| |-- key: integer
| |-- value: integer (valueContainsNull = false)
|-- a: array (nullable = true)
| |-- element: integer (containsNull = false)
|-- filteredMap: map (nullable = true)
| |-- key: integer
| |-- value: integer (valueContainsNull = false)
+---+--------------------+----------------+--------------------+
| id| m| a| filteredMap|
+---+--------------------+----------------+--------------------+
| 1|Map(1 -> 100, 2 -...|[1, 2, 100, 200]|Map(1 -> 100, 2 -...|
| 2|Map(1 -> 100, 2 -...| [100, 200]| Map()|
| 3|Map(3 -> 300, 4 -...| [1, 2]| Map()|
+---+--------------------+----------------+--------------------+
关于apache-spark - 无法将scala.collection.mutable.WrappedArray $ ofRef强制转换为Integer,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40199507/
我有一个类型为set的列,我使用spark Dataset API的collect_set(),它返回一个包装数组的包装数组。我想要来自嵌套包装数组的所有值的单个数组。我怎样才能做到这一点? 例如。
我的 Dataset 中有一列包含 WrappedArray> .我将此专栏传递给 UDF提取其中一个值。 我将如何访问这个嵌套结构中的 double ? 我想做这样的事情: sparkSession
我在 JAVA 中的 SparkSQL 中解析一个 json 文件,我需要能够访问返回的坐标,这些坐标似乎是 WrappedArrays 的 WrappedArray。这是代码: df.registe
我正在使用 Spark 2.0。我的数据帧有一列,其中包含浮点型 WrappedArrays 的 WrappedArray。 行的示例如下: [[1.0 2.0 2.0][6.0 5.0 2.0][4
我的问题是我必须在一个列表中找到这些不是空的。当我使用过滤器函数不为空时,我也得到每一行。 我的程序代码如下所示: ... val csc = new CassandraSQLContext(s
我有一个 spark 数据框,这里是架构: |-- eid: long (nullable = true) |-- age: long (nullable = true) |-- sex: long
我正在尝试将包含 Array[String] 的列转换为 String,但我始终收到此错误 org.apache.spark.SparkException: Job aborted due to st
我有一段代码如下: List indexes = joinedCols .map(x-> ((Tuple2)x)._1) .collect();
我有一个数据框,如下所示 +---------------------------------------------------------------------+ |value
我对这两个系列有点困惑。 我知道 Scala 的 Array 调用 Java API。在这种情况下,Wrapped Array 的作用是什么(及其性能特征)? http://www.scala-lan
我是Spark和Scala的新手。我正在尝试将函数称为Spark UDF,但遇到了我似乎无法解决的错误。 我了解在Scala中,Array和Seq并不相同。 WrappedArray是Seq的子类型,
两个问题,一般性问题的答案将指导我制作一个 MVCE 的最小限度。 1) 我如何知道预先注册 WrappedArray(以及我可能使用的 Scala 中的所有其他类)?必须使用 Kryo 从库中注册类
scala> def joinWith(separator: String, values: String*): String = { | Array(values).mkStr
我是 Spark/Scala 的初学者。我想在从 Dataset 中选择的 Array 中提取一个值(Double)。简化的主要步骤如下所示。如何提取每个值[Double]最后一个 val wpA ?
我一直在尝试将 RDD 转换为数据帧。为此,需要定义类型而不是 Any。我正在使用 Spark MLLib PrefixSpan,这就是 freqSequence.sequence 的来源。我从一个数
我正在尝试转换 Dataframe 中的 json 文件,但我陷入了基于数组的字段中。我正在使用 Spark 1.6 和 Java。当我读取嵌套的 Json 并转换为 Dataframe 时,我可以读
我有以下架构: geometry: struct (nullable = true) -- coordinates: array (nullable = true) -- el
我有一个有趣的 json 数据,如下所示: [ { "internal_network" : [ { "address" : [ { "address_id" : 2,
我为一个奇怪的运行时错误抓狂: // File: build.sbt scalaVersion := "2.10.1" // File: src/main/scala/bug/Bug.scala pa
Spark 抛ClassCastExpection对 WrappedArray 执行任何操作时 例子: 我有一个像下面这样的 map 输出 输出: Map(1 -> WrappedArray(Pan4
我是一名优秀的程序员,十分优秀!