- Java 双重比较
- java - 比较器与 Apache BeanComparator
- Objective-C 完成 block 导致额外的方法调用?
- database - RESTful URI 是否应该公开数据库主键?
我有一个关于 Java Spark 应用程序中代码结构的一般性问题。我想将 Spark 转换的实现代码与 RDD 的调用分开,这样即使使用包含大量代码行的大量转换,应用程序的源代码也能保持清晰。
我先给你一个简短的例子。在这种情况下,flatMap 转换的实现是作为匿名内部类提供的。这是一个简单的应用程序,它读取一个整数 RDD,然后将每个元素乘以一个整数数组,该数组之前广播到所有工作节点:
public static void main(String[] args) {
SparkConf conf = new SparkConf().setMaster("local").setAppName("MyApp");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<Integer> result = sc.parallelize(Arrays.asList(5, 8, 9));
final Broadcast<int[]> factors = sc.broadcast(new int[] { 1, 2, 3 });
result = result.flatMap(new FlatMapFunction<Integer, Integer>() {
public Iterable<Integer> call(Integer t) throws Exception {
int[] values = factors.value();
LinkedList<Integer> result = new LinkedList<Integer>();
for (int value : values) result.add(t * value);
return result;
}
});
System.out.println(result.collect()); // [5, 10, 15, 8, 16, 24, 9, 18, 27]
sc.close();
}
为了构建代码,我将 Spark 函数的实现提取到了另一个类中。 SparkFunctions
类提供了 flatMap 转换的实现,并有一个 setter 方法来获取对广播变量的引用(......在我的真实场景中,这个类中会有很多操作,所有访问广播数据)。
我的经验是,表示 Spark 转换的方法可以是静态的,只要它不访问 Broadcast 变量或 Accumulator 变量即可。为什么?静态方法只能访问静态属性。对 Broadcast 变量的静态引用始终为 null
(可能是因为当 Spark 将类 SparkFunctions
发送到工作节点时它未被序列化)。
@SuppressWarnings("serial")
public class SparkFunctions implements Serializable {
private Broadcast<int[]> factors;
public SparkFunctions() {
}
public void setFactors(Broadcast<int[]> factors) {
this.factors = factors;
}
public final FlatMapFunction<Integer, Integer> myFunction = new FlatMapFunction<Integer, Integer>() {
public Iterable<Integer> call(Integer t) throws Exception {
int[] values = factors.value();
LinkedList<Integer> result = new LinkedList<Integer>();
for (int value : values) result.add(t * value);
return result;
}
};
}
这是使用 SparkFunctions
类的应用程序的第二个版本:
public static void main(String[] args) {
SparkConf conf = new SparkConf().setMaster("local").setAppName("MyApp");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<Integer> result = sc.parallelize(Arrays.asList(5, 8, 9));
final Broadcast<int[]> factors = sc.broadcast(new int[] { 1, 2, 3 });
// 1) Initializing
SparkFunctions functions = new SparkFunctions();
// 2) Pass reference of broadcast variable
functions.setFactors(factors);
// 3) Implementation is now in the class SparkFunctions
result = result.flatMap(functions.myFunction);
System.out.println(result.collect()); // [5, 10, 15, 8, 16, 24, 9, 18, 27]
sc.close();
}
应用程序的两个版本都在工作(本地和集群设置),但我想问的是它们是否同样高效。
问题1:在我看来,Spark序列化类SparkFunctions
,包括Broadcast变量,并将其发送到工作节点,以便节点可以在自己的函数中使用该函数任务。数据是否被两次发送到工作节点,第一次是使用SparkContext
进行广播,然后是类SparkFunctions
的序列化?还是每个元素发送一次(广播加 1 次)?
问题 2:您能否就源代码的其他结构提供建议?
请不要提供如何防止广播的解决方案。我有一个复杂得多的实际应用程序。
我发现的类似问题并没有真正帮助:
预先感谢您的帮助!
最佳答案
这是关于问题 1
当一个spark job被提交时,jobs被划分为stages->tasks。这些任务实际上是在工作节点上执行转换和操作。驱动程序的 sumbitTask() 将有关广播变量的函数和元数据序列化到所有节点。
广播工作原理剖析。
Driver 创建一个本地目录来存储要广播的数据,并启动一个可以访问该目录的 HttpServer。数据实际上是在调用广播时写入目录的(val bdata = sc.broadcast(data))。同时数据也以StorageLevel内存+磁盘的方式写入驱动的blockManger。 block 管理器为数据分配一个 blockId(BroadcastBlockId 类型)。
真正的数据只有在执行器反序列化它接收到的任务时才会广播,它还会以 Broadcast 对象的形式获取广播变量的元数据。然后调用元数据对象(bdata 变量)的 readObject() 方法。此方法将首先检查本地 block 管理器以查看是否已经存在本地副本。如果没有,数据将从驱动程序中获取。获取数据后,它会存储在本地 block 管理器中以供后续使用。
关于java - Apache Spark : How to structure code of a Spark Application (especially when using Broadcasts),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35754123/
我是一名优秀的程序员,十分优秀!