
java - How to measure the time Spark needs to run an operation on a partitioned RDD?


I have written a small Spark application that is supposed to measure the time Spark needs to run an operation on a partitioned RDD (a combineByKey function that sums values).

My problem: the first iteration seems to work correctly (a measured duration of about 25 ms), but the following iterations take much less time (about 5 ms). It looks to me as if Spark persists the data without being asked to!? Can I avoid this programmatically?

I need to know the time Spark takes to compute a new RDD (without any caching/persisting from earlier iterations) -- I think the duration should always be around 20-25 ms!

To force recomputation I moved the SparkContext creation into the for loop, but that did not change anything...

Thanks for your suggestions!

My code, which seems to persist some data:

public static void main(String[] args) {

    switchOffLogging();

    try {
        // Setup: read out parameters & initialize the SparkContext
        String path = args[0];
        SparkConf conf = new SparkConf(true);
        JavaSparkContext sc;

        // Create output file & writer
        System.out.println("\npar.\tCount\tinput.p\tcons.p\tTime");

        // The RDDs used for the benchmark
        JavaRDD<String> input = null;
        JavaPairRDD<Integer, String> pairRDD = null;
        JavaPairRDD<Integer, String> partitionedRDD = null;
        JavaPairRDD<Integer, Float> consumptionRDD = null;

        // Run the tasks iteratively (the same benchmark 10 times, for testing)
        for (int i = 0; i < 10; i++) {
            boolean partitioning = true;
            int partitionsCount = 8;

            sc = new JavaSparkContext(conf);
            setS3credentials(sc, path);

            input = sc.textFile(path);
            pairRDD = mapToPair(input);

            partitionedRDD = partition(pairRDD, partitioning, partitionsCount);

            // Measure the duration
            long duration = System.currentTimeMillis();
            // Do the relevant function
            consumptionRDD = partitionedRDD.combineByKey(createCombiner, mergeValue, mergeCombiners);
            duration = System.currentTimeMillis() - duration;

            // Do some action to invoke the calculation
            System.out.println(consumptionRDD.collect().size());

            // Print the results
            System.out.println("\n" + partitioning + "\t" + partitionsCount + "\t" + input.partitions().size() + "\t" + consumptionRDD.partitions().size() + "\t" + duration + " ms");

            input = null;
            pairRDD = null;
            partitionedRDD = null;
            consumptionRDD = null;

            sc.close();
            sc.stop();
        }
    } catch (Exception e) {
        e.printStackTrace();
        System.out.println(e.getMessage());
    }
}

Some helper functions (which should not be the source of the problem):

private static void switchOffLogging() {
    Logger.getLogger("org").setLevel(Level.OFF);
    Logger.getLogger("akka").setLevel(Level.OFF);
}

private static void setS3credentials(JavaSparkContext sc, String path) {
    if (path.startsWith("s3n://")) {
        Configuration hadoopConf = sc.hadoopConfiguration();
        hadoopConf.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem");
        hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem");
        hadoopConf.set("fs.s3n.awsAccessKeyId", "mycredentials");
        hadoopConf.set("fs.s3n.awsSecretAccessKey", "mycredentials");
    }
}

// Initial element for a key
private static Function<String, Float> createCombiner = new Function<String, Float>() {
    public Float call(String dataSet) throws Exception {
        String[] data = dataSet.split(",");
        float value = Float.valueOf(data[2]);
        return value;
    }
};

// Merging function for a new dataset
private static Function2<Float, String, Float> mergeValue = new Function2<Float, String, Float>() {
    public Float call(Float sumYet, String dataSet) throws Exception {
        String[] data = dataSet.split(",");
        float value = Float.valueOf(data[2]);
        sumYet += value;
        return sumYet;
    }
};

// Function to sum the consumption
private static Function2<Float, Float, Float> mergeCombiners = new Function2<Float, Float, Float>() {
    public Float call(Float a, Float b) throws Exception {
        a += b;
        return a;
    }
};

private static JavaPairRDD<Integer, String> partition(JavaPairRDD<Integer, String> pairRDD, boolean partitioning, int partitionsCount) {
    if (partitioning) {
        return pairRDD.partitionBy(new HashPartitioner(partitionsCount));
    } else {
        return pairRDD;
    }
}

private static JavaPairRDD<Integer, String> mapToPair(JavaRDD<String> input) {
    return input.mapToPair(new PairFunction<String, Integer, String>() {
        public Tuple2<Integer, String> call(String debsDataSet) throws Exception {
            String[] data = debsDataSet.split(",");
            int houseId = Integer.valueOf(data[6]);
            return new Tuple2<Integer, String>(houseId, debsDataSet);
        }
    });
}

And finally the output on the Spark console:

part.   Count   input.p cons.p  Time
true    8       6       8       20 ms
true    8       6       8       23 ms
true    8       6       8       7 ms   // Too short!!!
true    8       6       8       21 ms
true    8       6       8       13 ms
true    8       6       8       6 ms   // Too short!!!
true    8       6       8       5 ms   // Too short!!!
true    8       6       8       6 ms   // Too short!!!
true    8       6       8       4 ms   // Too short!!!
true    8       6       8       7 ms   // Too short!!!

Best Answer

I have found a solution now: I wrote a separate class that calls the spark-submit command in a new process. This can be done in a loop, so every benchmark is started in a fresh process and each process gets its own SparkContext. That way garbage collection is done and everything works fine!

String submitCommand = "/root/spark/bin/spark-submit " + submitParams
        + " --class partitioning.PartitionExample /root/partitioning.jar " + javaFlags;
Process p = Runtime.getRuntime().exec(submitCommand);

// Read the child's output before waiting for it, so its output buffer cannot fill up and block the process
BufferedReader reader = new BufferedReader(new InputStreamReader(p.getInputStream()));
String line;
while ((line = reader.readLine()) != null) {
    System.out.println(line);
}

System.out.println(p.waitFor());
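
For completeness, the loop described above could look roughly like the following sketch. It only illustrates the idea of launching each benchmark run as its own spark-submit process; the BenchmarkDriver class name is made up here, and submitParams and javaFlags stand in for whatever extra submit options and program arguments the original snippet passes in.

import java.io.BufferedReader;
import java.io.InputStreamReader;

// Hypothetical driver class: each iteration starts a fresh JVM (and thus a fresh SparkContext)
public class BenchmarkDriver {

    public static void main(String[] args) throws Exception {
        String submitParams = "";   // placeholder for extra spark-submit options
        String javaFlags = "";      // placeholder for the program arguments

        for (int i = 0; i < 10; i++) {
            String submitCommand = "/root/spark/bin/spark-submit " + submitParams
                    + " --class partitioning.PartitionExample /root/partitioning.jar " + javaFlags;
            Process p = Runtime.getRuntime().exec(submitCommand);

            // Forward the child's console output, then wait for the run to finish
            BufferedReader reader = new BufferedReader(new InputStreamReader(p.getInputStream()));
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line);
            }
            System.out.println("Run " + i + " finished with exit code " + p.waitFor());
        }
    }
}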

Regarding "java - How to measure the time Spark needs to run an operation on a partitioned RDD?", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/30626003/
