gpt4 book ai didi

python - 检查 RDD 中是否存在值

转载 作者:太空宇宙 更新时间:2023-11-04 00:43:32 25 4
gpt4 key购买 nike

我已经用 python 编写了一个运行正常的 Spark 程序。

但是,它在内存消耗方面效率低下,我正在尝试对其进行优化。我在 AWS EMR 上运行它,EMR 因消耗过多内存而终止了工作。

 Lost executor 11 on ip-*****: Container killed by YARN for exceeding memory limits. 11.4 GB of 10.4 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.

我认为这个内存问题是由于我在几个实例中收集我的 RDD(即使用 .collect() ),因为在后期阶段,我需要测试由这些组成的列表中是否存在某些值是否为 RDD。

因此,目前我的代码如下所示:

myrdd = data.map(lambda word: (word,1))     \
.reduceByKey(lambda a,b: a+b) \
.filter(lambda (a, b): b >= 5) \
.map(lambda (a,b) : a) \
.collect()

稍后在代码中

if word in myrdd:
mylist.append(word)

myrdd2 = data2.map(lambda word: (word,1)) \
.reduceByKey(lambda a,b: a+b) \
.filter(lambda (a, b): b >= 5) \
.map(lambda (a,b) : a) \
.collect()

if word in myrdd2:
mylist2.append(word)

然后我多次重复这个模式。

有没有办法进行操作

if word in myrdd: 
do something

不先收集 rdd?

有没有像rdd.contains()这样的函数?

P.S:我没有在内存中缓存任何东西。我的 Spark 上下文如下所示:

jobName = "wordcount"
sc = SparkContext(appName = jobName)

......
......

sc.stop()

最佳答案

来自 YARN 的错误消息说 collect 不是问题,因为您的执行程序(而不是驱动程序)有内存问题。

首先,尝试遵循错误消息建议并提升 spark.yarn.executor.memoryOverhead - 在 YARN 上运行 pyspark 时,您可以告诉 YARN 为 python 工作进程内存分配更大的容器.

接下来,查看执行程序需要大量内存的操作。您使用 reduceByKey,也许您可​​以增加分区的数量,使它们在使用的内存方面更小。查看 numPartitions 参数:http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.reduceByKey

最后,如果你想检查 rdd 是否包含某个值,那么只需按该值过滤并使用 countfirst 检查它,例如:

looking_for = "....."
contains = rdd.filter(lambda a: a == looking_for).count() > 0

关于python - 检查 RDD 中是否存在值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40797893/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com