- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我使用元组编码器和用于 LineString 的 kryo 编码器将一些数据存储为 DataSet[(Long, LineString)]
implicit def single[A](implicit c: ClassTag[A]): Encoder[A] = Encoders.kryo[A](c)
implicit def tuple2[A1, A2](implicit
e1: Encoder[A1],
e2: Encoder[A2]
): Encoder[(A1,A2)] = Encoders.tuple[A1,A2](e1, e2)
implicit val lineStringEncoder = Encoders.kryo[LineString]
val ds = segmentPoints.map(
sp => {
val p1 = new Coordinate(sp.lon_ini, sp.lat_ini)
val p2 = new Coordinate(sp.lon_fin, sp.lat_fin)
val coords = Array(p1, p2)
(sp.id, gf.createLineString(coords))
})
.toDF("id", "segment")
.as[(Long, LineString)]
.cache
ds.show
+----+--------------------+
| id | segment |
+----+--------------------+
| 347|[01 00 63 6F 6D 2...|
| 347|[01 00 63 6F 6D 2...|
| 347|[01 00 63 6F 6D 2...|
| 808|[01 00 63 6F 6D 2...|
| 808|[01 00 63 6F 6D 2...|
| 808|[01 00 63 6F 6D 2...|
+----+--------------------+
我可以对段列应用任何映射操作并使用基础 LineStrign 方法。
ds.map(_._2.getClass.getName).show(false)
+--------------------------------------+
|value |
+--------------------------------------+
|com.vividsolutions.jts.geom.LineString|
|com.vividsolutions.jts.geom.LineString|
|com.vividsolutions.jts.geom.LineString|
我想创建一些 UDAF 来处理具有相同 id 的段,我尝试了以下两种不同的方法,但没有成功:
1) 使用聚合器:
val length = new Aggregator[LineString, Double, Double] with Serializable {
def zero: Double = 0 // The initial value.
def reduce(b: Double, a: LineString) = b + a.getLength // Add an element to the running total
def merge(b1: Double, b2: Double) = b1 + b2 // Merge intermediate values.
def finish(b: Double) = b
// Following lines are missing on the API doc example but necessary to get
// the code compile
override def bufferEncoder: Encoder[Double] = Encoders.scalaDouble
override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}.toColumn
ds.groupBy("id")
.agg(length(col("segment")).as("kms"))
.show(false)
这里我收到以下错误:
Exception in thread "main" org.apache.spark.sql.AnalysisException: unresolved operator 'Aggregate [id#603L], [id#603L, anon$1(com.test.App$$anon$1@5bf1e07, None, input[0, double, true] AS value#715, cast(value#715 as double), input[0, double, true] AS value#714, DoubleType, DoubleType)['segment] AS kms#721];
2) 使用 UserDefinedAggregateFunction
class Length extends UserDefinedAggregateFunction {
val e = Encoders.kryo[LineString]
// This is the input fields for your aggregate function.
override def inputSchema: StructType = StructType(
StructField("segment", DataTypes.BinaryType) :: Nil
)
// This is the internal fields you keep for computing your aggregate.
override def bufferSchema: StructType = StructType(
StructField("length", DoubleType) :: Nil
)
// This is the output type of your aggregatation function.
override def dataType: DataType = DoubleType
override def deterministic: Boolean = true
// This is the initial value for your buffer schema.
override def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0) = 0.0
}
// This is how to update your buffer schema given an input.
override def update(buffer : MutableAggregationBuffer, input : Row) : Unit = {
// val l0 = input.getAs[LineString](0) // Can't cast to LineString (I guess because it is searialized using given encoder)
val b = input.getAs[Array[Byte]](0) // This works fine
val lse = e.asInstanceOf[ExpressionEncoder[LineString]]
val ls = lse.fromRow(???) // it expects InternalRow but input is a Row instance
// I also tried casting b.asInstance[InternalRow] without success.
buffer(0) = buffer.getAs[Double](0) + ls.getLength
}
// This is how to merge two objects with the bufferSchema type.
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
buffer1(0) = buffer1.getAs[Double](0) + buffer2.getAs[Double](0)
}
// This is where you output the final value, given the final value of your bufferSchema.
override def evaluate(buffer: Row): Any = {
buffer.getDouble(0)
}
}
val length = new Length
rseg
.groupBy("id")
.agg(length(col("segment")).as("kms"))
.show(false)
我做错了什么?我想使用自定义类型的聚合 API,而不是使用 rdd groupBy API。我搜索了 Spark 文档,但找不到这个问题的答案,看来目前还处于早期阶段。
谢谢。
最佳答案
根据此 answer ,没有简单的方法可以传递嵌套类型的自定义编码器,即您的情况下的 (Long,LineString) 。
一种选择是定义一个case class LineStringWithID
,它将使用id: Long
属性扩展LineString
,并使用来自SQLImplicits的编码器
附注您能否将您的问题分解为更小的部分,每个部分一个主题?
关于scala - Spark 2.0.0 : How to aggregate DataSet with custom encoded types?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40909867/
我在大学学习C++时学习了这段代码..后来我在C#中使用了同样的东西...但现在我想在Java中使用它...我在互联网上寻找类似的东西,但我什至不知道如何表达它,以便我得到正确的结果。 所以嗯,请让我
我正在我的 Ruby on Rails Controller 上运行 RSPEC 测试,这是我正在测试的 Controller 操作: Controller 代码: class Customers::
想为我选择的选项卡设置自定义背景,到目前为止,子类化是我自定义 UITAbBar/UITabBarItem 的方式。 问题是:有谁知道(或知道我在哪里可以找到)设置背景的属性是什么? 所选选项卡周围有
您好,我在 commerefacades-beans.xml 中创建了 eProductForm bean,我添加了 ProductData 的自定义属性。 然后在commercewebs
我有两个表:1. 客户2. customer_order 客户表包含客户数据(duh),customer_order 包含所有订单。我可以在 customer.id=customer_order.id
在我的 TableView 中,我有一个 NSMutableArray *currList 的数据源 - 它包含对象 Agent 的对象。我创建了自定义的 TableCell 并正确设置了所有内容。我
是否建议使用自引用泛型继承? public abstract class Entity { public Guid Id {get; set;} public int Version
我正在尝试为我的 Grafana 安装使用自定义文件 ( custom.ini )。不幸的是,这不起作用。 我做了什么: 安装了一台装有 CentOS 7 的虚拟机 添加了 Grafana Yum R
我被分配了两个给定类的作业,一个是抽象父类 Lot.java,另一个是测试类 TestLots.java。我不应该编辑其中任何一个。任务是创建Lot的两个子类,使TestLots中的错误不再是错误。
我是 Botpress 的新手。 我刚刚安装了 Botpress 的最新版本“botpress-ce-v11_0_1-win-x64”。 我浏览了文档,发现了一些关于内容类型、内容元素和内容渲染的解释
我一直在四处寻找,但我还没有找到任何东西,除了 Qt3 的旧文档和 qt 设计器的 3.x 版。 我会举个例子,并不是因为我的项目是 GPL 而不能提供代码,而是为了简单起见。 示例:您正在为您的应用
场景 我有一个自定义规则来验证订单的运费: public class OrderValidator : BaseValidator { private string CustomInfo {
我有用于身份验证的自定义拦截器: @Named("authInterceptor") @Provides @Singleton fun providesAuthIntercep
如果有人没有添加照片,我想显示默认头像图像。我假设我需要在模型或助手中执行自定义 getter。 如果我做 getter,它会看起来像这样吗: def avatar_url "default_ur
我正在使用 Google Search API,但遇到了一些麻烦。这个请求(在 Python 中,使用 requests 库)工作正常 res = requests.get("https://www.
我使用 MSKLC 制作了自定义键盘布局。 我以为我仔细按照说明操作了chose appropriate values对于LOCALENAME和 LOCALID参数。 但是,在通过按 Win+Spac
我正在使用 simpleframework解析 XML 字符串并将其转换为对象。 Serializer serializer = new Persister(); try { Customer
我正在使用 C# 控制台应用程序从 MySql 数据库获取一些数据,但在正确查询时遇到一些问题 现在的情况: SELECT * FROM Customer WHERE EXISTS ( SELECT
我在我的 iPhone 4S 上运行我的应用程序,我正在使用自定义表格 View Controller 和自定义表格 View 单元格,当我将表格 View 向上滑动到空白区域并同样向下滑动到空白区域
我有一个自定义的 JavaScript 变量,它正在检查 eventAction 是什么,这样我就可以知道是否触发一些转换像素。自定义 Javascript 称为“FacebookConversion
我是一名优秀的程序员,十分优秀!