gpt4 book ai didi

java - 在 Apache Spark 中,我可以轻松地重复/嵌套 SparkContext.parallelize 吗?

转载 作者:行者123 更新时间:2023-12-01 10:44:13 24 4
gpt4 key购买 nike

我正在尝试对我们正在尝试解决的遗传学问题进行建模,并逐步构建它。我可以成功运行 Spark Examples 中的 PiAverage 示例。该示例向一个圆圈“扔飞镖”(在我们的例子中为 10^6),并计算“落在圆圈中”的数量来估计 PI

假设我想重复该过程 1000 次(并行)并对所有这些估计值取平均值。我正在尝试寻找最好的方法,似乎会有两次并行调用?嵌套调用?没有办法将映射或减少调用链接在一起吗?我看不到。

我想知道下面这个想法是否明智。我考虑使用累加器来跟踪结果估计。 jsc 是我的 SparkContext,单次运行的完整代码在问题末尾,感谢您的任何输入!

Accumulator<Double> accum = jsc.accumulator(0.0);

// make a list 1000 long to pass to parallelize (no for loops in Spark, right?)
List<Integer> numberOfEstimates = new ArrayList<Integer>(HOW_MANY_ESTIMATES);

// pass this "dummy list" to parallelize, which then
// calls a pieceOfPI method to produce each individual estimate
// accumulating the estimates. PieceOfPI would contain a
// parallelize call too with the individual test in the code at the end
jsc.parallelize(numberOfEstimates).foreach(accum.add(pieceOfPI(jsc, numList, slices, HOW_MANY_ESTIMATES)));

// get the value of the total of PI estimates and print their average
double totalPi = accum.value();

// output the average of averages
System.out.println("The average of " + HOW_MANY_ESTIMATES + " estimates of Pi is " + totalPi / HOW_MANY_ESTIMATES);

这看起来不像是我在 SO 上看到的矩阵或其他答案,请给出这个特定问题的答案,我已经进行了多次搜索,但我不知道如何在不“并行化并行化”的情况下做到这一点。这是一个坏主意吗?

(是的,我意识到在数学上我可以做更多的估计并有效地得到相同的结果:)尝试构建我老板想要的结构,再次感谢!

如果有帮助的话,我已经把我的整个单一测试程序放在这里,没有我正在测试的累加器。其核心将成为 PieceOfPI():

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.Accumulable;
import org.apache.spark.Accumulator;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.SparkConf;
import org.apache.spark.storage.StorageLevel;

public class PiAverage implements Serializable {

public static void main(String[] args) {

PiAverage pa = new PiAverage();
pa.go();

}

public void go() {

// should make a parameter like all these finals should be
// int slices = (args.length == 1) ? Integer.parseInt(args[0]) : 2;
final int SLICES = 16;

// how many "darts" are thrown at the circle to get one single Pi estimate
final int HOW_MANY_DARTS = 1000000;

// how many "dartboards" to collect to average the Pi estimate, which we hope converges on the real Pi
final int HOW_MANY_ESTIMATES = 1000;

SparkConf sparkConf = new SparkConf().setAppName("PiAverage")
.setMaster("local[4]");

JavaSparkContext jsc = new JavaSparkContext(sparkConf);

// setup "dummy" ArrayList of size HOW_MANY_DARTS -- how many darts to throw
List<Integer> throwsList = new ArrayList<Integer>(HOW_MANY_DARTS);
for (int i = 0; i < HOW_MANY_DARTS; i++) {
throwsList.add(i);
}

// setup "dummy" ArrayList of size HOW_MANY_ESTIMATES
List<Integer> numberOfEstimates = new ArrayList<Integer>(HOW_MANY_ESTIMATES);
for (int i = 0; i < HOW_MANY_ESTIMATES; i++) {
numberOfEstimates.add(i);
}

JavaRDD<Integer> dataSet = jsc.parallelize(throwsList, SLICES);

long totalPi = dataSet.filter(new Function<Integer, Boolean>() {
public Boolean call(Integer i) {
double x = Math.random();
double y = Math.random();
if (x * x + y * y < 1) {
return true;
} else
return false;
}
}).count();

System.out.println(
"The average of " + HOW_MANY_DARTS + " estimates of Pi is " + 4 * totalPi / (double)HOW_MANY_DARTS);

jsc.stop();
jsc.close();
}
}

最佳答案

让我从你的“背景问题”开始。 mapjoingroupBy 等转换操作分为两类;那些需要对来自所有分区的数据进行洗牌的输入,以及那些不需要的输入。 groupByjoin 等操作需要随机播放,因为您需要使用相同的键将所有 RDD 分区中的所有记录汇集在一起​​(想想 SQL JOINGROUP BY 操作有效)。另一方面,mapflatMapfilter等不需要混洗,因为该操作在上一步的分区。它们一次处理单个记录,而不是具有匹配键的一组记录。因此,不需要进行洗牌。

这个背景对于理解“额外的 map ”不会有很大的开销是必要的。诸如 mapflatMap 等一系列操作被“压缩”到一个“阶段”(当您在 Spark 中查看作业的详细信息时会显示该阶段) Web 控制台),以便只有一个 RDD 被具体化,即阶段末尾的那个。

关于你的第一个问题。我不会为此使用累加器。它们用于“边带”数据,例如计算您解析了多少行坏行。在此示例中,您可以使用累加器来计算半径 1 内部和外部的 (x,y) 对数量。

Spark 发行版中的 JavaPiSpark 示例已经足够好了。你应该研究它为什么有效。这是适合大数据系统的数据流模型。您可以使用“聚合器”。在 Javadocs ,单击“index”并查看 aggaggregateaggregateByKey 函数。然而,它们在这里并不容易理解,也没有必要。它们比 mapreduce 提供了更大的灵 active ,因此值得了解它们

代码的问题在于,您实际上是在试图告诉 Spark 要做什么,而不是表达您的意图并让 Spark 优化它为您执行的操作方式。

最后,我建议你购买并学习O'Reilly的《Learning Spark》。它很好地解释了内部细节,例如暂存,并且还显示了许多您可以使用的示例代码。

关于java - 在 Apache Spark 中,我可以轻松地重复/嵌套 SparkContext.parallelize 吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34279781/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com