gpt4 book ai didi

java - 如何组合 3 对 RDD

转载 作者:太空宇宙 更新时间:2023-11-04 12:21:52 25 4
gpt4 key购买 nike

我有一种复杂的需求

1) 1) 对于 Pinterest

twitter handle , pinterest_post , pinterest_likes.

handle "what" , 7



JavaPairRDD<String ,Pinterest> PintRDD

2) Instagram

Twitter handle , instargam_post , instagram_likes 

handle "hello" , 10
handle2 "hi" ,20


JavaPairRDD<String ,Pinterest> instRDD

3)本体

twitter handle , categories , sub_categories 

handle , Products , MakeUp
handle , Products, MakeUp
handle2 , Services , Face

JavaPairRDD<String ,ontologies1> ontologiesPair

最终输出应该是

对于一个键,如果找到值,则应从相应的对象中打印该值,否则应打印空白值。

编辑 - 根据 Umberto 的代码

public class Combine3PairRDD {



public static void main(String[] args) {

CommonUtils generateSparkContext = new CommonUtils();
JavaSparkContext sc = generateSparkContext.createSparkContext();

JavaPairRDD<String, Pinterest> pintRDD = sc
.parallelizePairs(Arrays.asList(new Tuple2<String, Pinterest>("handle", new Pinterest("what", 7))));

JavaPairRDD<String, Instagram> instRDD = sc
.parallelizePairs(Arrays.asList(new Tuple2<String, Instagram>("handle", new Instagram("hello", 10)),
new Tuple2<String, Instagram>("handle2", new Instagram("Hi", 20))));

JavaPairRDD<String, Ontologies> ontologiesPair = sc.parallelizePairs(
Arrays.asList(new Tuple2<String, Ontologies>("handle", new Ontologies("marketing", "MakeUp")),
new Tuple2<String, Ontologies>("handle2", new Ontologies("Service", "Face")),
new Tuple2<String, Ontologies>("handle", new Ontologies("products", "MakeUp"))));



JavaPairRDD<String, Tuple3<Iterable<Ontologies>, Iterable<Instagram>, Iterable<Pinterest>>> grouped = ontologiesPair
.cogroup(instRDD, pintRDD);

System.out.println("size of cogreop"+grouped.count());

grouped.foreach(new functionn());



JavaPairRDD<String, Tuple3<Iterable<Ontologies>, Iterable<Instagram>, Iterable<Pinterest>>> groupedWithDuplicated = grouped
.flatMapToPair(new PairFlatMapFunction<Tuple2<String,Tuple3<Iterable<Ontologies>,Iterable<Instagram>,Iterable<Pinterest>>>,String,Tuple3<Iterable<Ontologies>,Iterable<Instagram>,Iterable<Pinterest>>>() {

private static final long serialVersionUID = 853578182309543660L;

@Override
public Iterable<Tuple2<String, Tuple3<Iterable<Ontologies>, Iterable<Instagram>, Iterable<Pinterest>>>> call(
Tuple2<String, Tuple3<Iterable<Ontologies>, Iterable<Instagram>, Iterable<Pinterest>>> entry)
throws Exception {
List<Tuple2<String, Tuple3<Iterable<Ontologies>, Iterable<Instagram>, Iterable<Pinterest>>>> withDuplicate = new ArrayList<Tuple2<String, Tuple3<Iterable<Ontologies>, Iterable<Instagram>, Iterable<Pinterest>>>>();

String key = entry._1();


List<Ontologies>listOntologies = Lists.newArrayList(entry._2()._1());

List<Instagram>listInstagram = Lists.newArrayList(entry._2()._2());
List<Pinterest>listPinterest = Lists.newArrayList(entry._2()._3());

Set<Ontologies> setOntologies = new HashSet<Ontologies>(listOntologies);

Set<Instagram> setInstagram = new HashSet<Instagram>(listInstagram);
Set<Pinterest> setPinterest = new HashSet<Pinterest>(listPinterest);




if(setOntologies.size() < listOntologies.size()){
/* There are duplicates */
withDuplicate
.add(new Tuple2<String, Tuple3<Iterable<Ontologies>, Iterable<Instagram>, Iterable<Pinterest>>>(
key, new Tuple3<Iterable<Ontologies>, Iterable<Instagram>, Iterable<Pinterest>>(
entry._2()._1(), entry._2()._2(), entry._2()._3())));
}

if(setInstagram.size() < listInstagram.size()){
/* There are duplicates */
withDuplicate
.add(new Tuple2<String, Tuple3<Iterable<Ontologies>, Iterable<Instagram>, Iterable<Pinterest>>>(
key, new Tuple3<Iterable<Ontologies>, Iterable<Instagram>, Iterable<Pinterest>>(
entry._2()._1(), entry._2()._2(), entry._2()._3())));
}

if(setPinterest.size() < listPinterest.size()){
/* There are duplicates */
withDuplicate
.add(new Tuple2<String, Tuple3<Iterable<Ontologies>, Iterable<Instagram>, Iterable<Pinterest>>>(
key, new Tuple3<Iterable<Ontologies>, Iterable<Instagram>, Iterable<Pinterest>>(
entry._2()._1(), entry._2()._2(), entry._2()._3())));
}

withDuplicate
.add(new Tuple2<String, Tuple3<Iterable<Ontologies>, Iterable<Instagram>, Iterable<Pinterest>>>(
key, new Tuple3<Iterable<Ontologies>, Iterable<Instagram>, Iterable<Pinterest>>(
entry._2()._1(), entry._2()._2(), entry._2()._3())));

return withDuplicate;
}

});



List<Tuple2<String, Tuple3<Iterable<Ontologies>, Iterable<Instagram>, Iterable<Pinterest>>>> mapResult2 = groupedWithDuplicated
.collect();
for (Tuple2<String, Tuple3<Iterable<Ontologies>, Iterable<Instagram>, Iterable<Pinterest>>> entry : mapResult2) {
Ontologies ontologies = new Ontologies("", "");
Pinterest pinterest = new Pinterest("", -1);
Instagram instagram = new Instagram("", -1);



if (entry._2()._1().iterator().hasNext()) {
ontologies = entry._2()._1().iterator().next();
}

if (entry._2()._2().iterator().hasNext()) {
instagram = entry._2()._2().iterator().next();
}

if (entry._2()._3().iterator().hasNext()) {
pinterest = entry._2()._3().iterator().next();
}



System.out.println(entry._1() + " " + pinterest.getPinterest_post() + " " + " " + pinterest.getPinterest_likes() + " "
+ instagram.getInstagram_post() + " " + instagram.getInstagram_likes() + " " + ontologies.getCategories() + " "
+ ontologies.getSub_categories());



}
}
}


The three wrappers are as follows



public class Pinterest implements Serializable{

private static final long serialVersionUID = 1226764093455880169L;
public String twitterHandle;
public String pinterest_post ;
public int pinterest_likes;

Pinterest(String pinterest_post,int pinterest_likes){

this.pinterest_post=pinterest_post;
this.pinterest_likes=pinterest_likes;
}
public int hashCode(){
return (int)
pinterest_post.hashCode() *
pinterest_likes;
}

public boolean equals(Object o) {
if(o == null) return false;
Pinterest other = (Pinterest) o;
if(this.twitterHandle != other.twitterHandle) return false;
if(! this.pinterest_post.equals(other.pinterest_post)) return false;
if(this.pinterest_likes != other.pinterest_likes) return false;

return true;
}

}



public class Ontologies implements Serializable{

private static final long serialVersionUID = 1996294848173720136L;
public String twitterHandle;
public String categories ;
public String sub_categories ;

Ontologies(String categories,String sub_categories){

this.categories=categories;
this.sub_categories=sub_categories;
}
public int hashCode(){
return (int)

categories.hashCode() *
sub_categories.hashCode();
}

public boolean equals(Object o) {
if(o == null) return false;
Ontologies other = (Ontologies) o;
if(this.twitterHandle != other.twitterHandle) return false;
if(! this.categories.equals(other.categories)) return false;
if(! this.sub_categories.equals(other.sub_categories)) return false;

return true;
}

}

public class Instagram implements Serializable {

private static final long serialVersionUID = 7351892713578143761L;
public String twitterHandle;
public String instagram_post ;
public int instagram_likes;

Instagram(String instagram_post,int instagram_likes){

this.instagram_post=instagram_post;
this.instagram_likes=instagram_likes;
}
public int hashCode(){
return (int)
instagram_post.hashCode() *
instagram_likes;
}
public boolean equals(Object o) {
if(o == null) return false;
Instagram other = (Instagram) o;
if(this.twitterHandle != other.twitterHandle) return false;
if(! this.instagram_post.equals(other.instagram_post)) return false;
if(this.instagram_likes != other.instagram_likes) return false;

return true;
}


}

当构造函数的值与代码中相同时,上面的代码工作正常

new Tuple2<String, Ontologies>("handle", new Ontologies("Products", "MakeUp")),
new Tuple2<String, Ontologies>("handle2", new Ontologies("Service", "Face")),
new Tuple2<String, Ontologies>("handle", new Ontologies("Products", "MakeUp")))

它工作正常并且可以打印

handle what  7 hello 10 Products MakeUp
handle what 7 hello 10 Products MakeUp
handle2 -1 Hi 20 Service Face

但是当我更改构造函数时

 new Tuple2<String, Ontologies>("handle", new Ontologies("Marketing", "MakeUp"))
new Tuple2<String, Ontologies>("handle2", new Ontologies("Service", "Face")),
new Tuple2<String, Ontologies>("handle", new Ontologies("Products", "MakeUp")))
new Tuple2<String, Ontologies>("handle", new Ontologies("Products", "MakeUp")))

我希望打印 key 的 ie 句柄和句柄 2 ie 的两行

handle what  7 hello 10 Marketing MakeUp
handle what 7 hello 10 Products MakeUp
handle what 7 hello 10 Products MakeUp
handle2 -1 Hi 20 Service Face

,我该如何实现

最佳答案

关于java - 如何组合 3 对 RDD,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38783654/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com