>> y = sc.parallelize([("a", 2)]) >>> sorted-6ren">
gpt4 book ai didi

python - PySpark 中的协作组

转载 作者:太空宇宙 更新时间:2023-11-03 11:51:56 26 4
gpt4 key购买 nike

本教程建议这样做:

>>> x = sc.parallelize([("a", 1), ("b", 4)])
>>> y = sc.parallelize([("a", 2)])
>>> sorted(x.cogroup(y).collect())
[('a', ([1], [2])), ('b', ([4], []))]

但是,在运行时我得到以下输出:

('a', (<pyspark.resultiterable.ResultIterable object at 0x1d8b190>, <pyspark.resultiterable.ResultIterable object at 0x1d8b150>))
('b', (<pyspark.resultiterable.ResultIterable object at 0x1d8b210>, <pyspark.resultiterable.ResultIterable object at 0x1d8b1d0>))

这有 3 层嵌套,如果我将输出存储在“r”中并执行此操作:

for i in r:
for j in i[1]:
print list(j)

我得到了正确的联合分组数字:

1) 为什么 Cogroup 在 PySpark 中不返回像 rightjoin/leftouterjoin 等数字?2) 为什么我不能在我的 PySpark shell 上复制示例?

最佳答案

简单的答案,因为这是 cogroup 应该返回的内容

  /**
* For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
* list of values for that key in `this` as well as `other`.
*/
def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]

Spark 中的联接实际上是通过 cogroup 实现的,基本上联接只是将可迭代对象从 cogroup 分解为元组。这里是join from spark的植入。

  def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
for (v <- vs; w <- ws) yield (v, w)
}
}

至于解释器输出的细微差别(请记住输出是相同的,除了 pyspark Iterable 不显示它的内容),除非我看到教程,否则我无法确定。本教程可能会显示更清晰的输出,即使实际上并非如此。还有一件事,我在 scala shell 中运行了一个类似的脚本,它显示了所有输出。

Array((a,(ArrayBuffer(1),ArrayBuffer(2))), (b,(ArrayBuffer(4),ArrayBuffer())))

关于python - PySpark 中的协作组,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24398938/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com