
sql - Finding the closest time between two tables in Spark


I am using PySpark and I have two dataframes like this:

user  time                 bus
A     2016/07/18 12:00:00  1
B     2016/07/19 12:00:00  2
C     2016/07/20 12:00:00  3

bus  time                 stop
1    2016/07/18 11:59:40  sA
1    2016/07/18 11:59:50  sB
1    2016/07/18 12:00:05  sC
2    2016/07/19 11:59:40  sB
2    2016/07/19 12:00:10  sC
3    2016/07/20 11:59:55  sD
3    2016/07/20 12:00:10  sE

Now I want to find the stop each user reported from, by matching the bus number and the closest time in the second table.

For example, in table 1, user A reported at 2016/07/18 12:00:00 while on bus 1. The second table has three records for bus 1, and the closest time is 2016/07/18 12:00:05 (the third record), so user A is currently at stop sC (a plain-Python sketch of this rule follows the expected output below).

The desired output should look like this:

user  time                 bus  stop
A     2016/07/18 12:00:00  1    sC
B     2016/07/19 12:00:00  2    sC
C     2016/07/20 12:00:00  3    sD
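In other words, the matching rule is just a minimum over absolute time differences. A minimal plain-Python sketch of the rule for user A, with the data from the tables above hard-coded:

from datetime import datetime

def tm(s):
    return datetime.strptime(s, "%Y/%m/%d %H:%M:%S")

# bus 1's records from the second table
bus1 = [(tm("2016/07/18 11:59:40"), "sA"),
        (tm("2016/07/18 11:59:50"), "sB"),
        (tm("2016/07/18 12:00:05"), "sC")]

report = tm("2016/07/18 12:00:00")  # user A's report time

# pick the record whose time is closest to the report time
_, stop = min(bus1, key=lambda r: abs((r[0] - report).total_seconds()))
print(stop)  # sC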

I have already converted the times to timestamps, so the only remaining problem is finding the closest timestamp among the records whose bus number is equal.

Since I am not yet familiar with SQL, I tried to find the closest time and its stop with a map function, which means I would have to call sqlContext.sql inside the map function, and Spark does not seem to allow that:

Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.
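For reference, the failing pattern would look roughly like this (a hypothetical sketch, since the original attempt is not shown; it assumes a DataFrame userDf for the first table and the second table registered as a temp table bus_record):

# hypothetical sketch of the pattern that triggers SPARK-5063:
# sqlContext lives on the driver, so referencing it inside map()
# forces Spark to serialize the SparkContext to the workers and fails
stops = userDf.rdd.map(
    lambda row: sqlContext.sql(
        "SELECT stop FROM bus_record WHERE bus = {0}".format(row.bus)
    ).collect()
)
stops.collect()  # raises the error quoted above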



So how can I write a SQL query to get the correct output?

Best Answer

This can be done with window functions: join the two tables on the bus number, compute the absolute time difference for every (user, bus record) pair, then keep the row ranked first by that difference within each user/bus partition.

from datetime import datetime

from pyspark.sql import Row
from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark.sql.window import Window

def tm(s):
    return datetime.strptime(s, "%Y/%m/%d %H:%M:%S")

# set up the sample data
userTime = [Row(user="A", time=tm("2016/07/18 12:00:00"), bus=1)]
userTime.append(Row(user="B", time=tm("2016/07/19 12:00:00"), bus=2))
userTime.append(Row(user="C", time=tm("2016/07/20 12:00:00"), bus=3))

busTime = [Row(bus=1, time=tm("2016/07/18 11:59:40"), stop="sA")]
busTime.append(Row(bus=1, time=tm("2016/07/18 11:59:50"), stop="sB"))
busTime.append(Row(bus=1, time=tm("2016/07/18 12:00:05"), stop="sC"))
busTime.append(Row(bus=2, time=tm("2016/07/19 11:59:40"), stop="sB"))
busTime.append(Row(bus=2, time=tm("2016/07/19 12:00:10"), stop="sC"))
busTime.append(Row(bus=3, time=tm("2016/07/20 11:59:55"), stop="sD"))
busTime.append(Row(bus=3, time=tm("2016/07/20 12:00:10"), stop="sE"))

# create the DataFrames
userDf = sc.parallelize(userTime).toDF().alias("usertime")
busDf = sc.parallelize(busTime).toDF().alias("bustime")

# join user reports to bus records on the bus number
joinedDF = userDf.join(busDf, col("usertime.bus") == col("bustime.bus"), "inner").select(
    userDf.user,
    userDf.time.alias("user_time"),
    busDf.bus,
    busDf.time.alias("bus_time"),
    busDf.stop)

# absolute difference in seconds between the user report and each bus record
additional_cols = joinedDF.withColumn(
    "bus_time_diff",
    F.abs(F.unix_timestamp(col("bus_time")) - F.unix_timestamp(col("user_time"))))

# rank the bus records by time difference within each (user, bus) group
# and keep only the closest one
partDf = additional_cols.select(
    "user", "user_time", "bus", "bus_time", "stop", "bus_time_diff",
    F.row_number().over(
        Window.partitionBy("user", "bus").orderBy("bus_time_diff")
    ).alias("rank")
).filter(col("rank") == 1)

additional_cols.show(20, False)
partDf.show(20, False)

Output:
+----+---------------------+---+---------------------+----+-------------+
|user|user_time |bus|bus_time |stop|bus_time_diff|
+----+---------------------+---+---------------------+----+-------------+
|A |2016-07-18 12:00:00.0|1 |2016-07-18 11:59:40.0|sA |20 |
|A |2016-07-18 12:00:00.0|1 |2016-07-18 11:59:50.0|sB |10 |
|A |2016-07-18 12:00:00.0|1 |2016-07-18 12:00:05.0|sC |5 |
|B |2016-07-19 12:00:00.0|2 |2016-07-19 11:59:40.0|sB |20 |
|B |2016-07-19 12:00:00.0|2 |2016-07-19 12:00:10.0|sC |10 |
|C |2016-07-20 12:00:00.0|3 |2016-07-20 11:59:55.0|sD |5 |
|C |2016-07-20 12:00:00.0|3 |2016-07-20 12:00:10.0|sE |10 |
+----+---------------------+---+---------------------+----+-------------+
+----+---------------------+---+---------------------+----+-------------+----+
|user|user_time |bus|bus_time |stop|bus_time_diff|rank|
+----+---------------------+---+---------------------+----+-------------+----+
|A |2016-07-18 12:00:00.0|1 |2016-07-18 12:00:05.0|sC |5 |1 |
|B |2016-07-19 12:00:00.0|2 |2016-07-19 12:00:10.0|sC |10 |1 |
|C |2016-07-20 12:00:00.0|3 |2016-07-20 11:59:55.0|sD |5 |1 |
+----+---------------------+---+---------------------+----+-------------+----+
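Since the question asked for a SQL query, the same logic can also be expressed through sqlContext.sql after registering the two DataFrames as temporary tables. A minimal sketch, assuming the userDf and busDf built above; the view names user_report and bus_record are arbitrary, and on Spark 2+ you would use createOrReplaceTempView and spark.sql instead (on Spark 1.x, window functions in SQL require a HiveContext):

# the same window-function logic expressed in SQL; userDf and busDf
# are the DataFrames built in the answer above
userDf.registerTempTable("user_report")
busDf.registerTempTable("bus_record")

closest = sqlContext.sql("""
    SELECT user, user_time, bus, bus_time, stop
    FROM (
        SELECT u.user,
               u.time AS user_time,
               b.bus,
               b.time AS bus_time,
               b.stop,
               ROW_NUMBER() OVER (
                   PARTITION BY u.user, b.bus
                   ORDER BY ABS(UNIX_TIMESTAMP(b.time) - UNIX_TIMESTAMP(u.time))
               ) AS rn
        FROM user_report u
        JOIN bus_record b ON u.bus = b.bus
    ) ranked
    WHERE rn = 1
""")
closest.show()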

Regarding "sql - Finding the closest time between two tables in Spark", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/38623782/
