gpt4 book ai didi

java - Apache Spark : How to structure code of a Spark Application (especially when using Broadcasts)

转载 作者:搜寻专家 更新时间:2023-11-01 03:34:23 26 4
gpt4 key购买 nike

我有一个关于 Java Spark 应用程序中代码结构的一般性问题。我想将 Spark 转换的实现代码与 RDD 的调用分开,这样即使使用包含大量代码行的大量转换,应用程序的源代码也能保持清晰。

我先给你一个简短的例子。在这种情况下,flatMap 转换的实现是作为匿名内部类提供的。这是一个简单的应用程序,它读取一个整数 RDD,然后将每个元素乘以一个整数数组,该数组之前广播到所有工作节点:

public static void main(String[] args) {

SparkConf conf = new SparkConf().setMaster("local").setAppName("MyApp");
JavaSparkContext sc = new JavaSparkContext(conf);

JavaRDD<Integer> result = sc.parallelize(Arrays.asList(5, 8, 9));

final Broadcast<int[]> factors = sc.broadcast(new int[] { 1, 2, 3 });

result = result.flatMap(new FlatMapFunction<Integer, Integer>() {
public Iterable<Integer> call(Integer t) throws Exception {
int[] values = factors.value();
LinkedList<Integer> result = new LinkedList<Integer>();
for (int value : values) result.add(t * value);
return result;
}
});

System.out.println(result.collect()); // [5, 10, 15, 8, 16, 24, 9, 18, 27]

sc.close();
}

为了构建代码,我将 Spark 函数的实现提取到了另一个类中。 SparkFunctions 类提供了 flatMap 转换的实现,并有一个 setter 方法来获取对广播变量的引用(......在我的真实场景中,这个类中会有很多操作,所有访问广播数据)。

我的经验是,表示 Spark 转换的方法可以是静态的,只要它不访问 Broadcast 变量或 Accumulator 变量即可。为什么?静态方法只能访问静态属性。对 Broadcast 变量的静态引用始终为 null(可能是因为当 Spark 将类 SparkFunctions 发送到工作节点时它未被序列化)。

@SuppressWarnings("serial")
public class SparkFunctions implements Serializable {

private Broadcast<int[]> factors;

public SparkFunctions() {
}

public void setFactors(Broadcast<int[]> factors) {
this.factors = factors;
}

public final FlatMapFunction<Integer, Integer> myFunction = new FlatMapFunction<Integer, Integer>() {
public Iterable<Integer> call(Integer t) throws Exception {
int[] values = factors.value();
LinkedList<Integer> result = new LinkedList<Integer>();
for (int value : values) result.add(t * value);
return result;
}
};

}

这是使用 SparkFunctions 类的应用程序的第二个版本:

public static void main(String[] args) {

SparkConf conf = new SparkConf().setMaster("local").setAppName("MyApp");
JavaSparkContext sc = new JavaSparkContext(conf);

JavaRDD<Integer> result = sc.parallelize(Arrays.asList(5, 8, 9));

final Broadcast<int[]> factors = sc.broadcast(new int[] { 1, 2, 3 });

// 1) Initializing
SparkFunctions functions = new SparkFunctions();

// 2) Pass reference of broadcast variable
functions.setFactors(factors);

// 3) Implementation is now in the class SparkFunctions
result = result.flatMap(functions.myFunction);

System.out.println(result.collect()); // [5, 10, 15, 8, 16, 24, 9, 18, 27]

sc.close();
}

应用程序的两个版本都在工作(本地和集群设置),但我想问的是它们是否同样高效。

问题1:在我看来,Spark序列化类SparkFunctions,包括Broadcast变量,并将其发送到工作节点,以便节点可以在自己的函数中使用该函数任务。数据是否被两次发送到工作节点,第一次是使用SparkContext进行广播,然后是类SparkFunctions的序列化?还是每个元素发送一次(广播加 1 次)?

问题 2:您能否就源代码的其他结构提供建议?

请不要提供如何防止广播的解决方案。我有一个复杂得多的实际应用程序。

我发现的类似问题并没有真正帮助:

预先感谢您的帮助!

最佳答案

这是关于问题 1

当一个spark job被提交时,jobs被划分为stages->tasks。这些任务实际上是在工作节点上执行转换和操作。驱动程序的 sumbitTask() 将有关广播变量的函数和元数据序列化到所有节点。

广播工作原理剖析。

Driver 创建一个本地目录来存储要广播的数据,并启动一个可以访问该目录的 HttpServer。数据实际上是在调用广播时写入目录的(val bdata = sc.broadcast(data))。同时数据也以StorageLevel内存+磁盘的方式写入驱动的blockManger。 block 管理器为数据分配一个 blockId(BroadcastBlockId 类型)。

真正的数据只有在执行器反序列化它接收到的任务时才会广播,它还会以 Broadcast 对象的形式获取广播变量的元数据。然后调用元数据对象(bdata 变量)的 readObject() 方法。此方法将首先检查本地 block 管理器以查看是否已经存在本地副本。如果没有,数据将从驱动程序中获取。获取数据后,它会存储在本地 block 管理器中以供后续使用。

关于java - Apache Spark : How to structure code of a Spark Application (especially when using Broadcasts),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35754123/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com