gpt4 book ai didi

apache-spark - 如何加入两个 JDBC 表并避免交换?

转载 作者:行者123 更新时间:2023-12-04 04:19:42 25 4
gpt4 key购买 nike

我有类似 ETL 的场景,其中我从多个 JDBC 表和文件中读取数据,并在源之间执行一些聚合和连接。

在一个步骤中,我必须加入两个 JDBC 表。我试过做这样的事情:

val df1 = spark.read.format("jdbc")
.option("url", Database.DB_URL)
.option("user", Database.DB_USER)
.option("password", Database.DB_PASSWORD)
.option("dbtable", tableName)
.option("driver", Database.DB_DRIVER)
.option("upperBound", data.upperBound)
.option("lowerBound", data.lowerBound)
.option("numPartitions", data.numPartitions)
.option("partitionColumn", data.partitionColumn)
.load();

val df2 = spark.read.format("jdbc")
.option("url", Database.DB_URL)
.option("user", Database.DB_USER)
.option("password", Database.DB_PASSWORD)
.option("dbtable", tableName)
.option("driver", Database.DB_DRIVER)
.option("upperBound", data2.upperBound)
.option("lowerBound", data2.lowerBound)
.option("numPartitions", data2.numPartitions)
.option("partitionColumn", data2.partitionColumn)
.load();

df1.join(df2, Seq("partition_key", "id")).show();

请注意 partitionColumn在这两种情况下都是相同的 - “partition_key”。

但是,当我运行这样的查询时,我可以看到不必要的交换(为了可读性而清除了计划):

df1.join(df2, Seq("partition_key", "id")).explain(extended = true);

Project [many many fields]
+- Project [partition_key#10090L, iv_id#10091L, last_update_timestamp#10114, ... more fields]
+- SortMergeJoin [partition_key#10090L, id#10091L], [partition_key#10172L, id#10179L], Inner
:- *Sort [partition_key#10090L ASC NULLS FIRST, iv_id#10091L ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(partition_key#10090L, iv_id#10091L, 4)
: +- *Scan JDBCRelation((select mod(s.id, 23) as partition_key, s.* from tab2 s)) [numPartitions=23] [partition_key#10090L,id#10091L,last_update_timestamp#10114] PushedFilters: [*IsNotNull(PARTITION_KEY)], ReadSchema: struct<partition_key:bigint,id:bigint,last_update_timestamp:timestamp>
+- *Sort [partition_key#10172L ASC NULLS FIRST, id#10179L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(partition_key#10172L, iv_id#10179L, 4)
+- *Project [partition_key#10172L, id#10179L ... 75 more fields]
+- *Scan JDBCRelation((select mod(s.id, 23) as partition_key, s.* from tab1 s)) [numPartitions=23] [fields] PushedFilters: [*IsNotNull(ID), *IsNotNull(PARTITION_KEY)], ReadSchema: struct<partition_key:bigint,id:bigint...

如果我们已经用 numPartitions 对读取进行了分区和其他选项一样,分区数是一样的,为什么还需要另一个Exchange?我们能以某种方式避免这种不必要的洗牌吗?在测试数据上,我看到 Sparks 在此 Exchange 期间发送了超过 150M 的数据,其中生产 Datasets大得多,所以它可能是严重的瓶颈。

最佳答案

使用 Date Source API 的当前实现,没有向上游传递分区信息,因此即使可以在没有 shuffle 的情况下加入数据,Spark 也无法使用此信息。因此,您的假设是:

JdbcRelation uses RangePartitioning on reading



只是不正​​确。此外,看起来 Spark 使用相同的内部代码来处理基于范围的 JDBC 分区和基于谓词的 JDBC 分区。而前者可以翻译成 SortOrder ,后者通常可能与 Spark SQL 不兼容。

如有疑问,可以检索 Partitioner信息使用 QueryExecution和内部 RDD :
df.queryExecution.toRdd.partitioner

这在 future 可能会改变( SPIP:​ ​ Data​ ​ Source​ ​ API​ ​ V2SPARK-15689 - Data source API v2Spark Data Frame. PreSorded partitions )。

关于apache-spark - 如何加入两个 JDBC 表并避免交换?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47597970/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com