
java - Spark java.lang.StackOverflowError


I am using Spark to compute the PageRank of user reviews, but I keep getting Spark java.lang.StackOverflowError when I run my code on a big dataset (40k entries). When running the code on a small number of entries it works fine, though.

Example of an entry:

product/productId: B00004CK40   review/userId: A39IIHQF18YGZA   review/profileName: C. A. M. Salas  review/helpfulness: 0/0 review/score: 4.0   review/time: 1175817600 review/summary: Reliable comedy review/text: Nice script, well acted comedy, and a young Nicolette Sheridan. Cusak is in top form.

Code:

public void calculatePageRank() {
    sc.clearCallSite();
    sc.clearJobGroup();

    JavaRDD<String> rddFileData = sc.textFile(inputFileName).cache();
    sc.setCheckpointDir("pagerankCheckpoint/");

    // Extract "movieId \t userId" from each raw review line.
    JavaRDD<String> rddMovieData = rddFileData.map(new Function<String, String>() {

        @Override
        public String call(String arg0) throws Exception {
            String[] data = arg0.split("\t");
            String movieId = data[0].split(":")[1].trim();
            String userId = data[1].split(":")[1].trim();
            return movieId + "\t" + userId;
        }
    });

    // Group the userIds of all reviewers by movieId.
    JavaPairRDD<String, Iterable<String>> rddPairReviewData = rddMovieData
            .mapToPair(new PairFunction<String, String, String>() {

                @Override
                public Tuple2<String, String> call(String arg0) throws Exception {
                    String[] data = arg0.split("\t");
                    return new Tuple2<String, String>(data[0], data[1]);
                }
            }).groupByKey().cache();

    JavaRDD<Iterable<String>> cartUsers = rddPairReviewData.map(f -> f._2());
    List<Iterable<String>> cartUsersList = cartUsers.collect();
    JavaPairRDD<String, String> finalCartesian = null;
    int iterCounter = 0;
    for (Iterable<String> out : cartUsersList) {
        JavaRDD<String> currentUsersRDD = sc.parallelize(Lists.newArrayList(out));
        if (finalCartesian == null) {
            finalCartesian = currentUsersRDD.cartesian(currentUsersRDD);
        } else {
            finalCartesian = currentUsersRDD.cartesian(currentUsersRDD).union(finalCartesian);
            if (iterCounter % 20 == 0) {
                // checkpoint() is lazy; it only materializes on the next action
                finalCartesian.checkpoint();
            }
        }
        iterCounter++; // advance the counter so the periodic checkpoint above can fire
    }
    JavaRDD<Tuple2<String, String>> finalCartesianToTuple =
            finalCartesian.map(m -> new Tuple2<String, String>(m._1(), m._2()));

    // Drop self-pairs (a user paired with themselves).
    finalCartesianToTuple = finalCartesianToTuple.filter(x -> x._1().compareTo(x._2()) != 0);
    JavaPairRDD<String, String> userIdPairs =
            finalCartesianToTuple.mapToPair(m -> new Tuple2<String, String>(m._1(), m._2()));

    // Each tuple is a (userId, userId) pair of co-reviewers.
    JavaRDD<String> userIdPairsString = userIdPairs.map(new Function<Tuple2<String, String>, String>() {

        @Override
        public String call(Tuple2<String, String> t) throws Exception {
            return t._1 + " " + t._2;
        }
    });

    try {
        //calculate pagerank using this https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
        JavaPageRank.calculatePageRank(userIdPairsString, 100);
    } catch (Exception e) {
        e.printStackTrace();
    }

    sc.close();
}

Best answer

I have several suggestions that will help you greatly improve the performance of the code in your question.

  1. Caching: caching should be used on datasets that you refer to again and again for the same/different operations (iterative algorithms).

An example is RDD.count: to tell you the number of lines in the file, the file needs to be read. So if you write RDD.count, the file will be read at that point, the lines will be counted, and the count will be returned.

What if you call RDD.count again? The same thing: the file will be read and counted again. So what does RDD.cache do? Now, if you run RDD.count the first time, the file will be loaded, cached, and counted. If you call RDD.count a second time, the operation will use the cache. It will just take the data from the cache and count the lines, no recomputing.
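A minimal sketch of that difference, reusing sc and inputFileName from the question's code:

    JavaRDD<String> lines = sc.textFile(inputFileName);
    lines.count();         // reads the file and counts the lines
    lines.count();         // reads and counts the entire file again

    JavaRDD<String> cachedLines = sc.textFile(inputFileName).cache();
    cachedLines.count();   // reads the file, materializes the cache, then counts
    cachedLines.count();   // served from the cache; the file is not re-read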

Read more about caching here.

In your code sample you are not reusing anything that you've cached, so you can remove the .cache calls.

  2. Parallelization: in your code sample you parallelize every individual element of your RDD, which is already a distributed collection. I suggest you merge the rddFileData, rddMovieData and rddPairReviewData steps so that it happens in one pass, as in the sketch below.
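A minimal sketch of the merged step, assuming the same tab-separated layout as the example entry (the variable name is just illustrative):

    JavaPairRDD<String, Iterable<String>> moviesToUsers = sc.textFile(inputFileName)
            .mapToPair(line -> {
                // one pass: raw line -> (movieId, userId), no intermediate RDDs
                String[] data = line.split("\t");
                String movieId = data[0].split(":")[1].trim();
                String userId = data[1].split(":")[1].trim();
                return new Tuple2<>(movieId, userId);
            })
            .groupByKey();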

Get rid of .collect, since that brings the results back to the driver and may well be the actual cause of your error.
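For example (a join-based sketch of my suggestion, not code from your program): a self-join on movieId produces every pair of users who reviewed the same movie entirely on the executors, so there is no driver-side loop and no ever-growing cartesian/union lineage.

    JavaPairRDD<String, String> movieToUser = sc.textFile(inputFileName)
            .mapToPair(line -> {
                String[] data = line.split("\t");
                return new Tuple2<>(data[0].split(":")[1].trim(),   // movieId
                                    data[1].split(":")[1].trim());  // userId
            });

    JavaRDD<String> userIdPairsString = movieToUser
            .join(movieToUser)                           // (movieId, (userA, userB))
            .values()
            .filter(p -> p._1().compareTo(p._2()) != 0)  // drop self-pairs
            .map(p -> p._1() + " " + p._2());            // "userA userB" lines for JavaPageRank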

Regarding java - Spark java.lang.StackOverflowError, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/37909444/
