gpt4 book ai didi

scala - 为什么聚类似乎在 spark cogroup 函数中不起作用

转载 作者:行者123 更新时间:2023-12-03 19:12:07 27 4
gpt4 key购买 nike

我有两个 hive 聚簇表 t1 和 t2

CREATE EXTERNAL TABLE `t1`(
`t1_req_id` string,
...
PARTITIONED BY (`t1_stats_date` string)
CLUSTERED BY (t1_req_id) INTO 1000 BUCKETS

// t2 looks similar with same amount of buckets

代码如下所示:
 val t1 = spark.table("t1").as[T1].rdd.map(v => (v.t1_req_id, v))
val t2= spark.table("t2").as[T2].rdd.map(v => (v.t2_req_id, v))

val outRdd = t1.cogroup(t2)
.flatMap { coGroupRes =>
val key = coGroupRes._1
val value: (Iterable[T1], Iterable[T2])= coGroupRes._2
val t3List = // create a list with some logic on Iterable[T1] and Iterable[T2]
t3List
}
outRdd.write....

我确保 t1 和 t2 表都有相同数量的分区,并且在 spark-submit 上有 spark.sql.sources.bucketing.enabled=truespark.sessionState.conf.bucketingEnabled=true旗帜

但 Spark DAG 并没有显示出集群的任何影响。似乎还有数据全洗牌
我错过了什么,任何其他配置,调整?如何保证没有完整的数据洗牌?
我的 Spark 版本是 2.3.1

enter image description here

最佳答案

它不应该显示。

任何逻辑优化都限于 DataFrame API。一旦您将数据推送到黑盒功能数据集 API(参见 Spark 2.0 Dataset vs DataFrame),然后再推送到 RDD API,就不会再将更多信息推送回优化器。

您可以通过先执行连接来部分利用分桶,围绕这些线获得一些东西

spark.table("t1")
.join(spark.table("t2"), $"t1.t1_req_id" === $"t2.t2_req_id", "outer")
.groupBy($"t1.v.t1_req_id", $"t2.t2_req_id")
.agg(...) // For example collect_set($"t1.v"), collect_set($"t2.v")

但是,与 cogroup 不同,这将在组内生成完整的笛卡尔积,并且可能不适用于您的情况

关于scala - 为什么聚类似乎在 spark cogroup 函数中不起作用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61727474/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com