gpt4 book ai didi

java - 使用mapPartitionsToPair/PairFlatMapFunction时如何返回迭代器

转载 作者:行者123 更新时间:2023-12-01 20:26:54 25 4
gpt4 key购买 nike

在spark中使用mapPartitionsToPair/PairFlatMapFunction时,我在互联网上找到了一个例子

spark.read ().textFile (hdfsPath).javaRDD ()
.mapPartitionsToPair (new PairFlatMapFunction <Iterator <String>, String, String> () {
public Iterable <Tuple2 <String, String> > call (Iterator <String> input) {
List <String> result = new ArrayList <String> ();
while (input.hasNext ()) result.add (doSomeThing (input.next ()));
return result;
}
});

但是编译时出现错误

return type Iterable<Tuple2<String,String>> is not compatible with Iterator<Tuple2<String,String>>

我发现了call的声明

java.util.Iterator<scala.Tuple2<K,V>> call(T t) 

所以调用应该返回一个迭代器。

因此有人可以帮助我如何在 javaRDD api 中返回 Spark 上的迭代器吗?非常感谢

PS:我尝试了如下代码,但在集群上不起作用:

public Iterator <Tuple2 <String, Strng> > call (Iterator <String> input) {
List <String> result = new ArrayList <String> ();
while (input.hasNext ()) result.add (doSomeThing (input.next ()));
return result.iterator;
}

最佳答案

您的开发环境和集群之间的 Spark 版本似乎不匹配。

从Spark-2.0.0开始,Java RDD的flatMap和mapPartitions函数返回Java迭代器而不是可迭代的。

因此,如果您的集群低于 Spark-2.0.0,那么在开发时也请使用相同的 Spark 版本。

对于 Spark-2.0.0 或更高版本

public Iterator <Tuple2 <String, Strng> > call (Iterator <String> input) {
List <String> result = new ArrayList <String> ();
while (input.hasNext ()) result.add (doSomeThing (input.next ()));
return result.iterator;
}

应该可以。

对于低于 2.0.0 的 Spark 版本

public Iterable <Tuple2 <String, String> > call (Iterator <String> input) {
List <String> result = new ArrayList <String> ();
while (input.hasNext ()) result.add (doSomeThing (input.next ()));
return result;
}

应该可以。

关于java - 使用mapPartitionsToPair/PairFlatMapFunction时如何返回迭代器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43739172/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com