- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正从基于 MPI 的系统转向 Apache Spark。我需要在 Spark 中执行以下操作。
假设,我有 n
个顶点。我想从这些 n
顶点创建一个边列表。边只是两个整数 (u,v) 的元组,不需要任何属性。
但是,我想在每个执行器中独立地并行创建它们。因此,我想为 P
Spark Executors 独立创建 P
边数组。每个数组的大小可能不同并且取决于顶点,因此,我还需要从 0
到 n-1
的执行程序 ID。接下来,我想要一个全局 RDD 边数组。
在 MPI 中,我会使用处理器等级在每个处理器中创建一个数组。我如何在 Spark 中做到这一点,尤其是使用 GraphX
库?
因此,我的主要目标是在每个执行器中创建一个边数组,并将它们组合成一个 RDD。
我首先尝试的是鄂尔多斯的一个修改版本——仁义模型。作为参数,我只有节点数 n 和概率 p。
假设,执行器i
必须处理从101
到200
的节点。对于任何节点,比如节点 101
,它将创建从 101
到 102 -- n
的边,概率为 p。在每个执行程序创建分配的边之后,我将实例化 GraphX EdgeRDD
和 VertexRDD
。因此,我的计划是在每个执行器中独立创建边缘列表,并将它们合并到 RDD
中。
最佳答案
让我们从下游处理所需的一些导入和变量开始:
import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import scala.util.Random
import org.apache.spark.HashPartitioner
val nPartitions: Integer = ???
val n: Long = ???
val p: Double = ???
接下来我们需要一个种子 ID 的 RDD,它可以用来生成边。处理这个问题的一种天真的方法就是这样:
sc.parallelize(0L to n)
由于生成的边数取决于节点 ID,因此这种方法会产生高度倾斜的负载。我们可以通过重新分区做得更好:
sc.parallelize(0L to n)
.map((_, None))
.partitionBy(new HashPartitioner(nPartitions))
.keys
但更好的方法是从空 RDD 开始并就地生成 ID。我们需要一个小 helper :
def genNodeIds(nPartitions: Int, n: Long)(i: Int) = {
(0L until n).filter(_ % nPartitions == i).toIterator
}
可以按如下方式使用:
val empty = sc.parallelize(Seq.empty[Int], nPartitions)
val ids = empty.mapPartitionsWithIndex((i, _) => genNodeIds(nPartitions, n)(i))
只是一个快速的健全性检查(它非常昂贵,所以不要在生产中使用它):
require(ids.distinct.count == n)
我们可以使用另一个助手生成实际的边缘:
def genEdgesForId(p: Double, n: Long, random: Random)(i: Long) = {
(i + 1 until n).filter(_ => random.nextDouble < p).map(j => Edge(i, j, ()))
}
def genEdgesForPartition(iter: Iterator[Long]) = {
// It could be an overkill but better safe than sorry
// Depending on your requirement it could worth to
// consider using commons-math
// https://commons.apache.org/proper/commons-math/userguide/random.html
val random = new Random(new java.security.SecureRandom())
iter.flatMap(genEdgesForId(p, n, random))
}
val edges = ids.mapPartitions(genEdgesForPartition)
最后我们可以创建一个图表:
val graph = Graph.fromEdges(edges, ())
关于scala - 在 Spark 中为每个 Executor 创建数组并组合成 RDD,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34296588/
Issue 2019/05/09 21:50:07.380 +0800 ERROR [ExecutorManager] [Azkaban] No active executors found
我的问题是:使用 Executors.newFixedThreadPool(1)?? 有意义吗? 。在两个线程(main + oneAnotherThread)场景下使用执行器服务是否高效?正在通过调
我想知道,Executors.newSingleThreadExecutor() 之间有什么区别?和 Executors.newFixedThreadPool(1) 以下摘自javadoc Unlik
我的问题是:使用 Executors.newFixedThreadPool(1) 有意义吗??。在两个线程(main + oneAnotherThread)场景中使用执行器服务是否有效?通过调用 ne
我有一个 Apache Spark 应用程序在集群模式下运行在 YARN 集群上(spark 在这个集群上有 3 个节点)。 当应用程序运行时,Spark-UI 显示 2 个执行程序(每个运行在不同的
我想知道是否有任何理由使用 Executor 而不是 ExecutorService。 据我所知,JDK 中没有实现 Executor 接口(interface),它也不是 ExecutorServi
我有多个使用 Celery Executor 的 dag,但我希望使用 Kubernetes Executor 运行一个特定的 dag。我无法推断出一种良好而可靠的方法来实现这一目标。 我有一个 ai
假设我们的 Controller 中有一个 Action 。在每次请求时,许多用户都会调用 performLogin。 def performLogin( ) = { Async {
创建和管理您自己的 ExecutorService 与使用 Spring Boot 的 @Async 方法和 @Bean 方法创建 Executor 添加一个@Bean来创建一个Executor 手动
问题从无到有,只有我在代码中所做的更改 - 安装了 RaSharper(但删除它并重新安装 Visual Studio 没有帮助)。 所以我使用 NUnit 3 来运行测试。 我有 Visual St
我们知道每个任务当时都在一个核心中执行。 假设我们有这样配置的节点集群: 10 节点。 每个节点 16 个核心。 每个节点 64 GB 内存。 我的问题是 有 1 个 16 核的执行程序和 16 个
我正在从 Jupyter Notebook 中初始化 PySpark,如下所示: from pyspark import SparkContext # conf = SparkConf().setAp
我正在向我的 Web 应用程序添加一个基于 Flask 的 API,以控制某些网络自动化功能的启动和停止。我遇到了一个奇怪的行为,即 Flask-Executor .submit() 方法调用的函数似
单元测试在本地运行良好。 在 Visual Studio 2017 托管生成代理上运行时,VSTest 任务失败并显示: 2018-12-08T10:42:16.3779907Z An excepti
我正在尝试制作一个执行器和线程的简单示例。 当我调用 newSingleThreadExecutor(new CustomThreadFactory) 时,一切顺利,但是当我使用 null 参数调用
对于一个线程,我通过以下代码段捕获未捕获的异常。但是,对于 ExecutorService executor = Executors.newFixedThreadPool(10);,如何捕获未捕获的异
我想创建一个 CompletableFuture,其返回值在 Kotlin 中的特定执行程序上运行。 下面的代码工作得很好。 return CompletableFuture.supplyAsync
考虑基本的固定线程池: Executors.newFixedThreadPool(MaxListeners) 我打算不断提交新任务 - 响应传入的 TCP 套接字服务请求。 然而,当每个任务中的Run
我们可以在定义 ThreadPoolExecutors 时提供 BlockingQueue 实现。但是,如果我使用工厂(执行器)创建单个线程池(如下所示),我想知道使用哪个阻塞队列。我猜它是一个 Li
我编写了一个程序来执行两个在 shell 前台运行的命令,直到在终端上按下 ^c。 外壳命令 ./weed master -mdir=/var/lib/qualebs/weed 上面命令的输出是 qu
我是一名优秀的程序员,十分优秀!