gpt4 book ai didi

apache-spark - spark如何从jdbc读取数据并分发

转载 作者:行者123 更新时间:2023-12-04 17:29:05 25 4
gpt4 key购买 nike

我需要弄清楚 Spark 在获取外部数据库数据时的工作原理。我从 spark 文档中了解到,如果我没有提及“numPartitons”、“lowerBound”和“upperBound”等属性,那么通过 jdbc 读取不是并行的。在那种情况下会发生什么?数据是否由 1 个获取所有数据的特定执行程序读取?那么并行是如何实现的呢?那个执行者以后会把数据分享给其他执行者吗?但我相信执行者不能像这样分享数据。

如果你们中有人探索过这个,请告诉我。

编辑我的问题 -嗨,阿米特,感谢您的回复,但这不是我要找的。让我详细说明:-请参阅此 - https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html

引用下面的代码片段——

val MultiJoin_vw = db.getDataFromGreenplum_Parallel(ss, MultiJoin, bs,5,"bu_id",10,9000)
println(MultiJoin_vw.explain(true))
println("Number of executors")
ss.sparkContext.statusTracker.getExecutorInfos.foreach(x => println(x.host(),x.numRunningTasks()))
println("Number of partitons:" ,MultiJoin_vw.rdd.getNumPartitions)
println("Number of records in each partiton:")
MultiJoin_vw.groupBy(spark_partition_id).count().show(10)

输出:

Fetch Starts
== Physical Plan ==
*(1) Scan JDBCRelation((select * from mstrdata_rdl.cmmt_sku_bu_vw)as mytab) [numPartitions=5] [sku_nbr#0,bu_id#1,modfd_dts#2] PushedFilters: [], ReadSchema: struct<sku_nbr:string,bu_id:int,modfd_dts:timestamp>
()
Number of executors
(ddlhdcdev18,0)
(ddlhdcdev41,0)
(Number of partitons:,5)
Number of records in each partition:
+--------------------+------+
|SPARK_PARTITION_ID()| count|
+--------------------+------+
| 1|212267|
| 3| 56714|
| 4|124824|
| 2|232193|
| 0|627712|
+--------------------+------+

我在这里使用自定义函数 db.getDataFromGreenplum_Parallel(ss, MultiJoin, bs,5,"bu_id",10,9000) 读取表,它指定根据字段 bu_id 创建 5 个分区,其下限值为 10,上限值为是9000。查看 spark 如何使用 5 个并行连接读取 5 个分区中的数据(如 spark doc 所述)。现在让我们在不提及上述任何参数的情况下阅读此表 -

我只是使用另一个函数获取数据 - val MultiJoin_vw = db.getDataFromGreenplum(ss, MultiJoin, bs)

这里我只传递了 spark session(ss),获取数据的查询(MultiJoin)和另一个异常处理参数(bs)。o/p 如下所示 –获取开始

== Physical Plan ==
*(1) Scan JDBCRelation((select * from mstrdata_rdl.cmmt_sku_bu_vw)as mytab) [numPartitions=1] [sku_nbr#0,bu_id#1,modfd_dts#2] PushedFilters: [], ReadSchema: struct<sku_nbr:string,bu_id:int,modfd_dts:timestamp>
()
Number of executors
(ddlhdcdev31,0)
(ddlhdcdev27,0)
(Number of partitons:1)
Number of records in each partiton:
+--------------------+-------+
|SPARK_PARTITION_ID()| count|
+--------------------+-------+
| 0|1253710|

查看数据如何读入一个分区,意味着只产生 1 个连接。问题仍然存在,这个分区将只在一台机器上,并且将为此分配 1 个任务。所以这里没有并行性。那么数据如何分发给其他执行者呢?

顺便说一句,这是我在两种情况下都使用的 spark-submit 命令 –

spark2-submit --master yarn --deploy-mode cluster --driver-memory 1g --num-executors 1 --executor-cores 1 --executor-memory 1g --class jobs.memConnTest $home_directory/target/mem_con_test_v1-jar-with-dependencies.jar

最佳答案

回复:“从外部数据库获取数据”在您的 spark 应用程序中,这通常是将在执行程序上执行的代码部分。可以通过传递 spark 配置“num-executors”来控制执行者的数量。如果您使用过 Spark 和 RDD/Dataframe,那么您将连接到数据库的示例之一是转换函数,例如 map、flatmap、filter 等。这些函数在执行程序上执行时(由 num-executors 配置) 将建立数据库连接并使用它。

这里要注意的一件重要事情是,如果你使用太多的执行器,那么你的数据库服务器可能会变得越来越慢,最终无法响应。如果你给的执行者太少,那么它可能会导致你的 Spark 工作需要更多的时间才能完成。因此,您必须根据您的数据库服务器容量找到最佳数量。

回复:“那么并行是如何实现的?那个执行者以后会把数据分享给其他执行者吗?”

上面提到的并行是通过配置executor的数量来实现的。配置执行器的数量只是增加并行度的一种方式,并不是唯一的方式。考虑这样一种情况,您的数据量较小,分区较少,那么您将看到较少的并行性。因此,您需要有足够数量的分区(那些对应于任务),然后适当(确定数量取决于用例)数量的执行程序来并行执行这些任务。只要您可以单独处理每条记录,它就会扩展,但是一旦您有一个会导致洗牌的 Action ,您就会看到有关任务和执行者的统计信息。 Spark 将尝试最好地分发数据,以便它可以在最佳水平上工作。

请引用https://blog.cloudera.com/how-to-tune-your-apache-spark-jobs-part-1/以及后续部分以了解更多有关内部结构的信息。

关于apache-spark - spark如何从jdbc读取数据并分发,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61299059/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com