gpt4 book ai didi

apache-spark - Spark : How to time range join two lists in memory?

转载 作者:行者123 更新时间:2023-12-04 04:05:40 25 4
gpt4 key购买 nike

我是 Spark 的新手,我很难理解这种思维方式。以下问题看起来很普遍,但我不知道如何仅使用 Spark 及其节点的内存来解决它们。

我有两个列表(即:RDD):

  1. List1 - (id, start_time, value) 其中元组 (id, start_time) 是唯一的
  2. List2 - (id, timestamp)

第一个问题:遍历 List2 并为每个(id,时间戳)在 List1 中找到一个具有相同 id 和时间戳之前的最大 start_time 的值。

例如:

List1:
(1, 10:00, a)
(1, 10:05, b)
(1, 10:30, c)
(2, 10:02, d)

List2:
(1, 10:02)
(1, 10:29)
(2, 10:03)
(2: 10:04)

Result:
(1, 10:02) => a
(1, 10:29) => b
(2, 10:03) => d
(2: 10:04) => d

第二个问题:与第一个问题非常相似,但现在开始时间和时间戳是模糊的。这意味着时间 t 可能介于 (t - delta) 和 (t + delta) 之间。同样,我需要时间加入列表。

注意事项:

  1. 有一个solution使用 Cassandra 解决第一个问题,但我有兴趣仅使用 Spark 和节点的内存来解决它。
  2. List1 有数千个条目。
  3. List2 有数千万个条目。

最佳答案

为简洁起见,我已将您的时间数据 10:02 转换为十进制数据 10.02。只需使用将时间字符串转换为数字的函数即可。

第一个问题可以使用 SparkSQL 轻松解决,如下所示。

val list1 = spark.sparkContext.parallelize(Seq(
(1, 10.00, "a"),
(1, 10.05, "b"),
(1, 10.30, "c"),
(2, 10.02, "d"))).toDF("col1", "col2", "col3")

val list2 = spark.sparkContext.parallelize(Seq(
(1, 10.02),
(1, 10.29),
(2, 10.03),
(2, 10.04)
)).toDF("col1", "col2")

list1.createOrReplaceTempView("table1")

list2.createOrReplaceTempView("table2")


scala> spark.sql("""
| SELECT col1,col2,col3
| FROM
| (SELECT
| t2.col1, t2.col2, t1.col3,
| ROW_NUMBER() over(PARTITION BY t2.col1, t2.col2 ORDER BY t1.col2 DESC) as rank
| FROM table2 t2
| LEFT JOIN table1 t1
| ON t1.col1 = t2.col1
| AND t2.col2 > t1.col2) tmp
| WHERE tmp.rank = 1""").show()
+----+-----+----+
|col1| col2|col3|
+----+-----+----+
| 1|10.02| a|
| 1|10.29| b|
| 2|10.03| d|
| 2|10.04| d|
+----+-----+----+

类似地,第二个问题的解决方案可以通过改变连接条件得出,如下所示

spark.sql("""
SELECT col1,col2,col3
FROM
(SELECT
t2.col1, t2.col2, t1.col3,
ROW_NUMBER() over(PARTITION BY t2.col1, t2.col2 ORDER BY t1.col2 DESC) as rank
FROM table2 t2
LEFT JOIN table1 t1
ON t1.col1 = t2.col1
AND t2.col2 between t1.col2 - ${delta} and t1.col2 + ${delta} ) tmp // replace delta with actual value
WHERE tmp.rank = 1""").show()

关于apache-spark - Spark : How to time range join two lists in memory?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44101317/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com