gpt4 book ai didi

java - Flink Scala - 比较方法违反了它的通用契约

转载 作者:行者123 更新时间:2023-11-30 10:10:32 24 4
gpt4 key购买 nike

我正在用 Flink 编写一个项目,该项目涉及在批处理数据上流式传输一组查询点,并执行完整的顺序扫描以找到最近的邻居。应该是对单个 Float 值的简单排序操作会引发违反一般约定的错误。主要方法定义为:

object StreamingDeCP{
var points: Vector[Point] = _

def main(args: Array[String]): Unit = {
val queryPointsVec: Vector[Point] = ... // Read from file
val pointsVec: Vector[Point] = ... // Read from file

val streamEnv: StreamExecutionEnvironment =
StreamExecutionEnvironment.getExecutionEnvironment
val queryPoints = streamEnv.fromCollection(queryPointsVec)

points = pointsVec
queryPoints.map(new StreamingSequentialScan)

streamEnv.execute("StreamingDeCP")
}

final class StreamingSequentialScan
extends MapFunction[Point, (Point, Vector[Point])] {

def map(queryPoint: Point): (Point, Vector[Point]) = {
val nn = points
.map{ _.eucDist(queryPoint) }
.sorted

(queryPoint, nn)
}
}
}

Point 类和伴随对象是:

case class Point(pointID: Long,
descriptor: Vector[Float]) extends Serializable {
var distance: Float = Float.MaxValue

def eucDist(that: Point): Point = {
// Simple arithmetic to calculate and set the distance variable
}
}

object Point{
implicit def orderByDistance[A <: Point]: Ordering[A] =
Ordering.by(_.distance)
}

为了查明原因,这里有一些关于我尝试过的事情的注释:

  • 断言所有distance 值都在 Float.MaxValue 和 Float.MinValue 之间,并且不存在负零值
  • 断言在同一排序操作中没有重复的 distance 变量(我的用例允许这样做,但我想我会检查它以防万一)
  • 将 float 转换为整数值并改为对这些值进行排序
  • Point 上添加了显式排序,而不是使用隐式
  • 根据唯一的 pointID 而不是 distance 排序,这有效但对于这个问题的上下文没有用。

我还注意到,执行相同的代码并不总能可靠地重现错误。我正在以完全确定的方式读取 Vector[Points],因此导致此行为的唯一可能原因必须是 Flink 调度程序或排序方法中的某些有状态计算。

关于同一主题的其他帖子似乎涉及自定义比较器中遗漏的场景,但这应该是对单个 Float 值的简单排序操作,所以我不知道是什么导致了这个问题。

最佳答案

我不熟悉 Flink,但我没有任何理由假设它会在每个 embarrassingly parallel 执行一次。 MapFunction 以顺序单线程方式执行任务。

由于您的 Point 包含 var,并且那些 varmap 的方法中发生了变化MapFunction,当 MapFunction 以并行方式 执行时,代码必须失败并出现 “比较方法违反其一般契约” - 异常! = 1

为了避免 map 函数中的任何副作用,您可以按如下方式修改代码:

  • main 中删除任何 var,使 points 成为不可变的 val
  • Point 中删除任何类型的 var
  • 实现方法

    def eucDist(other: Point): Double

    它只是计算到另一点的距离(不改变任何东西)。

  • 使用排序方式:

    val nn = points.sortBy(_.eucDist(queryPoint))

或者,如果您想避免在排序过程中多次重新计算欧氏距离,请预先计算一次距离,排序,然后丢弃这些距离:

val nn = points.map(p => (p, p.eucDist(queryPoint))).sortBy(_._2).map(_._1)

关于java - Flink Scala - 比较方法违反了它的通用契约,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52788093/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com