- android - RelativeLayout 背景可绘制重叠内容
- android - 如何链接 cpufeatures lib 以获取 native android 库?
- java - OnItemClickListener 不起作用,但 OnLongItemClickListener 在自定义 ListView 中起作用
- java - Android 文件转字符串
我有一个问题想用 Spark 解决。我是 Spark 的新手,所以我不确定设计它的最佳方式是什么。
group1=user1,user2
group2=user1,user2,user3
group3=user2,user4
group4=user1,user4
group5=user3,user5
group6=user3,user4,user5
group7=user2,user4
group8=user1,user5
group9=user2,user4,user5
group10=user4,user5
我想找到每对用户之间的相互组数。所以对于上面的输入,我期望的输出是:
1st user || 2nd user || mutual/intersection count || union count
------------------------------------------------------------
user1 user2 2 7
user1 user3 1 6
user1 user4 1 9
user2 user4 3 8
我认为有几种方法可以解决这个问题,其中一种解决方案可能是:
例子:
(1st stage): Map
group1=user1,user2 ==>
user1, group1
user2, group1
group2=user1,user2,user3 ==>
user1, group2
user2, group2
user3, group2
....
....
....
(2nd stage): Reduce by key
user1 -> group1, group2, group4, group8
user2 -> group1, group2, group3, group7, group9
但我的问题是,在我按键减少它们之后,以我想要的方式表示计数的最佳方式是什么?
有没有更好的方法来处理这个问题?用户的最大数量是恒定的,不会超过 5000,因此这是它将创建的最大 key 数量。但是输入可能包含接近 1B 行的几行。我不认为这会是一个问题,如果我错了请纠正我。
这是我对Spark一知半解(上个月才开始学Spark)想出来的解决这个问题的代码:
def createPair(line: String): Array[(String, String)] = {
val splits = line.split("=")
val kuid = splits(0)
splits(1).split(",").map { segment => (segment, kuid) }
}
val input = sc.textFile("input/test.log")
val pair = input.flatMap { line => createPair(line) }
val pairListDF = pair
.aggregateByKey(scala.collection.mutable.ListBuffer.empty[String])(
(kuidList, kuid) => { kuidList += kuid; kuidList },
(kuidList1, kuidList2) => { kuidList1.appendAll(kuidList2); kuidList1 })
.mapValues(_.toList).toDF().select($"_1".alias("user"), $"_2".alias("groups"))
pairListDF.registerTempTable("table")
sqlContext.udf.register("intersectCount", (list1: WrappedArray[String], list2: WrappedArray[String]) => list1.intersect(list2).size)
sqlContext.udf.register("unionCount", (list1: WrappedArray[String], list2: WrappedArray[String]) => list1.union(list2).distinct.size)
val populationDF = sqlContext.sql("SELECT t1.user AS user_first,"
+ "t2.user AS user_second,"
+ "intersectCount(t1.groups, t2.groups) AS intersect_count,"
+ "unionCount(t1.groups, t2.groups) AS union_count"
+ " FROM table t1 INNER JOIN table t2"
+ " ON t1.user < t2.user"
+ " ORDER BY user_first,user_second")
+----------+-----------+---------------+-----------+
|user_first|user_second|intersect_count|union_count|
+----------+-----------+---------------+-----------+
| user1| user2| 2| 7|
| user1| user3| 1| 6|
| user1| user4| 1| 9|
| user1| user5| 1| 8|
| user2| user3| 1| 7|
| user2| user4| 3| 8|
| user2| user5| 1| 9|
| user3| user4| 1| 8|
| user3| user5| 2| 6|
| user4| user5| 3| 8|
+----------+-----------+---------------+-----------+
很想得到一些关于我的代码和我遗漏的东西的反馈。请随意批评我的代码,因为我刚刚开始学习 Spark。再次感谢@axiom 的回答,比我预期的更小更好的解决方案。
最佳答案
总结:
获取对数,然后利用事实
union(a, b) = count(a) + count(b) - intersection(a, b)
val data = sc.textFile("test")
//optionally data.cache(), depending on size of data.
val pairCounts = data.flatMap(pairs).reduceByKey(_ + _)
val singleCounts = data.flatMap(singles).reduceByKey(_ + _)
val singleCountMap = sc.broadcast(singleCounts.collectAsMap())
val result = pairCounts.map{case ((user1, user2), intersectionCount) =>(user1, user2, intersectionCount, singleCountMap.value(user1) + singleCountMap.value(user2) - intersectionCount)}
总共有 5000 个用户,2500 万个 key (每对 1 个)应该不会太多。我们可以使用 reduceByKey
来计算交点数。
个人计数可以很容易地在 map 中广播
。
现在众所周知:
并集(用户 1,用户 2)= 计数(用户 1)+ 计数(用户 2)- 交集(用户 1,用户 2)
。
前两个计数是从广播映射中读取的,而我们映射的是对计数的 rdd。
代码:
//generate ((user1, user2), 1) for pair counts
def pairs(str: String) = {
val users = str.split("=")(1).split(",")
val n = users.length
for(i <- 0 until n; j <- i + 1 until n) yield {
val pair = if(users(i) < users(j)) {
(users(i), users(j))
} else {
(users(j), users(i))
} //order of the user in a list shouldn't matter
(pair, 1)
}
}
//generate (user, 1), to obtain single counts
def singles(str: String) = {
for(user <- str.split("=")(1).split(",")) yield (user, 1)
}
//read the rdd
scala> val data = sc.textFile("test")
scala> data.collect.map(println)
group1=user1,user2
group2=user1,user2,user3
group3=user2,user4
group4=user1,user4
group5=user3,user5
group6=user3,user4,user5
group7=user2,user4
group8=user1,user5
group9=user2,user4,user5
group10=user4,user5
//get the pair counts
scala> val pairCounts = data.flatMap(pairs).reduceByKey(_ + _)
pairCounts: org.apache.spark.rdd.RDD[((String, String), Int)] = ShuffledRDD[16] at reduceByKey at <console>:25
//just checking
scala> pairCounts.collect.map(println)
((user2,user3),1)
((user1,user3),1)
((user3,user4),1)
((user2,user5),1)
((user1,user5),1)
((user2,user4),3)
((user4,user5),3)
((user1,user4),1)
((user3,user5),2)
((user1,user2),2)
//single counts
scala> val singleCounts = data.flatMap(singles).reduceByKey(_ + _)
singleCounts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[20] at reduceByKey at <console>:25
scala> singleCounts.collect.map(println)
(user5,5)
(user3,3)
(user1,4)
(user2,5)
(user4,6)
//broadcast single counts
scala> val singleCountMap = sc.broadcast(singleCounts.collectAsMap())
//calculate the results:
最后:
scala> val res = pairCounts.map{case ((user1, user2), intersectionCount) => (user1, user2, intersectionCount, singleCountMap.value(user1) + singleCountMap.value(user2) - intersectionCount)}
res: org.apache.spark.rdd.RDD[(String, String, Int, Int)] = MapPartitionsRDD[23] at map at <console>:33
scala> res.collect.map(println)
(user2,user3,1,7)
(user1,user3,1,6)
(user3,user4,1,8)
(user2,user5,1,9)
(user1,user5,1,8)
(user2,user4,3,8)
(user4,user5,3,8)
(user1,user4,1,9)
(user3,user5,2,6)
(user1,user2,2,7)
注意:
在生成对时,我对元组进行排序,因为我们不希望用户在列表中的顺序很重要。
请将用户名字符串编码为整数,您可能会获得显着的性能提升。
关于hadoop - Spark - 寻找重叠值或寻找共同 friend 的变体,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37577830/
我正在创建一个连接到 firebase 的应用程序。但我面临一些问题。当同步我的 gradle 文件时,我收到此警告 WARNING: API 'variant.getMergeResources()
想知道是否有任何方法可以将变体分配给自定义 radio 输入?我想为 2 天、3 天和标准运输设置不同费率的分级运输。我可以使用变体来做到这一点,但下拉菜单对我不起作用。我想要日期信息和日期选择器,以
我是 Haskell 的新手。鉴于 Haskell 的整个前提是函数将始终返回相同的值,我希望有某种方式,例如在编译时计算常量的斐波那契值,就像我可以在 C++ 中使用模板元编程一样,但我不知道该怎么
我是 OCaml 的新手,但过去两天一直在工作,以便更好地了解如何使用它。我最近做了很多事情,但有些事情阻碍了我前进。 我正在尝试在 OCaml 中实现 evaexpr。使用这种语言非常容易,你会说:
我有一个使用一些typedef的std::variant的代码库。 最初,它们是不同的类型,但现在它们像下面的示例一样重叠 typedef int TA; typedef int TB; std::v
鉴于此: data Foo = Bar { name :: String } | Baz { nickname :: String } 函数 name 和 nickname 似乎都是 Foo -> S
嘿,我猜这可能相当微不足道,但我很难找到答案或弄清楚它。 我正在尝试创建一个带有任意间距的彩色方 block 网格。这本身很容易做到,特别是因为我只需要九个正方形。但是虽然我看着我完成的代码,我不禁觉
我有 woocommerce 设置,其中包含产品和这些产品的变体。当我更改变化时(例如,产品的尺寸(340克或900克),我希望它在页面上动态更新价格,以便人们可以看到两种尺寸之间的价格差异。目前,我
关闭。这个问题不符合Stack Overflow guidelines .它目前不接受答案。 我们不允许提问寻求书籍、工具、软件库等的推荐。您可以编辑问题,以便用事实和引用来回答。 关闭 7 年前。
在我的一节课上,我被问到这是一个脑筋急转弯,但我无法弄明白(这不是家庭作业问题,只是其中一位助教给我们的一个脑筋急转弯让我们思考)。 给你一根杆,上面有 n 个要切割的点,例如 [1,5,11],以及
关于 CRP如果我想实现它的细微变化(使用模板模板参数),我会得到一个编译错误: template class Derived> class Base { public: void Call
我正在创建一个 woocommerce 主题,并且我有产品变体,即显示在产品详细信息页面上的尺寸,但问题是我想通过使用产品 ID 在我的自定义 php 页面中获取所有变体,任何人都可以帮助我。 提前致
我正在使用 Ionic 开发移动应用程序,我必须与 Twitter API 连接。 所以我使用 ng-cordova 和 $cordovaAuth .但是当我这样做时: $cordovaOauth.t
这里的网站有一个音乐播放器http://www.numberonedesigns.com/ubermusic.com/ ...当点击下一个按钮并随机突出显示时,它不会正确随机化。它总是返回到播放列表中
我有列 sql 变体,其含义如下:100, 150, D1我正在尝试根据特定逻辑将列中的所有数字转换为字母(例如 D1)以防万一。但是 150 有空格并且 CASE WHEN 不起作用。 这是我正在使
有没有一种快速方法可以用从匹配模式派生的数据替换所有出现的某些模式? 例如,如果我想将字符串中出现的所有数字替换为用 0 填充到固定长度的相同数字。 在本例中,如果长度为 4,则 ab3cd5 将变为
我目前正在寻找生成具有特定位数的数字列表,我的代码当前如下: | Python 2.7 | import itertools inp = raw_input('Number of digits to
我正在对类型系统进行研究。对于这项工作,我正在研究流行语言中变体、结构子类型、通用多态性和存在多态性的用法。像 heskell、ocaml 这样的功能性语言提供了这样的功能。但我想知道像 C++ 这样
这是 variant.hpp 文件中的相关代码(可在此处找到 http://www.boost.org/doc/libs/1_49_0/boost/variant/variant.hpp) templ
您好,我有两个具有多对多关系的表和一个联结表。简而言之,具有不同属性的产品也随之增加了价格。为了更清楚地说明我是产品和属性表。 +-------------+------------+--------
我是一名优秀的程序员,十分优秀!