
algorithm - Is there a way to run operations on partitioned Spark datasets in parallel?


I have a list of datasets that I want to partition by a specific key common to all of them, and then run some joins/group-bys that are identical across the partitioned datasets.

I am trying to design the algorithm so that it uses Spark's partitionBy to create the partitions by that specific key.

Right now, one approach is to run the operations on each partition in a loop, but that is inefficient.

I want to see whether, once the data is manually partitioned, I can run the operations on those datasets in parallel.

I have only just started learning Spark, so please forgive the question.

Consider datasets of customer IDs and their behavioral data, such as browses and clicks, spread across different datasets: say one for browses and another for clicks. First, I'm thinking of partitioning my data by customer ID, and then for each partition (customer) joining on some attributes like browser or device to see how each customer behaves. So basically it is like nested parallelization.
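For concreteness, a minimal sketch of the shape I have in mind, assuming two hypothetical DataFrames browses and clicks (the paths and the customer_id/device columns are placeholders for illustration):

import spark.implicits._

// Sketch only: names, paths, and columns are hypothetical.
val browses = spark.read.parquet("/path/to/browses") // placeholder input
val clicks  = spark.read.parquet("/path/to/clicks")  // placeholder input

// Co-partition both datasets on the shared key.
val browsesByCustomer = browses.repartition($"customer_id")
val clicksByCustomer  = clicks.repartition($"customer_id")

// The same join/group-by for every customer, expressed once over the
// whole dataset; Spark parallelizes it across partitions.
val behaviour = browsesByCustomer
  .join(clicksByCustomer, Seq("customer_id", "device"))
  .groupBy("customer_id", "device")
  .count()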

Is this even possible in Spark? Is there something obvious I'm missing? Can you point me to some documentation?

Best Answer

Try this:

1. Create a test dataset (total records = 70000+) to perform a parallel operation on each partition.

scala> ds.count
res137: Long = 70008

scala> ds.columns
res124: Array[String] = Array(awards, country)
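The answer does not show how ds was built; a minimal sketch of constructing a dataset of the same shape, assuming a Spark shell where spark and its implicits are available (the values are placeholders, not the answer's actual data):

import spark.implicits._

// Placeholder data: ~70000 rows with awards and country columns.
val countryNames = Seq("CANADA", "CHINA", "USA", "EUROPE", "UK", "RUSSIA", "INDIA")
val ds = (1 to 70008)
  .map(i => (s"award_$i", countryNames(i % countryNames.size)))
  .toDF("awards", "country")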

2. Assume the partition column is "country".

scala> ds.select("country").distinct.show(false)
+-------+
|country|
+-------+
|CANADA |
|CHINA |
|USA |
|EUROPE |
|UK |
|RUSSIA |
|INDIA |
+-------+

3. Get the record count for each country [without parallel processing of each partition].

scala> val countries = ds.select("country").distinct.collect
countries: Array[org.apache.spark.sql.Row] = Array([CANADA], [CHINA], [USA], [EUROPE], [UK], [RUSSIA], [INDIA])

scala> val startTime = System.currentTimeMillis()
startTime: Long = 1562047887130

scala> countries.foreach(country => ds.filter(ds("country") === country(0)).groupBy("country").count.show(false))
+-------+-----+
|country|count|
+-------+-----+
|CANADA |10001|
+-------+-----+

+-------+-----+
|country|count|
+-------+-----+
|CHINA |10001|
+-------+-----+

+-------+-----+
|country|count|
+-------+-----+
|USA |10001|
+-------+-----+

+-------+-----+
|country|count|
+-------+-----+
|EUROPE |10001|
+-------+-----+

+-------+-----+
|country|count|
+-------+-----+
|UK |10002|
+-------+-----+

+-------+-----+
|country|count|
+-------+-----+
|RUSSIA |10001|
+-------+-----+

+-------+-----+
|country|count|
+-------+-----+
|INDIA |10001|
+-------+-----+


scala> val endTime = System.currentTimeMillis()
endTime: Long = 1562047896088

scala> println(s"Total Execution Time : ${(endTime - startTime) / 1000} Seconds")
Total Execution Time : 8 Seconds

4. Get the record count for each country [with parallel processing of each partition].

scala> val startTime = System.currentTimeMillis()
startTime: Long = 1562048057431

scala> countries.par.foreach(country => ds.filter(ds("country") === country(0)).groupBy("country").count.show(false))

+-------+-----+
|country|count|
+-------+-----+
|INDIA |10001|
+-------+-----+

+-------+-----+
|country|count|
+-------+-----+
|CANADA |10001|
+-------+-----+

+-------+-----+
|country|count|
+-------+-----+
|RUSSIA |10001|
+-------+-----+

+-------+-----+
|country|count|
+-------+-----+
|USA |10001|
+-------+-----+

+-------+-----+
|country|count|
+-------+-----+
|UK |10002|
+-------+-----+

+-------+-----+
|country|count|
+-------+-----+
|CHINA |10001|
+-------+-----+

+-------+-----+
|country|count|
+-------+-----+
|EUROPE |10001|
+-------+-----+


scala> val endTime = System.currentTimeMillis()
endTime: Long = 1562048060273

scala> println(s"Total Execution Time : ${(endTime - startTime) / 1000} Seconds")
Total Execution Time : 2 Seconds

Result:

With    parallel processing of each partition, it took ~2 seconds.
Without parallel processing of each partition, it took ~8 seconds.
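A note on the mechanism: .par converts the array into a Scala parallel collection, so the seven filter/groupBy jobs are submitted from separate threads of the default ForkJoinPool and Spark's scheduler runs them concurrently. If you need to control the degree of parallelism explicitly, a sketch of the same idea with Futures and a fixed thread pool (the pool size of 4 is an arbitrary choice):

import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

// Explicit pool so the number of concurrent Spark jobs is bounded.
implicit val ec: ExecutionContext =
  ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(4))

val jobs = countries.toSeq.map { country =>
  Future {
    ds.filter(ds("country") === country(0)).groupBy("country").count.show(false)
  }
}
Await.result(Future.sequence(jobs), Duration.Inf)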

I tested this by counting the records for each country, but you can run any per-partition process, such as writing to a Hive table or to HDFS files.
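For example, the same parallel pattern applied to a write instead of a count (the output path here is a placeholder and Parquet is an arbitrary format choice):

countries.par.foreach { country =>
  ds.filter(ds("country") === country(0))
    .write
    .mode("overwrite")
    .parquet(s"/tmp/output/country=${country(0)}") // placeholder path
}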

Hope this helps.

Regarding algorithm - Is there a way to run operations on partitioned Spark datasets in parallel?, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/56843096/
