gpt4 book ai didi

scala - 在 map 中访问另一个 rdd

转载 作者:行者123 更新时间:2023-12-04 17:55:14 25 4
gpt4 key购买 nike

这是我的示例数据:

| rdd1  |
| .... |
| 10 |
| 200 |
| 350 |
| 400 |
| 1000 |
| 1500 |
| ..... |



| rdd2 |
| label | features |
| .... | ....................... |
| 0 | 1 10 30 100 200 450 600 |
| 0 | 200 300 400 |
| 1 | 200 350 450 |
| 1 | 400 600 700 |
| .... | ........................ |

I want to compute the following: For each element of rdd1 find out how many times it appears in the features in rdd2 for each label value. I need a tuple like this (#of times appears with lable 0, # times appears with label 1) So in the above example, 10 appears 1 time with label 0 and 0 times with label 1 for 10 it will be (1,0). 200 appears 2 times with label 0 and one time with label 1 so it will be (2,1) for 200.

In addition, I also want to find out For each element of rdd1 find out how many times it does not appear in the features in rdd2 for each label value. I need a tuple like this (#of times does not appear with lable 0, # times does not appear with label 1). So in the above example, for 10 I should get back it does not appear one time with label and two times with label 1 (1,2).

我计划使用按键聚合。

val initialCount : collection.mutable.ListBuffer[Int] = ListBuffer(0, 0)
val addToCounts = (s: collection.mutable.ListBuffer[Int], label:Int) => if (label == 1) s(0) += 1 else s(1) += 1
val sumPartitionCounts = (p1: collection.mutable.ListBuffer[Int], p2: collection.mutable.ListBuffer[Int]) => ListBuffer((p1(0) + p2(0)),(p1(1) + p2(1)))

但是,我读到在另一个 rdd 的 map 函数中访问一个 rdd 是不允许的。关于如何解决这个问题的任何想法都会很棒。

最佳答案

  1. 广播变量 - 如果您的 rdd2 足够小,将其广播到每个节点并将其用作 rdd1.map 中的查找或
  2. 加入 - 加入键值 rdds

您必须重组您的 rdd2 以获得广播 var 查找或加入所需的 key 。如果 rdd2 是 RDD[label, Array(feature)],我会尝试像这样获取 RDD[feature,label]:

    val rdd2Mapped: RDD[String,String] = rdd2.flatMap(x => x._2.map(y => (y,x._1)))        

然后使用 aggregateByKey 创建 RDD[feature, Map[label, frequency]]

    val initialMap = scala.collection.mutable.Map.empty[String, Int]
val addToMap = (x: scala.collection.mutable.Map[String, Int], y: String) => {
if(x.contains(y))
x += ((y, x.get(y).get+1))
else
x += ((y, 1))
}
val mergeMaps = (x: scala.collection.mutable.Map[String, Int], y: scala.collection.mutable.Map[String, Int]) => {
x ++= y
}
val rdd2Aggregated: RDD[String, scala.collection.mutable.Map[String,Int] =
rdd2Mapped.aggregateByKey(initialMap)(addToMap, mergeMaps)

现在,广播 rdd2Aggregated 或将 rdd1 与 rdd2Aggregated 连接起来,并使用 Map[label->frequency] 获得您想要的结果。

对于问题的第二部分,以几乎类似的方式转换 rdd2,但只对每个标签采用不同的特征

    val rdd2Mapped: RDD[String,String] = rdd2.flatMap(x => x._2.distinct.map(y => (y,x._1)))

像第一部分一样获取 RDD[feature, Map[label, frequency]]。这将为您提供某个功能在 rdd2 中出现的次数。现在,得到没有。 rdd2 中每个标签的行数(rdd2 中标签的简单字数统计)。您像以前一样将 rdd1 与这个新的 rdd2Aggregated 连接起来,并进一步将生成的 rdd 与 wordcount 查找映射连接起来(如果足够小,则广播 wordcount 查找映射)。现在,对于每个特征,您都会得到一张标签和频率图。从查找图中相应的标签计数中减去每个标签的频率,以获得所需的答案。

如果给定特征的 Map[label,frequency] 中不存在标签,则将该频率视为 0。确保考虑这种边缘情况。

关于scala - 在 map 中访问另一个 rdd,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35469857/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com