apache-spark - Joining rows from two DataFrames by nearest point

Reposted. Author: 行者123. Updated: 2023-12-05 05:46:05

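Hi, I'm new to Spark and I'm not sure how to approach this problem.

I have 2 tables (made smaller for easier explanation):

A: Weather data

B: Travel data

I need to join these tables by finding the nearest weather station, on the same date, to the point where each trip starts, and then do the same for the point where the trip ends. In the end I want all the weather data from the nearest station at the start of the trip and at the end of the trip, with only one row per trip.

I have already done something similar with geopandas and a UDF, but that was easier because I was looking for an intersection. Like this: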

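from shapely.geometry import Point
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def find_state_gps(lat, long):
    # gdf_states is a GeoDataFrame of state polygons defined elsewhere
    df = gdf_states.apply(lambda x: x["NAME"] if x["geometry"].intersects(Point(long, lat)) else None, axis=1)
    idx = df.first_valid_index()
    value = df.loc[idx] if idx is not None else "Not in USA territory"
    return value

state_gps = udf(find_state_gps, StringType())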

This time I'm not sure how to approach the logic.

I also tried running this query, without success.

query = "SELECT STATION,\
NAME,\
LATITUDE,\
LONGITUDE,\
AWND,\
p.id_trip,\
p.Latitude,\
p.Longitude,\
p.startDate,\
Abs(p.latitude-LATITUDE)**2 + Abs(p.Longitude-LONGITUDE)**2\
AS dd\
FROM df2\
CROSS JOIN (\
SELECT id AS id_trip,\
station_id,\
Latitude,\
Longitude,\
startDate\
FROM df1\
) AS p ON 1=1\
ORDER BY dd"

It fails with the following error: ParseException: mismatched input '2' expecting {<EOF>, ';'} (line 1, pos 189)
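A likely cause of this parse error, offered as a hedged reading rather than a confirmed diagnosis: `**` is Python's exponent operator and does not appear to be valid Spark SQL, so the parser stops at the `2` that follows it. The sketch below rebuilds the query with `POWER()`. The aliases `w` and `p`, the renamed output columns, and the dropped `ON 1=1` are my own choices, and it assumes `df1` and `df2` are registered as temp views with the columns used in the question.

```python
# Sketch of the cross-join query using POWER() instead of Python's ** operator.
# Assumptions (not from the original post): df1/df2 are registered temp views,
# and the aliases w (weather) and p (trips) are illustrative only.
query = """
SELECT w.STATION,
       w.NAME,
       w.LATITUDE  AS station_latitude,
       w.LONGITUDE AS station_longitude,
       w.AWND,
       p.id_trip,
       p.Latitude  AS trip_latitude,
       p.Longitude AS trip_longitude,
       p.startDate,
       POWER(p.Latitude - w.LATITUDE, 2)
         + POWER(p.Longitude - w.LONGITUDE, 2) AS dd
FROM df2 AS w
CROSS JOIN (
    SELECT id AS id_trip, Latitude, Longitude, startDate
    FROM df1
) AS p
ORDER BY dd
"""

# With an active SparkSession this would be run as: spark.sql(query)
```

Qualifying every column with its table alias also avoids an ambiguous reference between the two `Latitude`/`Longitude` pairs. Note that `dd` is a squared difference in degrees, which is only a rough proxy for real distance.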

In the end I want something like this, without duplicated trips.

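+--+----------+-----------+--------+---------------------+-------------------+------------------------------------------------+
|id|start date|finish date|finished|weather_station_start|weather_station_end|more columns about weather at trip start and end|
+--+----------+-----------+--------+---------------------+-------------------+------------------------------------------------+
|1 |bim       |baz        |bim     |baz                  |bim                |bim                                             |
|2 |bim       |baz        |bim     |baz                  |bim                |bim                                             |
+--+----------+-----------+--------+---------------------+-------------------+------------------------------------------------+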

Thank you very much for your help.

Best answer

I changed your sample data slightly, because all the stations had the same coordinates:

from pyspark.sql import SparkSession

# Reuse the active session (e.g. in a notebook) or create one.
spark = SparkSession.builder.getOrCreate()

travel_data = spark.createDataFrame(
    [
        ('0','2013-06-01','00:00:01','-73.98915076','40.7423543','40.74317449','-74.00366443','2013-06-01'),
        ('1','2013-06-01','00:00:08','-73.98915076','40.7423543','40.74317449','-74.00366443','2013-06-01'),
        ('2','2013-06-01','00:00:44','-73.99595065','40.69512845','40.69512845','-73.99595065','2013-06-01'),
        ('3','2013-06-01','00:01:04','-73.98758561','40.73524276','40.6917823','-73.9737299','2013-06-01'),
        ('4','2013-06-01','00:01:22','-74.01677685','40.70569254','40.68926942','-73.98912867','2013-06-01'),
    ],
    ['id','startDate','startTime','Longitude','Latitude','end station latitude','end station longitude','stopdate']
)

weather_data = spark.createDataFrame(
    [
        ('USINYWC0003','WHITE PLAINS 3.1 NNW 3, NY US','41.0639','-73.7722','71','2013-06-01','','','','',''),
        ('USINYWC0002','WHITE PLAINS 3.1 NNW 2, NY US','41.0638','-73.7723','71','2013-06-02','','','','',''),
        ('USINYWC0001','WHITE PLAINS 3.1 NNW 1, NY US','41.0635','-73.7724','71','2013-06-03','','','','',''),
    ],
    ['STATION','NAME','LATITUDE','LONGITUDE','ELEVATION','DATE','AWND','AWND ATTRIBUTES','DAPR','DAPR ATTRIBUTES','DASE']
)

+---+----------+---------+------------+-----------+--------------------+---------------------+----------+
| id| startDate|startTime|   Longitude|   Latitude|end station latitude|end station longitude|  stopdate|
+---+----------+---------+------------+-----------+--------------------+---------------------+----------+
|  0|2013-06-01| 00:00:01|-73.98915076| 40.7423543|         40.74317449|         -74.00366443|2013-06-01|
|  1|2013-06-01| 00:00:08|-73.98915076| 40.7423543|         40.74317449|         -74.00366443|2013-06-01|
|  2|2013-06-01| 00:00:44|-73.99595065|40.69512845|         40.69512845|         -73.99595065|2013-06-01|
|  3|2013-06-01| 00:01:04|-73.98758561|40.73524276|          40.6917823|          -73.9737299|2013-06-01|
|  4|2013-06-01| 00:01:22|-74.01677685|40.70569254|         40.68926942|         -73.98912867|2013-06-01|
+---+----------+---------+------------+-----------+--------------------+---------------------+----------+

+-----------+--------------------+--------+---------+---------+----------+----+---------------+----+---------------+----+
|    STATION|                NAME|LATITUDE|LONGITUDE|ELEVATION|      DATE|AWND|AWND ATTRIBUTES|DAPR|DAPR ATTRIBUTES|DASE|
+-----------+--------------------+--------+---------+---------+----------+----+---------------+----+---------------+----+
|USINYWC0003|WHITE PLAINS 3.1 ...| 41.0639| -73.7722|       71|2013-06-01|    |               |    |               |    |
|USINYWC0002|WHITE PLAINS 3.1 ...| 41.0638| -73.7723|       71|2013-06-02|    |               |    |               |    |
|USINYWC0001|WHITE PLAINS 3.1 ...| 41.0635| -73.7724|       71|2013-06-03|    |               |    |               |    |
+-----------+--------------------+--------+---------+---------+----------+----+---------------+----+---------------+----+

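Then cross join the two DataFrames to compute the haversine distance between each trip's start/end point and every station. A cross join is not the most efficient solution, but depending on the size of your data it may be the simplest approach.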
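For reference, the haversine calculation can be checked in isolation before wiring it into Spark columns. The following is a minimal plain-Python sketch, not part of the original answer: the function name `haversine_feet` and the constant names are my own, and the radius and unit conversion (3963 miles, 5280 feet per mile) are taken from the answer's code, so the result is in feet.

```python
import math

EARTH_RADIUS_MILES = 3963  # same constant as in the answer's column expression
FEET_PER_MILE = 5280

def haversine_feet(lat1, lon1, lat2, lon2):
    """Great-circle distance in feet between two (lat, lon) points in degrees."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dlat = phi2 - phi1
    dlon = math.radians(lon2) - math.radians(lon1)
    a = (math.sin(dlat / 2) ** 2
         + math.cos(phi1) * math.cos(phi2) * math.sin(dlon / 2) ** 2)
    # Clamp to 1.0 to guard against floating-point overshoot before asin.
    return 2 * EARTH_RADIUS_MILES * FEET_PER_MILE * math.asin(math.sqrt(min(1.0, a)))

# Start point of trip 0 against the first sample station
d = haversine_feet(40.7423543, -73.98915076, 41.0639, -73.7722)
print(round(d / FEET_PER_MILE, 1), "miles")
```

Note that the two cosine terms use the latitudes of the two points being compared, which matters when the same formula is reused for the trip's end point.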


from pyspark.sql.functions import col, radians, asin, sin, sqrt, cos, min
from pyspark.sql import Window as W

# Earth radius in miles (3963) times feet per mile (5280): distances are in feet.
EARTH_RADIUS_FT = 3963 * 5280

join_df = (
    travel_data
    .crossJoin(weather_data.select('NAME', col('LATITUDE').alias('st_LAT'), col('LONGITUDE').alias('st_LON'), 'AWND'))
    # Coordinate differences (in radians) between each station and the trip start
    .withColumn("dlon_start", radians(col("st_LON")) - radians(col("Longitude")))
    .withColumn("dlat_start", radians(col("st_LAT")) - radians(col("Latitude")))
    .withColumn("haversine_dist_start", asin(sqrt(
        sin(col("dlat_start") / 2) ** 2
        + cos(radians(col("Latitude"))) * cos(radians(col("st_LAT")))
        * sin(col("dlon_start") / 2) ** 2
    )) * 2 * EARTH_RADIUS_FT)
    # Same calculation for the trip end
    .withColumn("dlon_end", radians(col("st_LON")) - radians(col("end station longitude")))
    .withColumn("dlat_end", radians(col("st_LAT")) - radians(col("end station latitude")))
    # The cosine term must use the end latitude, not the start latitude
    .withColumn("haversine_dist_end", asin(sqrt(
        sin(col("dlat_end") / 2) ** 2
        + cos(radians(col("end station latitude"))) * cos(radians(col("st_LAT")))
        * sin(col("dlon_end") / 2) ** 2
    )) * 2 * EARTH_RADIUS_FT)
    .drop('dlon_start', 'dlat_start', 'dlon_end', 'dlat_end')
)

Finally, use a window function to select the station nearest to the start point (result1) and the station nearest to the end point (result2).

# One window per trip; a separate name avoids shadowing the imported Window alias
w = W.partitionBy("id")

# For each trip, keep the row(s) whose start distance equals the per-trip minimum
result1 = (
    join_df
    .withColumn("min_dist_start", min('haversine_dist_start').over(w))
    .filter(col("min_dist_start") == col('haversine_dist_start'))
    .select(
        'id',
        col('startDate').alias('started_date'),
        col('stopdate').alias('finish_date'),
        col('NAME').alias('weather_station_start'),
        col('Latitude').alias('Latitude_start'),
        col('Longitude').alias('Longitude_start'),
    )
)

# Same selection for the end distance
result2 = (
    join_df
    .withColumn("min_dist_end", min('haversine_dist_end').over(w))
    .filter(col("min_dist_end") == col('haversine_dist_end'))
    .select('id', col('NAME').alias('weather_station_end'))
)

final = result1.join(result2, 'id', 'left')

# truncate=False prints the full station names
final.show(truncate=False)

I'm not sure which columns you want in the output, but hopefully this gives you some insight. Output:

+---+------------+-----------+-----------------------------+--------------+---------------+-----------------------------+
|id |started_date|finish_date|weather_station_start        |Latitude_start|Longitude_start|weather_station_end          |
+---+------------+-----------+-----------------------------+--------------+---------------+-----------------------------+
|0  |2013-06-01  |2013-06-01 |WHITE PLAINS 3.1 NNW 1, NY US|40.7423543    |-73.98915076   |WHITE PLAINS 3.1 NNW 1, NY US|
|1  |2013-06-01  |2013-06-01 |WHITE PLAINS 3.1 NNW 1, NY US|40.7423543    |-73.98915076   |WHITE PLAINS 3.1 NNW 1, NY US|
|2  |2013-06-01  |2013-06-01 |WHITE PLAINS 3.1 NNW 1, NY US|40.69512845   |-73.99595065   |WHITE PLAINS 3.1 NNW 1, NY US|
|3  |2013-06-01  |2013-06-01 |WHITE PLAINS 3.1 NNW 1, NY US|40.73524276   |-73.98758561   |WHITE PLAINS 3.1 NNW 1, NY US|
|4  |2013-06-01  |2013-06-01 |WHITE PLAINS 3.1 NNW 1, NY US|40.70569254   |-74.01677685   |WHITE PLAINS 3.1 NNW 1, NY US|
+---+------------+-----------+-----------------------------+--------------+---------------+-----------------------------+
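The question also asks for stations matched on the same date and for exactly one row per trip. As far as I can tell, the cross join above does not filter on `DATE`, and filtering on "distance equals the minimum" can return more than one row when two stations tie. The plain-Python sketch below (not Spark code; the helper names and the dict layout are hypothetical) illustrates the selection rule one might translate into a join condition on the date plus a `row_number()` over the same window.

```python
import math

def haversine_miles(lat1, lon1, lat2, lon2, radius=3963):
    """Great-circle distance in miles between two (lat, lon) points in degrees."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dlat = phi2 - phi1
    dlon = math.radians(lon2) - math.radians(lon1)
    a = (math.sin(dlat / 2) ** 2
         + math.cos(phi1) * math.cos(phi2) * math.sin(dlon / 2) ** 2)
    return 2 * radius * math.asin(math.sqrt(min(1.0, a)))

def nearest_station(point, date, stations):
    """Return the name of the closest station reporting on `date`, or None."""
    candidates = [s for s in stations if s["date"] == date]
    if not candidates:
        return None
    # The (distance, name) key gives a single deterministic winner on ties.
    best = min(
        candidates,
        key=lambda s: (haversine_miles(point[0], point[1], s["lat"], s["lon"]), s["name"]),
    )
    return best["name"]

# Sample rows taken from the answer's data (hypothetical dict layout)
stations = [
    {"name": "WHITE PLAINS 3.1 NNW 3, NY US", "lat": 41.0639, "lon": -73.7722, "date": "2013-06-01"},
    {"name": "WHITE PLAINS 3.1 NNW 2, NY US", "lat": 41.0638, "lon": -73.7723, "date": "2013-06-02"},
    {"name": "WHITE PLAINS 3.1 NNW 1, NY US", "lat": 41.0635, "lon": -73.7724, "date": "2013-06-03"},
]
trip = {
    "id": "0",
    "start": (40.7423543, -73.98915076), "startDate": "2013-06-01",
    "end": (40.74317449, -74.00366443), "stopdate": "2013-06-01",
}

# Exactly one output row per trip
row = {
    "id": trip["id"],
    "weather_station_start": nearest_station(trip["start"], trip["startDate"], stations),
    "weather_station_end": nearest_station(trip["end"], trip["stopdate"], stations),
}
print(row)
```

With the date filter applied, only the station reporting on 2013-06-01 is a candidate for this trip, so the chosen station differs from the date-agnostic output shown above.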

Regarding apache-spark - Joining rows from two DataFrames by nearest point, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/71230505/
