gpt4 book ai didi

hadoop - 加入 Spark 输出错误的结果,而 map-side join 是正确的

转载 作者:可可西里 更新时间:2023-11-01 16:53:02 32 4
gpt4 key购买 nike

我的 spark 版本是 1.2.0,场景是这样的:

有两个RDD,分别是RDD_A和RDD_B,其数据结构都是RDD[(spid, the_same_spid)]。 RDD_A 有 20,000 行,而 RDD_B 有 3,000,000,000 行。我打算计算其“spid”存在于 RDD_A 中的 RDD_B 的行数。

我的第一个实现相当主流,在 RDD_A 上应用 RDD_B 的 join 方法:

val currentDay = args(0)

val conf = new SparkConf().setAppName("Spark-MonitorPlus-LogStatistic")
val sc = new SparkContext(conf)

//---RDD A transforming to RDD[(spid, spid)]---
val spidRdds = sc.textFile("/diablo/task/spid-date/" + currentDay + "-spid-media").map(line =>
line.split(",")(0).trim).map(spid => (spid, spid)).partitionBy(new HashPartitioner(32));
val logRdds: RDD[(LongWritable, Text)] = MzFileUtils.getFileRdds(sc, currentDay, "")
val logMapRdds = MzFileUtils.mapToMzlog(logRdds)

//---RDD B transforming to RDD[(spid, spid)]---
val tongYuanRdd = logMapRdds.filter(kvs => kvs("plt") == "0" && kvs("tp") == "imp").map(kvs => kvs("p").trim).map(spid => (spid, spid)).partitionBy(new HashPartitioner(32));

//---join---
val filteredTongYuanRdd = tongYuanRdd.join(spidRdds);
println("Total TongYuan Imp: " + filteredTongYuanRdd.count())

但是,与配置单元的结果相比,结果不正确(大于)。将 join 方法从 reduce-side join 更改为 map-side join 如下所示,结果与配置单元的结果相同:

val conf = new SparkConf().setAppName("Spark-MonitorPlus-LogStatistic")
val sc = new SparkContext(conf)

//---RDD A transforming to RDD[(spid, spid)]---
val spidRdds = sc.textFile("/diablo/task/spid-date/" + currentDay + "-spid-media").map(line =>
line.split(",")(0).trim).map(spid => (spid, spid)).partitionBy(new HashPartitioner(32));
val logRdds: RDD[(LongWritable, Text)] = MzFileUtils.getFileRdds(sc, currentDay, "")
val logMapRdds = MzFileUtils.mapToMzlog(logRdds)

//---RDD B transforming to RDD[(spid, spid)]---
val tongYuanRdd = logMapRdds.filter(kvs => kvs("plt") == "0" && kvs("tp") == "imp").map(kvs => kvs("p").trim).map(spid => (spid, spid)).partitionBy(new HashPartitioner(32));

//---join---
val globalSpids = sc.broadcast(spidRdds.collectAsMap());
val filteredTongYuanRdd = tongYuanRdd.mapPartitions({
iter =>
val m = globalSpids.value
for {
(spid, spid_cp) <- iter
if m.contains(spid)
} yield spid
}, preservesPartitioning = true);
println("Total TongYuan Imp: " + filteredTongYuanRdd.count())

如您所见,以上两个代码片段之间的唯一区别是“join”部分。

那么,对于解决这个问题有什么建议吗?提前致谢!

最佳答案

Spark 的连接不强制键的唯一性,当键重复时,实际上输出该键的叉积。使用 cogroup 并仅在每个键的 k/v 对上输出,或者仅映射到 id,然后使用 intersection 即可。

关于hadoop - 加入 Spark 输出错误的结果,而 map-side join 是正确的,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31128867/

32 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com