gpt4 book ai didi

apache-spark - Spark 节点在 Shuffle 期间如何通信?

转载 作者:行者123 更新时间:2023-12-03 07:07:45 25 4
gpt4 key购买 nike

我从this question看到Spark 节点有效地“直接通信”,但我不太关心理论,而更关心实现。 Here它在页面底部附近的“###Encryption”部分中显示,您可以将 Spark 配置为使用许多 SSL 协议(protocol)来确保安全,这表明,至少对我来说,它使用某种形式的 HTTP (s) 用于通讯。我的问题实际上分为两部分:Spark 节点使用什么协议(protocol)进行通信,以及此传输的数据格式如何?

最佳答案

Spark 使用 RPC (Netty) 在执行器进程之间进行通信。您可以查看NettyRpcEndpointRef类来查看实际的实现。

对于数据打乱,我们从 BlockManager 开始它负责提供数据 block 。每个执行者进程都有一个。内部有一个 BlockStoreShuffleReader,它使用 SerializerManager 管理来自不同执行器的读取。该管理器拥有一个实际的序列化器,它由 spark.serializer 属性定义:

val serializer = instantiateClassFromConf[Serializer](
"spark.serializer", "org.apache.spark.serializer.JavaSerializer")
logDebug(s"Using serializer: ${serializer.getClass}")

BlockManager尝试读取 block 时,它会使用该底层配置中的序列化器。它可以是 KryoSerializerJavaSerializer,具体取决于您的设置。

底线,为了读取和写入打乱的数据,Spark 使用用户定义的序列化器。

<小时/>

对于任务序列化,这有点不同。

Spark 使用名为 closureSerializer 的变量,默认为 JavaSerializerInstance ,表示Java序列化。您可以在 DAGScheduler.submitMissingTasks 中看到这一点方法:

val taskBinaryBytes: Array[Byte] = stage match {
case stage: ShuffleMapStage =>
JavaUtils.bufferToArray(
closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
case stage: ResultStage =>
JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
}

被序列化并发送到每个执行器的实际对象称为 TaskDescription :

def encode(taskDescription: TaskDescription): ByteBuffer = {
val bytesOut = new ByteBufferOutputStream(4096)
val dataOut = new DataOutputStream(bytesOut)

dataOut.writeLong(taskDescription.taskId)
dataOut.writeInt(taskDescription.attemptNumber)
dataOut.writeUTF(taskDescription.executorId)
dataOut.writeUTF(taskDescription.name)
dataOut.writeInt(taskDescription.index)

// Write files.
serializeStringLongMap(taskDescription.addedFiles, dataOut)

// Write jars.
serializeStringLongMap(taskDescription.addedJars, dataOut)

// Write properties.
dataOut.writeInt(taskDescription.properties.size())
taskDescription.properties.asScala.foreach { case (key, value) =>
dataOut.writeUTF(key)
// SPARK-19796 -- writeUTF doesn't work for long strings, which can happen for property values
val bytes = value.getBytes(StandardCharsets.UTF_8)
dataOut.writeInt(bytes.length)
dataOut.write(bytes)
}

// Write the task. The task is already serialized, so write it directly to the byte buffer.
Utils.writeByteBuffer(taskDescription.serializedTask, bytesOut)

dataOut.close()
bytesOut.close()
bytesOut.toByteBuffer
}

并通过 RPC 从 CoarseGrainedSchedulerBackend.launchTasks 发送方法:

executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))

到目前为止我所展示的内容都是关于启动任务的。对于混洗数据,Spark 拥有一个 BlockStoreShuffleReader,它管理来自不同执行器的读取。

关于apache-spark - Spark 节点在 Shuffle 期间如何通信?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45715133/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com