本教程建议这样做:
>>> x = sc.parallelize([("a", 1), ("b", 4)])
>>> y = sc.parallelize([("a", 2)])
>>> sorted(x.cogroup(y).collect())
[('a', ([1], [2])), ('b', ([4], []))]
但是,在运行时我得到以下输出:
('a', (<pyspark.resultiterable.ResultIterable object at 0x1d8b190>, <pyspark.resultiterable.ResultIterable object at 0x1d8b150>))
('b', (<pyspark.resultiterable.ResultIterable object at 0x1d8b210>, <pyspark.resultiterable.ResultIterable object at 0x1d8b1d0>))
这有 3 层嵌套,如果我将输出存储在“r”中并执行此操作:
for i in r:
for j in i[1]:
print list(j)
我得到了正确的联合分组数字:
1) 为什么 Cogroup 在 PySpark 中不返回像 rightjoin/leftouterjoin 等数字?2) 为什么我不能在我的 PySpark shell 上复制示例?
简单的答案,因为这是 cogroup 应该返回的内容
/**
* For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
* list of values for that key in `this` as well as `other`.
*/
def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
Spark 中的联接实际上是通过 cogroup 实现的,基本上联接只是将可迭代对象从 cogroup 分解为元组。这里是join from spark的植入。
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
for (v <- vs; w <- ws) yield (v, w)
}
}
至于解释器输出的细微差别(请记住输出是相同的,除了 pyspark Iterable 不显示它的内容),除非我看到教程,否则我无法确定。本教程可能会显示更清晰的输出,即使实际上并非如此。还有一件事,我在 scala shell 中运行了一个类似的脚本,它显示了所有输出。
Array((a,(ArrayBuffer(1),ArrayBuffer(2))), (b,(ArrayBuffer(4),ArrayBuffer())))
我是一名优秀的程序员,十分优秀!