- android - 多次调用 OnPrimaryClipChangedListener
- android - 无法更新 RecyclerView 中的 TextView 字段
- android.database.CursorIndexOutOfBoundsException : Index 0 requested, 光标大小为 0
- android - 使用 AppCompat 时,我们是否需要明确指定其 UI 组件(Spinner、EditText)颜色
我在 Python 中遇到关于 Apache Spark 的问题。我有这套
data = sc.parallelize([('a','u'), ('a', 'v'),
('b', 'w'), ('b', 'x'), ('b', 'x')] )
我想做的是按键计算元素的数量并创建一个包含元素的列表。如果我这样做
a = data.combineByKey(lambda value: (value, 1),
lambda x, value: (value, x[1] + 1),
lambda x, y: (x[0]+'/'+y[0], x[1] + y[1]))
我有这样的结果:
[('a', ('u/v', 2)), ('b', ('w/x/x', 3))]
我想要的是
[('a', (['u','v'], 2)), ('b', (['w','x','x'], 3))]
我该怎么做?
最佳答案
如果您想将所有值保存为列表,则根本没有理由使用 combineByKey
。简单地 groupBy
更有效:
aggregated = data.groupByKey().mapValues(lambda vs: (list(vs), len(vs)))
aggregated.collect()
## [('a', (['u', 'v'], 2)), ('b', (['w', 'x', 'x'], 3))]
一种更有效的方法是保留计数而不是所有值:
aggregated_counts = (data
.map(lambda kv: (kv, 1))
.reduceByKey(add)
.map(lambda kv: (kv[0][0], (kv[0][1], kv[1])))
.groupByKey()
.mapValues(lambda xs: (list(xs), sum(x[1] for x in xs))))
aggregated_counts.collect()
## [('a', ([('v', 1), ('u', 1)], 2)), ('b', ([('w', 1), ('x', 2)], 3))]
或
from collections import Counter
def merge_value(acc, x):
acc.update(x)
return acc
def merge_combiners(acc1, acc2):
acc1.update(acc2)
return acc1
aggregated_counts_ = (data
.combineByKey(Counter, merge_value, merge_combiners)
.mapValues(lambda cnt: (cnt, sum(cnt.values()))))
aggregated_counts_.collect()
## [('a', (Counter({'u': 1, 'v': 1}), 2)), ('b', (Counter({'w': 1, 'x': 2}), 3))]
关于python - Apache Spark CombineByKey 与 Python 中的元素列表,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33878579/
我正在学习spark,但是我无法理解combineByKey这个函数。 >>> data = sc.parallelize([("A",1),("A",2),("B",1),("B",2),("C",
我是 Apache spark 的新手,所以这个问题可能不好问,但我不知道 combinebykey 和 aggregatebykey 之间的区别以及何时使用哪个操作。 最佳答案 aggregateB
我想使用 lambda 函数来计算 a ( JavaPairRDD pairs) 的按键平均值。为此,我开发了以下代码: java.util.function.Function> createAcc
这个问题在这里已经有了答案: How createCombiner,mergeValue, mergeCombiner works in CombineByKey in Spark ( Using
我在 Python 中遇到关于 Apache Spark 的问题。我有这套 data = sc.parallelize([('a','u'), ('a', 'v'), ('b', 'w'),
我已经通过 Kafka 将数据按键排序到我的 Spark Streaming 分区中,即在一个节点上找到的键在任何其他节点上都找不到。 我想使用 redis 及其 incrby(递增方式)命令作为状态
免责声明:我是 Spark 的新手 我有一个 rdd 看起来像: [(T,[Tina, Thomas]), (T,[Tolis]), (C,[Cory, Christine]), (J,[Joseph
我是 spark rdd 的新手,我想使用 spark 混洗操作通过使用键对聚合进行分组来计算聚合。起初我的方法是使用 rdd.groupby() 但是在执行它时它需要更长的时间来收敛并且内存效率很低
我刚刚开始使用 Java 中的 Apache Spark。我目前正在做一个带有一些书籍数据的小型项目。我必须找到每个国家/地区最受欢迎的作者。 我有一个pairRDD,其中键是国家/地区,值是作者,如
我是一名优秀的程序员,十分优秀!