- android - 多次调用 OnPrimaryClipChangedListener
- android - 无法更新 RecyclerView 中的 TextView 字段
- android.database.CursorIndexOutOfBoundsException : Index 0 requested, 光标大小为 0
- android - 使用 AppCompat 时,我们是否需要明确指定其 UI 组件(Spinner、EditText)颜色
我有一种复杂的需求
1) 1) 对于 Pinterest
twitter handle , pinterest_post , pinterest_likes.
handle "what" , 7
JavaPairRDD<String ,Pinterest> PintRDD
2) Instagram
Twitter handle , instargam_post , instagram_likes
handle "hello" , 10
handle2 "hi" ,20
JavaPairRDD<String ,Pinterest> instRDD
3)本体
twitter handle , categories , sub_categories
handle , Products , MakeUp
handle , Products, MakeUp
handle2 , Services , Face
JavaPairRDD<String ,ontologies1> ontologiesPair
最终输出应该是
对于一个键,如果找到值,则应从相应的对象中打印该值,否则应打印空白值。
编辑 - 根据 Umberto 的代码
public class Combine3PairRDD {
public static void main(String[] args) {
CommonUtils generateSparkContext = new CommonUtils();
JavaSparkContext sc = generateSparkContext.createSparkContext();
JavaPairRDD<String, Pinterest> pintRDD = sc
.parallelizePairs(Arrays.asList(new Tuple2<String, Pinterest>("handle", new Pinterest("what", 7))));
JavaPairRDD<String, Instagram> instRDD = sc
.parallelizePairs(Arrays.asList(new Tuple2<String, Instagram>("handle", new Instagram("hello", 10)),
new Tuple2<String, Instagram>("handle2", new Instagram("Hi", 20))));
JavaPairRDD<String, Ontologies> ontologiesPair = sc.parallelizePairs(
Arrays.asList(new Tuple2<String, Ontologies>("handle", new Ontologies("marketing", "MakeUp")),
new Tuple2<String, Ontologies>("handle2", new Ontologies("Service", "Face")),
new Tuple2<String, Ontologies>("handle", new Ontologies("products", "MakeUp"))));
JavaPairRDD<String, Tuple3<Iterable<Ontologies>, Iterable<Instagram>, Iterable<Pinterest>>> grouped = ontologiesPair
.cogroup(instRDD, pintRDD);
System.out.println("size of cogreop"+grouped.count());
grouped.foreach(new functionn());
JavaPairRDD<String, Tuple3<Iterable<Ontologies>, Iterable<Instagram>, Iterable<Pinterest>>> groupedWithDuplicated = grouped
.flatMapToPair(new PairFlatMapFunction<Tuple2<String,Tuple3<Iterable<Ontologies>,Iterable<Instagram>,Iterable<Pinterest>>>,String,Tuple3<Iterable<Ontologies>,Iterable<Instagram>,Iterable<Pinterest>>>() {
private static final long serialVersionUID = 853578182309543660L;
@Override
public Iterable<Tuple2<String, Tuple3<Iterable<Ontologies>, Iterable<Instagram>, Iterable<Pinterest>>>> call(
Tuple2<String, Tuple3<Iterable<Ontologies>, Iterable<Instagram>, Iterable<Pinterest>>> entry)
throws Exception {
List<Tuple2<String, Tuple3<Iterable<Ontologies>, Iterable<Instagram>, Iterable<Pinterest>>>> withDuplicate = new ArrayList<Tuple2<String, Tuple3<Iterable<Ontologies>, Iterable<Instagram>, Iterable<Pinterest>>>>();
String key = entry._1();
List<Ontologies>listOntologies = Lists.newArrayList(entry._2()._1());
List<Instagram>listInstagram = Lists.newArrayList(entry._2()._2());
List<Pinterest>listPinterest = Lists.newArrayList(entry._2()._3());
Set<Ontologies> setOntologies = new HashSet<Ontologies>(listOntologies);
Set<Instagram> setInstagram = new HashSet<Instagram>(listInstagram);
Set<Pinterest> setPinterest = new HashSet<Pinterest>(listPinterest);
if(setOntologies.size() < listOntologies.size()){
/* There are duplicates */
withDuplicate
.add(new Tuple2<String, Tuple3<Iterable<Ontologies>, Iterable<Instagram>, Iterable<Pinterest>>>(
key, new Tuple3<Iterable<Ontologies>, Iterable<Instagram>, Iterable<Pinterest>>(
entry._2()._1(), entry._2()._2(), entry._2()._3())));
}
if(setInstagram.size() < listInstagram.size()){
/* There are duplicates */
withDuplicate
.add(new Tuple2<String, Tuple3<Iterable<Ontologies>, Iterable<Instagram>, Iterable<Pinterest>>>(
key, new Tuple3<Iterable<Ontologies>, Iterable<Instagram>, Iterable<Pinterest>>(
entry._2()._1(), entry._2()._2(), entry._2()._3())));
}
if(setPinterest.size() < listPinterest.size()){
/* There are duplicates */
withDuplicate
.add(new Tuple2<String, Tuple3<Iterable<Ontologies>, Iterable<Instagram>, Iterable<Pinterest>>>(
key, new Tuple3<Iterable<Ontologies>, Iterable<Instagram>, Iterable<Pinterest>>(
entry._2()._1(), entry._2()._2(), entry._2()._3())));
}
withDuplicate
.add(new Tuple2<String, Tuple3<Iterable<Ontologies>, Iterable<Instagram>, Iterable<Pinterest>>>(
key, new Tuple3<Iterable<Ontologies>, Iterable<Instagram>, Iterable<Pinterest>>(
entry._2()._1(), entry._2()._2(), entry._2()._3())));
return withDuplicate;
}
});
List<Tuple2<String, Tuple3<Iterable<Ontologies>, Iterable<Instagram>, Iterable<Pinterest>>>> mapResult2 = groupedWithDuplicated
.collect();
for (Tuple2<String, Tuple3<Iterable<Ontologies>, Iterable<Instagram>, Iterable<Pinterest>>> entry : mapResult2) {
Ontologies ontologies = new Ontologies("", "");
Pinterest pinterest = new Pinterest("", -1);
Instagram instagram = new Instagram("", -1);
if (entry._2()._1().iterator().hasNext()) {
ontologies = entry._2()._1().iterator().next();
}
if (entry._2()._2().iterator().hasNext()) {
instagram = entry._2()._2().iterator().next();
}
if (entry._2()._3().iterator().hasNext()) {
pinterest = entry._2()._3().iterator().next();
}
System.out.println(entry._1() + " " + pinterest.getPinterest_post() + " " + " " + pinterest.getPinterest_likes() + " "
+ instagram.getInstagram_post() + " " + instagram.getInstagram_likes() + " " + ontologies.getCategories() + " "
+ ontologies.getSub_categories());
}
}
}
The three wrappers are as follows
public class Pinterest implements Serializable{
private static final long serialVersionUID = 1226764093455880169L;
public String twitterHandle;
public String pinterest_post ;
public int pinterest_likes;
Pinterest(String pinterest_post,int pinterest_likes){
this.pinterest_post=pinterest_post;
this.pinterest_likes=pinterest_likes;
}
public int hashCode(){
return (int)
pinterest_post.hashCode() *
pinterest_likes;
}
public boolean equals(Object o) {
if(o == null) return false;
Pinterest other = (Pinterest) o;
if(this.twitterHandle != other.twitterHandle) return false;
if(! this.pinterest_post.equals(other.pinterest_post)) return false;
if(this.pinterest_likes != other.pinterest_likes) return false;
return true;
}
}
public class Ontologies implements Serializable{
private static final long serialVersionUID = 1996294848173720136L;
public String twitterHandle;
public String categories ;
public String sub_categories ;
Ontologies(String categories,String sub_categories){
this.categories=categories;
this.sub_categories=sub_categories;
}
public int hashCode(){
return (int)
categories.hashCode() *
sub_categories.hashCode();
}
public boolean equals(Object o) {
if(o == null) return false;
Ontologies other = (Ontologies) o;
if(this.twitterHandle != other.twitterHandle) return false;
if(! this.categories.equals(other.categories)) return false;
if(! this.sub_categories.equals(other.sub_categories)) return false;
return true;
}
}
public class Instagram implements Serializable {
private static final long serialVersionUID = 7351892713578143761L;
public String twitterHandle;
public String instagram_post ;
public int instagram_likes;
Instagram(String instagram_post,int instagram_likes){
this.instagram_post=instagram_post;
this.instagram_likes=instagram_likes;
}
public int hashCode(){
return (int)
instagram_post.hashCode() *
instagram_likes;
}
public boolean equals(Object o) {
if(o == null) return false;
Instagram other = (Instagram) o;
if(this.twitterHandle != other.twitterHandle) return false;
if(! this.instagram_post.equals(other.instagram_post)) return false;
if(this.instagram_likes != other.instagram_likes) return false;
return true;
}
}
当构造函数的值与代码中相同时,上面的代码工作正常
new Tuple2<String, Ontologies>("handle", new Ontologies("Products", "MakeUp")),
new Tuple2<String, Ontologies>("handle2", new Ontologies("Service", "Face")),
new Tuple2<String, Ontologies>("handle", new Ontologies("Products", "MakeUp")))
它工作正常并且可以打印
handle what 7 hello 10 Products MakeUp
handle what 7 hello 10 Products MakeUp
handle2 -1 Hi 20 Service Face
但是当我更改构造函数时
new Tuple2<String, Ontologies>("handle", new Ontologies("Marketing", "MakeUp"))
new Tuple2<String, Ontologies>("handle2", new Ontologies("Service", "Face")),
new Tuple2<String, Ontologies>("handle", new Ontologies("Products", "MakeUp")))
new Tuple2<String, Ontologies>("handle", new Ontologies("Products", "MakeUp")))
我希望打印 key 的 ie 句柄和句柄 2 ie 的两行
handle what 7 hello 10 Marketing MakeUp
handle what 7 hello 10 Products MakeUp
handle what 7 hello 10 Products MakeUp
handle2 -1 Hi 20 Service Face
,我该如何实现
最佳答案
您可以使用 cogroup 方法:
关于java - 如何组合 3 对 RDD,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38783654/
我是 Pyspark 新手,我使用的是 Spark 2.0.2。 我有一个名为 Test_RDD 的 RDD,其结构如下: U-Key || V1 || V2 || V3 || ----
我正在寻找一种方法将一个 RDD 拆分为两个或多个 RDD,并将获得的结果保存为两个单独的 RDD。例如: rdd_test = sc.parallelize(range(50), 1) 我的代码:
我有一个结构如下的RDD: ((user_id,item_id,rating)) 让我们将此 RDD 称为训练 然后还有另一个具有相同结构的rdd: ((user_id,item_id,rating)
已经有人问过类似的问题。最相似的是这个: Spark: How to split an RDD[T]` into Seq[RDD[T]] and preserve the ordering 但是,我不
我正在使用 spark 来处理数据。但是我不知道如何将新数据保存到Hive 我从 Hive 加载 rdd,然后运行 map 函数来清理数据。 result = myRdd.map(lambda x
我有一个名为 index 的 rdd:RDD[(String, String)],我想用 index 来处理我的文件。 这是代码: val get = file.map({x => val tmp
我有两个 RDD: **rdd1** id1 val1 id2 val2 **rdd2** id1 v1 id2 v2 id1 v3 id8 v7 id1 v4 id3 v5 id6 v6 我想过滤
我有一个 RDD,需要从另一个 RDD 访问数据。但是,我总是收到任务不可序列化错误。我已经扩展了 Serialized 类,但它没有起作用。代码是: val oldError = rddOfRati
我有一个 RDD,需要从另一个 RDD 访问数据。但是,我总是收到任务不可序列化错误。我已经扩展了 Serialized 类,但它没有起作用。代码是: val oldError = rddOfRati
我有一个 RDD 对: (105,918) (105,757) (502,516) (105,137) (516,816) (350,502) 我想将它分成两个 RDD,这样第一个只有具有非重复值的对
我正在尝试使用 spark 执行 K 最近邻搜索。 我有一个 RDD[Seq[Double]] 并且我打算返回一个 RDD[(Seq[Double],Seq[Seq[Double]])] 带有实际行和
我是Spark和Scala语言的新手,并且希望将所有RDD合并到一个List中,如下所示(List to RDD): val data = for (item {
我找不到只参与 rdd 的方法. take看起来很有希望,但它返回 list而不是 rdd .我当然可以将其转换为 rdd ,但这似乎既浪费又丑陋。 my_rdd = sc.textFile("my
我正在寻找一种将 RDD 拆分为两个或更多 RDD 的方法。我见过的最接近的是 Scala Spark: Split collection into several RDD?这仍然是一个单一的 RDD
我有一个RDD[String],wordRDD。我还有一个从字符串/单词创建 RDD[String] 的函数。我想为 wordRDD 中的每个字符串创建一个新的 RDD。以下是我的尝试: 1) 失败,
我刚刚开始使用 Spark 和 Scala 我有一个包含多个文件的目录我使用 成功加载它们 sc.wholeTextFiles(directory) 现在我想升一级。我实际上有一个目录,其中包含包含文
我想从另一个 RDD 中减去一个 RDD。我查看了文档,发现 subtract可以这样做。实际上,当我测试时 subtract , 最终的 RDD 保持不变,值不会被删除! 有没有其他功能可以做到这一
我在 HDFS 中有如下三个文件中的数据 EmployeeManagers.txt (EmpID,ManagerID) 1,5 2,4 3,4 4,6 5,6 EmployeeNames.txt (E
我正在开发一个应用程序,我需要对 RDD 中具有相同键的每对行执行计算,这是 RDD 结构: List>> dat2 = new ArrayList<>(); dat2.add(new Tuple2>
我在 spark 集群中有两个文件,foo.csv 和 bar.csv,它们都有 4 列和完全相同的字段:时间、用户、url、类别。 我想通过 bar.csv 的某些列过滤掉 foo.csv。最后,我
我是一名优秀的程序员,十分优秀!