gpt4 book ai didi

apache-spark - Apache Spark : Unexpected filter results

转载 作者:行者123 更新时间:2023-12-04 04:48:55 26 4
gpt4 key购买 nike

我在本地模式下使用 Apache Spark v 1.2。我创建了一个 RDD 并将其保存在内存中。 Spark Web UI 显示该 RDD 的 85% 存储在内存中。我在 RDD 中有一个值为 0,1 的特征/变量,正如我通过运行以下脚本得到的结果所示:

In[96]: flagged.map(lambda x:(x[14],1)).reduceByKey(lambda x,y:x+y).collect()

Out[96]: [(0, 637981), (1, 272958)]

此外,当我执行 flagged.count() 时,数字是两个值的总和,即 637981+272958 = 910939

现在当我基于此运行过滤器时,我没有得到相同的计数:

In[97]:  flagged.filter(lambda x: x[14]==0).count()

Out[97]: 637344

In[97]: flagged.filter(lambda x: x[14]==1).count()

Out[97]: 272988

我很难理解为什么过滤后的 RDD 生成的数字与 reduceByKey 方法生成的数字不匹配。

最佳答案

使用缓存类型MEMORY_AND_DISK

rdd.persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK)

有一刻我觉得这是一个错误,我执行了一个示例作业,看起来你是对的

  val count3 = sc.parallelize(1 to 1000000).map(r => {
(new java.util.Random().nextInt(2), 1)
})


count3.reduceByKey(_+_).collect

res10: Array[(Int, Int)] = Array((0,500201), (1,499799))

 count3.filter(r => r._1==0).count

res13: 长 = 499613

 count3.filter(r => r._1==1).count

res14: 长 = 500143

但后来我将代码更改为

 val count3 = sc.parallelize(1 to 1000000).map(r => {
(new java.util.Random().nextInt(2), 1)
}).persist()
count3.count

请注意,这次我添加了 persist(并且我能够缓存 100% 的这个 rdd)

count3.reduceByKey(_+_).collect

res27: Array[(Int, Int)] = Array((0,500048), (1,499952))

 count3.filter(r => r._1==0).count

res28: 长 = 500048

 count3.filter(r => r._1==1).count

res29: 长 = 499952

我认为您正在生成 RDD 然后持久化它,默认缓存类型是 MEMORY_ONLY。现在的问题是您只能在内存中缓存 rdd 的 85%,这意味着剩余的 15% 将根据需要重新计算。如果您在创建 rdd 时使用了一些随机函数,这 15% 的数据可能会在重新计算期间发生变化。

关于apache-spark - Apache Spark : Unexpected filter results,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30020906/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com