- ubuntu12.04环境下使用kvm ioctl接口实现最简单的虚拟机
- Ubuntu 通过无线网络安装Ubuntu Server启动系统后连接无线网络的方法
- 在Ubuntu上搭建网桥的方法
- ubuntu 虚拟机上网方式及相关配置详解
CFSDN坚持开源创造价值,我们致力于搭建一个资源共享平台,让每一个IT人在这里找到属于你的精彩世界.
这篇CFSDN的博客文章Master 分配资源并在 Worker上启动 Executor ,逐行代码注释版由作者收集整理,如果你对这篇文章有兴趣,记得点赞哟.
上一次阅读到了 SparkContext 初始化,继续往下之前,先温故一下之前的内容.
这里有个假设是:Spark 集群以 Standalone 的方式来启动的,作业也是提交到 Spark standalone 集群.
首先需要启动 Spark 集群,使用 start-all.sh 脚本依次启动 Master (主备) 和多个 Worker.
启动好之后,开始提交作业,使用 spark-submit 命令来提交.
接着上次 ClientEndpoint 启动之后,会向 Master 发送一个 RegisterApplication 消息,Master 开始处理这个消息.
然后看到 Matster 类处理 RegisterApplication 消息的地方:
可以看到,用应用程序的描述和 Driver 的引用创建了一个 Application,然后开始注册这个 Application.
注册 Application 很简单,就是往 Master 的内存中加入各种信息,重点来了,把 ApplicationInfo 加入到了 waitingApps 这个结构里,然后 schedule() 方法会遍历这个列表,为 Application 分配资源,并调度起来.
然后往 zk 中写入了 Application 的信息,并且往 Driver 发送了一个 RegisteredApplication 应用已经注册的消息.
接着开始 schedule(),这个方法上次讲过,它会遍历两个列表,一个是遍历 waitingDrivers 来启动 Driver,一个是遍历 waitingApps,来启动 Application.
waitingDrivers 列表在客户端请求启动 Driver 的时候就处理过了,本次重点看这个方法:
有以下几个步骤:
源码从 Master 类的 schedule() 方法的最后一行 startExecutorsOnWorkers() 开始:
这个方法主要作用是计算 worker 的 executor 数量和分配的资源并启动 executor.
(1)遍历 waitingApps,如果 app 还需要的 cpu 核数大于每个执行器的核数,才继续分配.
(2)过滤可用的 worker,条件一:该 worker 剩余内存大于单个 executor 需要的内存;条件二:该 worker 剩余 cpu 核数大于单个 executor 需要的核数;然后按照可用 cpu核数从大到小排序.
(3)下面两个方法是关键的方法 。
scheduleExecutorsOnWorkers(),用来计算每个 Worker 上可用的 cpu 核数,
allocateWorkerResourceToExecutors() 用来真正在 Worker 上分配 Executor.
这个方法很长,首先看方法注释,大致翻译了一下:
当执行器分配的 cpu 核数(spark.executor.cores)被显示设置的时候,如果这个 worker 上有足够的核数和内存的话,那么每个 worker 上可以执行多个执行器;反之,没有设置的时候,每个 worker 上只能启动一个执行器;并且,这个执行器会使用 worker 能提供出来的尽可能多的核数,
appA 和 appB 都有一个执行器运行在 worker1 上。但是 appA 还需要一些 cpu 核,当 appB 执行结束,释放了它在 worker1 上的核数时, 下一次调度的时候,appA 会新启动一个 executor 获得了 worker1 上所有的可用的核心,因此 appA 就在 worker1 上启动了多个执行器.
设置 coresPerExecutor (spark.executor.cores)很重要,考虑下面的例子:集群有4个worker,每个worker有16核;用户请求 3 个执行器(spark.cores.max = 48,spark.executor.cores=16)。如果不设置这个参数,那么每次分配 1 个 cpu核心,每个 worker 轮流分配一个 cpu核,最终 4 个执行器分配 12 个核心给每个 executor,4 个 worker 也同样分配了48个核心,但是最终每个 executor 只有 12核 < 16 核,所以最终没有执行器被启动.
如果看我的翻译还是很费劲,我就再精简下:
下面是源码,每句都有挨个注释过,中间有一个方法是判断这个 Worker 上还能不能再分配 Executor 了.
重点是中间方法后面那一段,遍历每个 Worker 分配 cpu,如果不是 Spend Out 模式,则在一个 Worker 上一直分配,直到 Worker 资源分配完毕.
接着真正开始在 Worker 上启动 Executor:
在 launchExecutor 在方法里:
给 Worker 发送了一个 LaunchExecutor 消息.
然后给执行器对应的 Driver 发送了 ExecutorAdded 消息.
本次我们讲了 Master 处理应用的注册,重点是把 app 信息加入到 waitingApps 列表中,然后调用 schedule() 方法,计算每个 Worker 可用的 cpu核数,并且在 Worker 上启动执行器.
原文链接:https://mp.weixin.qq.com/s/a1fdln_rqFN_qmoC9Gz7Jg 。
最后此篇关于Master 分配资源并在 Worker上启动 Executor ,逐行代码注释版的文章就讲到这里了,如果你想了解更多关于Master 分配资源并在 Worker上启动 Executor ,逐行代码注释版的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
我正在使用 python 加密一些文件,但我在逐 block 读取文件时遇到问题。 有时不会返回最后一个 block 的所有数据。 当文件长度为 307200 字节时,我没有问题。当它的长度为 279
我正在使用 WebRTC 将文件发送到连接的对等方,并且我正在以块的形式发送文件。但是,我无法弄清楚如何让对等方在文件逐块流入时保存/下载文件。 我在网上找到的所有例子都推荐做这样的事情: // se
我用 Tiled 做了一张 map 。它的每一 block 图 block 都尺寸为 32x32 像素,我的主要角色 Sprite 也是。 在我的类(class) Player.cpp 中,我有一些计
我见过一些单页网站,您可以逐 block 滚动,因此您没有无限滚动。 你逐 block 移动。 是否有提供此功能的任何脚本或其他东西? 最佳答案 我自己从未使用过它,所以我无法在代码方面为您提供帮助,
这是一个逐 block 反转文件内容的程序。 #include #include #define BS 12 void reverse(char * buffer, int size) { c
在下面的代码中,有没有办法避免 if 语句? s = 13; /*Total size*/ b = 5; /*Block size*/ x = 0; b1 = b; while(x s)
我正在尝试分割输入图像并逐个对其进行模糊处理,但毕竟对相邻图 block 调用 cv::blur 我得到了边界像素,这与我有一次将 cv::blur 集体应用于整个图像。 Mat upper(im,
我想逐个读取文件。该文件被分成几部分,存储在不同类型的媒体上。我目前所做的是调用文件的每个单独部分,然后将其合并回原始文件。 问题是我需要等到所有 block 都到达后才能播放/打开文件。是否可以在
我有一个包含客户和日期列表的 JSON 文件。 文件看起来像这样: { "Customers": [ { "Customer": "Customer Name Here", "Company"
我的邮件目标是从连接到HTTP服务器的TCP套接字读取数据,然后解析 HTTP响应块(传输编码:分块)-服务器在同一连接上每30秒发送一个块 我附上了我的代码。看起来io.Copy读取第一个块,然后等
我认为自己是一位经验丰富的 numpy 用户,但我无法找到以下问题的解决方案。假设有以下数组: # sorted array of times t = numpy.cumsum(numpy.rando
当我将文件添加到暂存区时,我可以 $ git add my_file -p 然后选择我要暂存的 block 。 有没有办法 merge/挑选一个提交并逐 block 应用它的差异? 谢谢 最佳答案 我
我有一个 mongodb 查询,它获取大约 50,000 个大文档。 这对我的 RAM 来说太多了,因此计算机速度变慢了。 现在我想逐 block 迭代 mongodb 结果。 我想获取前 1000
我不会为 AES 或其他加密打开此线程,因为这是我要用来加密 AES 和其他加密的 key 的内容。我从 StackOverflow 和其他一些网站收集了一些代码,并对其进行了编辑以适合我的程序,但是
我在做一些后台工作时尝试收集所有系统统计数据。例如,我使用以下命令来收集 IO 统计信息: iostat -xty 5 此脚本用于每 5 秒收集一次 I/O 统计信息。所以我的日志会包含很多数据 bl
我需要 php 脚本,用于从 url 到服务器的可恢复文件下载。它应该能够开始下载,然后在捕捉时(30 秒 - 5 分钟)恢复,依此类推,直到完成整个文件。 perl 中有类似的东西 http://c
是否有标准的 Linux 命令可用于逐 block 读取文件?例如,我有一个大小为 6kB 的文件。我想读取/打印第一个 1kB,然后是第二个 1kB ...似乎 cat/head/tail 在这种情
我正在处理大量文件,我想逐 block 处理这些文件,假设在每批处理中,我想分别处理每 50 个文件。 如何使用 Spark Structured Streaming 来实现? 我看到 Jacek L
我正在处理大量文件,我想逐 block 处理这些文件,假设在每批处理中,我想分别处理每 50 个文件。 如何使用 Spark Structured Streaming 来实现? 我看到 Jacek L
我想知道:逐 block 读取 jp2 并将数据存储在缓冲区对象中的预期方法是什么? 现在我正在做类似的事情。 /* note I already created stream and configu
我是一名优秀的程序员,十分优秀!