gpt4 book ai didi

java - 在调用 combineByKey 函数生成的 rdd 后,对 collect() 的调用不会返回

转载 作者:行者123 更新时间:2023-11-30 08:31:40 25 4
gpt4 key购买 nike

免责声明:我是 Spark 的新手

我有一个 rdd 看起来像:

[(T,[Tina, Thomas]), (T,[Tolis]), (C,[Cory, Christine]), (J,[Joseph, Jimmy, James, Jackeline, Juan]) , (J,[Jimbo, Jina])]

然后我调用 combineByKey 并得到一个 JavaPairRDD< Character, Integer >

这个调用似乎工作正常(控制流从这一点开始,在调试器中 foo 似乎有某种值(value))

JavaPairRDD<Character, Integer> foo = rdd.combineByKey(createAcc, addAndCount, combine);
System.out.println(foo.collect());

我的问题是程序在调用 foo.collect() 后没有返回;你有什么想法 ?我尝试使用 eclipse 调试器进行调试,但我根本没有机会

我使用的是 Spark 2.0.0 版和 Java 8

编辑:由 combineByKey 调用的函数的代码如下(这显然是一个伪代码,因为我是 spark 的新手,我的目标是调用combineByKey 就是求出每个 Key 所属的字符串列表的总长度):

            Function<Iterable<String>, Integer> createAcc =

new Function<Iterable<String>, Integer>() {

public Integer call(Iterable<String> x) {
int counter = 0;
Iterator<String> it = x.iterator();
while (it.hasNext()) {
counter++;
}
return counter;
}
};

Function2<Integer, Iterable<String>, Integer> addAndCount =

new Function2<Integer,Iterable<String>, Integer>() {

public Integer call(Integer acc , Iterable<String> x) {
int counter = 0;
Iterator<String> it = x.iterator();
while (it.hasNext()) {
counter++;
}
return counter + acc;
}
};

Function2<Integer,Integer,Integer> combine =

new Function2<Integer,Integer, Integer>() {

public Integer call(Integer x, Integer y) {
return x+y;
}
};

UPDATE2:请求的日志如下

16/11/11 17:21:32 信息 SparkContext:开始工作:计数在 Foo.java:265
16/11/11 17:21:32 INFO DAGScheduler:获得作业 9(在 Foo.java 中计数:265),具有 3 个输出分区
16/11/11 17:21:32 信息 DAGScheduler:最后阶段:ResultStage 20(在 Foo.java 处计数:265)
16/11/11 17:21:32 信息 DAGScheduler:最后阶段的 parent :列表(ShuffleMapStage 19,ShuffleMapStage 18)
16/11/11 17:21:32 信息 DAGScheduler:缺少 parent :List()
16/11/11 17:21:32 INFO DAGScheduler:提交 ResultStage 20(MapPartitionsRDD[24] at combineByKey at Foo.java:264),没有丢失的 parent
16/11/11 17:21:32 信息 MemoryStore: block broadcast_12 作为值存储在内存中(估计大小 6.7 KB,可用 1946.0 MB)
16/11/11 17:21:32 INFO MemoryStore: block broadcast_12_piece0 在内存中存储为字节(估计大小 3.4 KB,可用 1946.0 MB)
16/11/11 17:21:32 INFO BlockManagerInfo:在 xxx.xxx.xx.xx:55712 上的内存中添加了 broadcast_12_piece0(大小:3.4 KB,免费:1946.1 MB)
16/11/11 17:21:32 信息 SparkContext:从 DAGScheduler.scala:1012 的广播创建广播 12
16/11/11 17:21:32 INFO DAGScheduler:从 ResultStage 20 提交 3 个缺失的任务(MapPartitionsRDD[24] at combineByKey at Foo.java:264)
16/11/11 17:21:32 信息 TaskSchedulerImpl:添加具有 3 个任务的任务集 20.0
16/11/11 17:21:32 信息 TaskSetManager:在阶段 20.0 中启动任务 0.0(TID 30,本地主机,分区 0,任何,5288 字节)
16/11/11 17:21:32 信息执行者:在阶段 20.0 (TID 30) 中运行任务 0.0
16/11/11 17:21:32 信息 ShuffleBlockFetcherIterator:从 3 个 block 中获取 2 个非空 block
2011 年 16 月 11 日 17:21:32 信息 ShuffleBlockFetcherIterator:在 0 毫秒内开始 0 次远程提取

最佳答案

这是一个简单的 Java 问题:您的“while”循环永远不会调用 it.next,也永远不会结束。

将它们更改为

    while (it.hasNext()) {
it.next();
counter++;
}

关于java - 在调用 combineByKey 函数生成的 rdd 后,对 collect() 的调用不会返回,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40528991/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com