- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
这是我的示例数据:
| rdd1 |
| .... |
| 10 |
| 200 |
| 350 |
| 400 |
| 1000 |
| 1500 |
| ..... |
| rdd2 |
| label | features |
| .... | ....................... |
| 0 | 1 10 30 100 200 450 600 |
| 0 | 200 300 400 |
| 1 | 200 350 450 |
| 1 | 400 600 700 |
| .... | ........................ |
I want to compute the following: For each element of rdd1 find out how many times it appears in the features in rdd2 for each label value. I need a tuple like this (#of times appears with lable 0, # times appears with label 1) So in the above example, 10 appears 1 time with label 0 and 0 times with label 1 for 10 it will be (1,0). 200 appears 2 times with label 0 and one time with label 1 so it will be (2,1) for 200.
In addition, I also want to find out For each element of rdd1 find out how many times it does not appear in the features in rdd2 for each label value. I need a tuple like this (#of times does not appear with lable 0, # times does not appear with label 1). So in the above example, for 10 I should get back it does not appear one time with label and two times with label 1 (1,2).
我计划使用按键聚合。
val initialCount : collection.mutable.ListBuffer[Int] = ListBuffer(0, 0)
val addToCounts = (s: collection.mutable.ListBuffer[Int], label:Int) => if (label == 1) s(0) += 1 else s(1) += 1
val sumPartitionCounts = (p1: collection.mutable.ListBuffer[Int], p2: collection.mutable.ListBuffer[Int]) => ListBuffer((p1(0) + p2(0)),(p1(1) + p2(1)))
但是,我读到在另一个 rdd 的 map 函数中访问一个 rdd 是不允许的。关于如何解决这个问题的任何想法都会很棒。
最佳答案
您必须重组您的 rdd2 以获得广播 var 查找或加入所需的 key 。如果 rdd2 是 RDD[label, Array(feature)],我会尝试像这样获取 RDD[feature,label]:
val rdd2Mapped: RDD[String,String] = rdd2.flatMap(x => x._2.map(y => (y,x._1)))
然后使用 aggregateByKey 创建 RDD[feature, Map[label, frequency]]
val initialMap = scala.collection.mutable.Map.empty[String, Int]
val addToMap = (x: scala.collection.mutable.Map[String, Int], y: String) => {
if(x.contains(y))
x += ((y, x.get(y).get+1))
else
x += ((y, 1))
}
val mergeMaps = (x: scala.collection.mutable.Map[String, Int], y: scala.collection.mutable.Map[String, Int]) => {
x ++= y
}
val rdd2Aggregated: RDD[String, scala.collection.mutable.Map[String,Int] =
rdd2Mapped.aggregateByKey(initialMap)(addToMap, mergeMaps)
现在,广播 rdd2Aggregated 或将 rdd1 与 rdd2Aggregated 连接起来,并使用 Map[label->frequency] 获得您想要的结果。
对于问题的第二部分,以几乎类似的方式转换 rdd2,但只对每个标签采用不同的特征
val rdd2Mapped: RDD[String,String] = rdd2.flatMap(x => x._2.distinct.map(y => (y,x._1)))
像第一部分一样获取 RDD[feature, Map[label, frequency]]。这将为您提供某个功能在 rdd2 中出现的次数。现在,得到没有。 rdd2 中每个标签的行数(rdd2 中标签的简单字数统计)。您像以前一样将 rdd1 与这个新的 rdd2Aggregated 连接起来,并进一步将生成的 rdd 与 wordcount 查找映射连接起来(如果足够小,则广播 wordcount 查找映射)。现在,对于每个特征,您都会得到一张标签和频率图。从查找图中相应的标签计数中减去每个标签的频率,以获得所需的答案。
如果给定特征的 Map[label,frequency] 中不存在标签,则将该频率视为 0。确保考虑这种边缘情况。
关于scala - 在 map 中访问另一个 rdd,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35469857/
我是 Pyspark 新手,我使用的是 Spark 2.0.2。 我有一个名为 Test_RDD 的 RDD,其结构如下: U-Key || V1 || V2 || V3 || ----
我正在寻找一种方法将一个 RDD 拆分为两个或多个 RDD,并将获得的结果保存为两个单独的 RDD。例如: rdd_test = sc.parallelize(range(50), 1) 我的代码:
我有一个结构如下的RDD: ((user_id,item_id,rating)) 让我们将此 RDD 称为训练 然后还有另一个具有相同结构的rdd: ((user_id,item_id,rating)
已经有人问过类似的问题。最相似的是这个: Spark: How to split an RDD[T]` into Seq[RDD[T]] and preserve the ordering 但是,我不
我正在使用 spark 来处理数据。但是我不知道如何将新数据保存到Hive 我从 Hive 加载 rdd,然后运行 map 函数来清理数据。 result = myRdd.map(lambda x
我有一个名为 index 的 rdd:RDD[(String, String)],我想用 index 来处理我的文件。 这是代码: val get = file.map({x => val tmp
我有两个 RDD: **rdd1** id1 val1 id2 val2 **rdd2** id1 v1 id2 v2 id1 v3 id8 v7 id1 v4 id3 v5 id6 v6 我想过滤
我有一个 RDD,需要从另一个 RDD 访问数据。但是,我总是收到任务不可序列化错误。我已经扩展了 Serialized 类,但它没有起作用。代码是: val oldError = rddOfRati
我有一个 RDD,需要从另一个 RDD 访问数据。但是,我总是收到任务不可序列化错误。我已经扩展了 Serialized 类,但它没有起作用。代码是: val oldError = rddOfRati
我有一个 RDD 对: (105,918) (105,757) (502,516) (105,137) (516,816) (350,502) 我想将它分成两个 RDD,这样第一个只有具有非重复值的对
我正在尝试使用 spark 执行 K 最近邻搜索。 我有一个 RDD[Seq[Double]] 并且我打算返回一个 RDD[(Seq[Double],Seq[Seq[Double]])] 带有实际行和
我是Spark和Scala语言的新手,并且希望将所有RDD合并到一个List中,如下所示(List to RDD): val data = for (item {
我找不到只参与 rdd 的方法. take看起来很有希望,但它返回 list而不是 rdd .我当然可以将其转换为 rdd ,但这似乎既浪费又丑陋。 my_rdd = sc.textFile("my
我正在寻找一种将 RDD 拆分为两个或更多 RDD 的方法。我见过的最接近的是 Scala Spark: Split collection into several RDD?这仍然是一个单一的 RDD
我有一个RDD[String],wordRDD。我还有一个从字符串/单词创建 RDD[String] 的函数。我想为 wordRDD 中的每个字符串创建一个新的 RDD。以下是我的尝试: 1) 失败,
我刚刚开始使用 Spark 和 Scala 我有一个包含多个文件的目录我使用 成功加载它们 sc.wholeTextFiles(directory) 现在我想升一级。我实际上有一个目录,其中包含包含文
我想从另一个 RDD 中减去一个 RDD。我查看了文档,发现 subtract可以这样做。实际上,当我测试时 subtract , 最终的 RDD 保持不变,值不会被删除! 有没有其他功能可以做到这一
我在 HDFS 中有如下三个文件中的数据 EmployeeManagers.txt (EmpID,ManagerID) 1,5 2,4 3,4 4,6 5,6 EmployeeNames.txt (E
我正在开发一个应用程序,我需要对 RDD 中具有相同键的每对行执行计算,这是 RDD 结构: List>> dat2 = new ArrayList<>(); dat2.add(new Tuple2>
我在 spark 集群中有两个文件,foo.csv 和 bar.csv,它们都有 4 列和完全相同的字段:时间、用户、url、类别。 我想通过 bar.csv 的某些列过滤掉 foo.csv。最后,我
我是一名优秀的程序员,十分优秀!