gpt4 book ai didi

python - Spark - 嵌套RDD操作

转载 作者:太空狗 更新时间:2023-10-30 02:58:43 26 4
gpt4 key购买 nike

我有两个 RDD 说

   rdd1 = 
id | created | destroyed | price
1 | 1 | 2 | 10
2 | 1 | 5 | 11
3 | 2 | 3 | 11
4 | 3 | 4 | 12
5 | 3 | 5 | 11

rdd2 =

[1,2,3,4,5] # lets call these value as timestamps (ts)

rdd2 基本上是使用 range(intial_value, end_value, interval) 生成的。这里的参数可以变化。大小可以与 rdd1 相同或不同。这个想法是根据 rdd2 的值使用过滤条件将记录从 rdd1 提取到 rdd2(来自 rdd1 的记录可以在提取时重复,正如您在输出中看到的那样)

过滤条件rdd1.created <= ts < rdd1.destroyed)

预期输出:

ts             | prices  
1 | 10,11 # i.e. for ids 1,2 of rdd1
2 | 11,11 # ids 2,3
3 | 11,12,11 # ids 2,4,5
4 | 11,11 # ids 2,5

现在我想根据某些使用 RDD2 键的条件来过滤 RDD1。 (如上所述)并返回将RDD2的键和RDD1的过滤结果连接起来的结果

我也是这样的:

rdd2.map(lambda x : somefilterfunction(x, rdd1))  

def somefilterfunction(x, rdd1):
filtered_rdd1 = rdd1.filter(rdd1[1] <= x).filter(rdd1[2] > x)
prices = filtered_rdd1.map(lambda x : x[3])
res = prices.collect()
return (x, list(res))

然后我得到:

Exception: It appears that you are attempting to broadcast an RDD or reference an RDD from an action or transformation. RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(lambda x: rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.

我尝试使用 groupBy ,但是因为这里 rdd1 的元素可以一次又一次地重复,而不是分组,我知道分组只会将 rdd1 的每个元素放在某个特定的槽中一次。

现在唯一的方法是使用普通的 for 循环并进行过滤并在最后加入所有内容。

有什么建议吗?

最佳答案

由于您使用常规范围,因此根本没有理由创建第二个 RDD。您可以简单地为每条记录生成特定范围内的值:

from __future__ import division # Required only for Python 2.x
from math import ceil
from itertools import takewhile

rdd1 = sc.parallelize([
(1, 1, 2, 10),
(2, 1, 5, 11),
(3, 2, 3, 11),
(4, 3, 4, 12),
(5, 3, 5, 11),
])


def generate(start, end, step):
def _generate(id, created, destroyed, price):
# Smallest ts >= created
start_for_record = int(ceil((created - start) / step) * step + start)
rng = takewhile(
lambda x: created <= x < destroyed,
xrange(start_for_record, end, step)) # In Python 3.x use range
for i in rng:
yield i, price

return _generate

result = rdd1.flatMap(lambda x: generate(1, 6, 1)(*x)).groupByKey()

结果:

result.mapValues(list).collect()

## [(1, [10, 11]), (2, [11, 11]), (3, [11, 12, 11]), (4, [11, 11])]

关于python - Spark - 嵌套RDD操作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33257461/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com