gpt4 book ai didi

apache-spark - 是否 UNION ALL 并行执行不同表上的两个 SELECT?

转载 作者:行者123 更新时间:2023-12-04 14:25:00 36 4
gpt4 key购买 nike

如果我有 SELECT [...] UNION ALL SELECT [...] 形式的 Spark SQL 语句,这两个 SELECT 语句是否会并行执行?在我的特定用例中,两个 SELECT 正在查询两个不同的数据库表。与我的预期相反,Spark UI 似乎表明这两个 SELECT 语句是按顺序执行的。

== 更新 1 ==

下面是 Spark UI 中显示的实际计划:

== Physical Plan ==
*Sort [avg_tip_pct#655 DESC NULLS LAST], true, 0
+- Exchange rangepartitioning(avg_tip_pct#655 DESC NULLS LAST, 4)
+- *HashAggregate(keys=[neighborhood#163], functions=[avg(tip_pct#654)], output=[neighborhood#163, avg_tip_pct#655])
+- Exchange hashpartitioning(neighborhood#163, 4)
+- *HashAggregate(keys=[neighborhood#163], functions=[partial_avg(tip_pct#654)], output=[neighborhood#163, sum#693, count#694L])
+- *Project [neighborhood#163, (tip_amount#513 / total_amount#514) AS tip_pct#654]
+- InMemoryTableScan [neighborhood#163, tip_amount#513, total_amount#514]
+- InMemoryRelation [pickup_latitude#511, pickup_longitude#512, tip_amount#513, total_amount#514, neighborhood#163, index#165], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
+- *Project [pickup_latitude#301, pickup_longitude#300, tip_amount#310, total_amount#313, neighborhood#163, index#165]
+- *Project [pickup_latitude#301, index#165, pickup_longitude#300, neighborhood#163, total_amount#313, point#524, polygon#164, tip_amount#310]
+- *SortMergeJoin [curve#578], [curve#580], Inner, ((relation#581 = Within) || Within(point#524, polygon#164))
:- *Sort [curve#578 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(curve#578, 4)
: +- Generate inline(indexer(point#524, 30)), true, false, [curve#578, relation#579]
: +- Union
: :- *Project [pickup_latitude#301, pickup_longitude#300, tip_amount#310, total_amount#313, pointconverter(pickup_longitude#300, pickup_latitude#301) AS point#524]
: : +- *Filter ((isnotnull(total_amount#313) && payment_type#306 IN (CREDIT,CRD,1)) && (total_amount#313 > 200.0))
: : +- *Scan BigQueryTableRelation({datasetId=new_york, projectId=bigquery-public-data, tableId=tlc_yellow_trips_2014},org.apache.spark.sql.SQLContext@3bf2de09) [pickup_latitude#301,payment_type#306,pickup_longitude#300,total_amount#313,tip_amount#310] PushedFilters: [IsNotNull(total_amount), In(payment_type, [CREDIT,CRD,1]), GreaterThan(total_amount,200.0)], ReadSchema: struct<pickup_latitude:double,pickup_longitude:double,tip_amount:double,total_amount:double,point...
: +- *Project [pickup_latitude#436, pickup_longitude#435, tip_amount#445, total_amount#448, pointconverter(pickup_longitude#435, pickup_latitude#436) AS point#524]
: +- *Filter ((isnotnull(total_amount#448) && payment_type#441 IN (CREDIT,CRD,1)) && (total_amount#448 > 200.0))
: +- *Scan BigQueryTableRelation({datasetId=new_york, projectId=bigquery-public-data, tableId=tlc_yellow_trips_2015},org.apache.spark.sql.SQLContext@3bf2de09) [payment_type#441,pickup_longitude#435,pickup_latitude#436,total_amount#448,tip_amount#445] PushedFilters: [IsNotNull(total_amount), In(payment_type, [CREDIT,CRD,1]), GreaterThan(total_amount,200.0)], ReadSchema: struct<pickup_latitude:double,pickup_longitude:double,tip_amount:double,total_amount:double,point...
+- *Sort [curve#580 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(curve#580, 4)
+- Generate inline(index#165), true, false, [curve#580, relation#581]
+- InMemoryTableScan [neighborhood#163, polygon#164, index#165]
+- InMemoryRelation [neighborhood#163, polygon#164, index#165], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
+- *Project [UDF:metadata_string(metadata#13, neighborhood) AS neighborhood#163, polygon#12, index#15]
+- InMemoryTableScan [metadata#13, polygon#12, index#15]
+- InMemoryRelation [point#10, polyline#11, polygon#12, metadata#13, valid#14, index#15], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), `neighborhoods`
+- *Scan GeoJSONRelation(gs://miraisolutions/public/sparkgeo/nyc_neighborhoods.geojson,Map(type -> geojson, magellan.index -> true, magellan.index.precision -> 30, path -> gs://miraisolutions/public/sparkgeo/nyc_neighborhoods.geojson)) [point#10,polyline#11,polygon#12,metadata#13,valid#14,index#15] ReadSchema: struct<point:struct<type:int,xmin:double,ymin:double,xmax:double,ymax:double,x:double,y:double>,p...

请注意两个 SELECT 的联合,以扫描 BigQueryTableRelation 的形式出现。这些似乎是按顺序执行的。

每个 BigQuery 选择都在一个单独的作业中执行(每个作业都有一个阶段)- 按顺序执行。我运行一个 5 节点 YARN 集群,每个集群有 4 个 CPU 和 26GB 内存。我想知道我有一个自定义 BigQuery 数据源这一事实在这里是否重要。我希望它不应该。无论如何,作为引用,数据源可以在这里找到:github.com/miraisolutions/spark-bigquery

== 更新 2 ==

在 Spark 日志中,我看到以下日志条目:


19 年 17 月 12 日 14:36:24 信息 SparkSqlParser:解析命令:选择“pickup_latitude”作为“pickup_latitude”,“pickup_longitude”作为“pickup_longitude”,“tip_amount”作为“tip_amount”,“total_amount”作为“total_amount”
从 ((选择 *
来自 `trips2014`)
联合所有
(选择 *
来自 `trips2015`)) `ggcyamhubf`
WHERE (`payment_type` IN ("CREDIT", "CRD", "1"))

Spark 优化此查询并将谓词向下推送到数据源(在本例中为 BigQuery)。然而,相应的 BigQuery 作业似乎完全按顺序执行,即第二个作业仅在第一个作业完成后触发。

最佳答案

TL;DR 是(视 CPU 可用性而定)

附带说明:如果您有疑问,您也可以在它们自己的线程上执行两个 SELECT,然后执行 union(这同样取决于 CPU 的数量),但是您肯定会有真正的并行执行。

让我们使用(非常基本的)以下查询:

val q = spark.range(1).union(spark.range(2))

explain 不会从 CPU 的角度告诉您最终的执行情况,但至少会告诉您是否正在使用整个阶段的代码生成以及查询树的上层距离。

scala> q.explain
== Physical Plan ==
Union
:- *Range (0, 1, step=1, splits=8)
+- *Range (0, 2, step=1, splits=8)

在此示例中,两个 Range 物理运算符(负责两个单独的数据集)将获得“codegend”,因此它们的执行是流水线式的。它们的执行时间是完成处理分区中所有行的时间(在不处理可能使用 System.sleep 或类似代码的 Java 代码本身的“机制”的情况下尽可能快)。

查询的 RDD 谱系可以为您提供有关查询执行的更多信息。

scala> q.rdd.toDebugString
res4: String =
(16) MapPartitionsRDD[17] at rdd at <console>:26 []
| MapPartitionsRDD[16] at rdd at <console>:26 []
| UnionRDD[15] at rdd at <console>:26 []
| MapPartitionsRDD[11] at rdd at <console>:26 []
| MapPartitionsRDD[10] at rdd at <console>:26 []
| ParallelCollectionRDD[9] at rdd at <console>:26 []
| MapPartitionsRDD[14] at rdd at <console>:26 []
| MapPartitionsRDD[13] at rdd at <console>:26 []
| ParallelCollectionRDD[12] at rdd at <console>:26 []

除非我弄错了,因为中间没有任何阶段,所以你可以并行化的东西不多——它只是一个有 16 个分区的阶段,它的完成速度与最后一个任务一样快(来自要安排的 16 个任务).

这意味着在这种情况下顺序确实很重要。


我还找到了this JIRA issue关于 UNION ALL 如果不完全像你的情况,它看起来很相似。

关于apache-spark - 是否 UNION ALL 并行执行不同表上的两个 SELECT?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47837955/

36 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com