gpt4 book ai didi

java - 当多个线程共享相同的 Spark 上下文时,Spark 应用程序不会停止

转载 作者:太空宇宙 更新时间:2023-11-04 11:54:13 25 4
gpt4 key购买 nike

我尝试重现我所面临的问题。我的问题陈述-文件夹中存在多个文件。我需要对每个文件进行字数统计并打印结果。每个文件应该并行处理!当然,并行性是有限制的。我编写了以下代码来完成它。它运行良好。集群正在安装 mapR 的 Spark。集群有spark.scheduler.mode = FIFO。

Q1- is there a better way to accomplish the task mentioned above?

Q2- i have observed that the application does not stop even when it has completed the word counting of avaialble files. i am unable to figure out how to deal with it?

package groupId.artifactId;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class Executor {

/**
* @param args
*/
public static void main(String[] args) {
final int threadPoolSize = 5;
SparkConf sparkConf = new SparkConf().setMaster("yarn-client").setAppName("Tracker").set("spark.ui.port","0");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
ExecutorService executor = Executors.newFixedThreadPool(threadPoolSize);
List<Future> listOfFuture = new ArrayList<Future>();
for (int i = 0; i < 20; i++) {
if (listOfFuture.size() < threadPoolSize) {
FlexiWordCount flexiWordCount = new FlexiWordCount(jsc, i);
Future future = executor.submit(flexiWordCount);
listOfFuture.add(future);
} else {
boolean allFutureDone = false;
while (!allFutureDone) {
allFutureDone = checkForAllFuture(listOfFuture);
System.out.println("Threads not completed yet!");
try {
Thread.sleep(2000);//waiting for 2 sec, before next check
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
printFutureResult(listOfFuture);
System.out.println("printing of future done");
listOfFuture.clear();
System.out.println("future list got cleared");
}

}
try {
executor.awaitTermination(5, TimeUnit.MINUTES);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}



private static void printFutureResult(List<Future> listOfFuture) {
Iterator<Future> iterateFuture = listOfFuture.iterator();
while (iterateFuture.hasNext()) {
Future tempFuture = iterateFuture.next();
try {
System.out.println("Future result " + tempFuture.get());
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
private static boolean checkForAllFuture(List<Future> listOfFuture) {
boolean status = true;
Iterator<Future> iterateFuture = listOfFuture.iterator();
while (iterateFuture.hasNext()) {
Future tempFuture = iterateFuture.next();
if (!tempFuture.isDone()) {
status = false;
break;
}
}
return status;

}

package groupId.artifactId;

import java.io.Serializable;
import java.util.Arrays;
import java.util.concurrent.Callable;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

public class FlexiWordCount implements Callable<Object>,Serializable {


private static final long serialVersionUID = 1L;
private JavaSparkContext jsc;
private int fileId;

public FlexiWordCount(JavaSparkContext jsc, int fileId) {
super();
this.jsc = jsc;
this.fileId = fileId;
}
private static class Reduction implements Function2<Integer, Integer, Integer>{
@Override
public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
}

private static class KVPair implements PairFunction<String, String, Integer>{
@Override
public Tuple2<String, Integer> call(String paramT)
throws Exception {
return new Tuple2<String, Integer>(paramT, 1);
}
}
private static class Flatter implements FlatMapFunction<String, String>{

@Override
public Iterable<String> call(String s) {
return Arrays.asList(s.split(" "));
}
}
@Override
public Object call() throws Exception {
JavaRDD<String> jrd = jsc.textFile("/root/folder/experiment979/" + fileId +".txt");
System.out.println("inside call() for fileId = " + fileId);
JavaRDD<String> words = jrd.flatMap(new Flatter());
JavaPairRDD<String, Integer> ones = words.mapToPair(new KVPair());
JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Reduction());
return counts.collect();
}
}
}

最佳答案

为什么程序没有自动关闭?

Ans:您还没有关闭 Sparkcontex ,尝试将 main 方法更改为:

public static void main(String[] args) {    
final int threadPoolSize = 5;
SparkConf sparkConf = new SparkConf().setMaster("yarn-client").setAppName("Tracker").set("spark.ui.port","0");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
ExecutorService executor = Executors.newFixedThreadPool(threadPoolSize);
List<Future> listOfFuture = new ArrayList<Future>();
for (int i = 0; i < 20; i++) {
if (listOfFuture.size() < threadPoolSize) {
FlexiWordCount flexiWordCount = new FlexiWordCount(jsc, i);
Future future = executor.submit(flexiWordCount);
listOfFuture.add(future);
} else {
boolean allFutureDone = false;
while (!allFutureDone) {
allFutureDone = checkForAllFuture(listOfFuture);
System.out.println("Threads not completed yet!");
try {
Thread.sleep(2000);//waiting for 2 sec, before next check
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
printFutureResult(listOfFuture);
System.out.println("printing of future done");
listOfFuture.clear();
System.out.println("future list got cleared");
}

}
try {
executor.awaitTermination(5, TimeUnit.MINUTES);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
jsc.stop()
}

有更好的方法吗?

Ans:是的,您应该将文件的目录传递给sparkcontext,并在目录上使用.textFile,在这种情况下,spark 会并行化执行器上目录的读取。如果您尝试自己创建线程,然后使用相同的 Spark 上下文为每个文件重新提交作业,则会增加将应用程序提交到 yarn 队列的额外开销。

我认为最快的方法是直接传递整个目录并从中创建 RDD,然后让 Spark 启动并行任务来处理不同执行器中的所有文件。您可以尝试在 RDD 上使用 .repartition() 方法,因为它会启动很多任务来并行运行。

关于java - 当多个线程共享相同的 Spark 上下文时,Spark 应用程序不会停止,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41501442/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com