gpt4 book ai didi

scala - 理解 Flink - 任务不可序列化

转载 作者:行者123 更新时间:2023-12-02 00:58:33 25 4
gpt4 key购买 nike

我正在做一个 Flink 项目,遇到了一个问题,我在 Stackoverflow 的回答的帮助下设法解决了这个问题。但是,我不清楚所提出的解决方案为什么确实有效,而且我发现有关该主题的信息很少。考虑以下代码:

object DeCP {
def main(args: Array[String]): Unit = {
val params: ParameterTool = ParameterTool.fromArgs(args)

// Get the execution environment and read the data
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val queryPoints: DataSet[Point] = readQueryPoints(env, params)
val points: DataSet[Point] = readFeatureVector(env, params)

// Process the query points
queryPoints
.map(new KNNRich)
.withBroadcastSet(points, "pointsIn")
.print
}

final class KNNRich extends RichMapFunction[Point, (Point, Vector[Point])]{
private var pointsIn: Traversable[Point] = _

override def open(parameters: Configuration): Unit =
pointsIn = getRuntimeContext.getBroadcastVariable[Point]("pointsIn").asScala

def map(queryPoint: Point): (Point, Vector[Point]) = {
val dataSetIn = ExecutionEnvironment.getExecutionEnvironment
.fromCollection(pointsIn.toVector)
val cluster = new Cluster(dataSetIn, queryPoint)
val knn = cluster.kNearestNeighbor(queryPoint, 3) // This call causes problems
(queryPoint, knn.collect.toVector)
}
}
}

Cluster 类和伴随对象定义为:

class Cluster(var points: DataSet[Point],
var clusterLeader: Point) extends Serializable {
private var queryPoint: Point = _

def distance(p: Point): Point = {
p.eucDist(queryPoint)
}

def kNearestNeighbor(queryPoint: Point, k: Int): DataSet[Point] = {
this.queryPoint = queryPoint

this.points.map{p => distance(p)} // Task not serializable
this.points.map{p => p.eucDist(queryPoint)} // Works
this.points.map{p => Cluster.staticDistance(queryPoint, p)} // Works
}
}

object Cluster {
def staticDistance(queryPoint: Point, p: Point): Point = {
p.eucDist(queryPoint)
}
}

调用 distance 方法会导致任务不可序列化异常,但用定义替换方法调用可以解决问题。同样,将完全相同的方法定义为伴随对象的成员可以使代码正常运行。

为什么第一个调用不起作用,但另外两个调用起作用?如果您在类上有更复杂的执行流程,并且不容易替换为伴随对象上的方法,会发生什么情况?

最佳答案

通过执行数据集转换,您只是在创建管道的逻辑计划。管道通过调用execute/print/collect提交到集群。

当管道被提交到集群时,每个函数(例如您的 RichMapFunction)被序列化,发送到集群,为每个并行实例复制,并独立执行。当您收到“任务不可序列化”异常时,这意味着您的 RichMapFunction 正在传递地引用此类外部的变量/对象。您应该确保一个函数是一个独立的 block 。

通过调用 points.map{},您隐式创建了一个 MapFunction。但是这个 MapFunction 引用了 Cluster 的实例,因此不是独立的。 Flink 也尝试序列化 Cluster 但失败了。如果 distance 是静态的(在伴随对象中定义),则 Cluster 也不需要序列化。

顺便说一下,您的示例的另一个问题是您没有按预期使用 DataSet API。通常,您不应在正在运行的管道内创建管道。这也可能导致意想不到的副作用。

关于scala - 理解 Flink - 任务不可序列化,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52386750/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com