gpt4 book ai didi

python - 在最近的关键条件下加入 Spark DataFrames

转载 作者:太空狗 更新时间:2023-10-30 01:24:29 25 4
gpt4 key购买 nike

在 PySpark 中执行模糊连接的高效方法是什么?

我正在寻找社区对在最近的关键条件下连接大型 Spark DataFrame 的可扩展方法的看法。请允许我用一个有代表性的例子来说明这个问题。假设我们有以下 Spark DataFrame,其中包含在某个时间点发生的事件:

ddf_event = spark.createDataFrame(
data=[
[1, 'A'],
[5, 'A'],
[10, 'B'],
[15, 'A'],
[20, 'B'],
[25, 'B'],
[30, 'A']
],
schema=['ts_event', 'event']
)

以及以下包含在某个时间点测量的 GPS 数据的 Spark DataFrame:

ddf_gps = spark.createDataFrame(
data=[
[2, '(-46.84635, 173.13674)'],
[4, '(2.50362, 104.34136)'],
[8, '(-24.20741, 51.80755)'],
[15, '(-59.07798, -20.49141)'],
[18, '(-44.34468, -167.90401)'],
[24, '(-18.84175, 16.68628)'],
[27, '(20.48501,58.42423)']
],
schema=['ts_gps', 'gps_coordinates']
)

我们想加入以生成以下结果 DataFrame:

+--------+-----+------+-----------------------+
|ts_event|event|ts_gps|gps_coordinates |
+--------+-----+------+-----------------------+
|1 |A |2 |(-46.84635, 173.13674) |
|5 |A |4 |(2.50362, 104.34136) |
|10 |B |8 |(-24.20741, 51.80755) |
|15 |A |15 |(-59.07798, -20.49141) |
|20 |B |18 |(-44.34468, -167.90401)|
|25 |B |24 |(-18.84175, 16.68628) |
|30 |A |27 |(20.48501,58.42423) |
+--------+-----+------+-----------------------+

根据事件时间戳和 GPS 数据时间戳有效地找到最近的 GPS 数据点。

因此,我们遇到了在最近的关键条件上加入的问题,在这种情况下,“最近”被定义为时间戳之间的最小绝对差异。

我探索了两种实现此目的的方法:一种基于过滤合并联接 (FBJ),另一种基于过滤排序联合 (FSU)。下面将更详细地描述这两种方法。

FBJ 方法取决于参数 bin_size,它限制了可以找到匹配 GPS 时间戳的时间窗口。增加 bin_size 会增加计算量,减少它会降低结果质量。

这两种方法似乎都不会随着输入 DataFrame 的大小线性扩展。

在实践中,我必须处理由数千万行组成的输入数据,因此我目前找不到可行的问题解决方案。

FBJ 方法

FBJ 方法包括以下步骤:

  1. 创建一个 ts_bin 列,将 timestamp 列装箱,实现方式:
bin_size = 10
ddf_event = ddf_event.withColumn(
'ts_bin',
F.round(F.col('ts_event') / bin_size)
)

ddf_gps = ddf_gps.withColumn(
'ts_bin',
F.round(F.col('ts_gps') / bin_size)
)
  1. ts_bin 列加入 DataFrames,实现者:
ddf = ddf_event.join(ddf_gps, 'ts_bin', 'left_outer')
  1. 确定最小时间戳差异,实现方式:
from pyspark.sql.window import Window

window = Window.partitionBy('ts_event')

ddf = ddf.withColumn(
'ts_diff',
F.abs(F.col('ts_gps') - F.col('ts_event'))
)

ddf = ddf.withColumn(
'min_ts_diff',
F.min(F.col('ts_diff')).over(window)
)
  1. 过滤并选择相关的行和列,实现者:
ddf = (
ddf
.where(
(F.col('ts_diff') == F.col('min_ts_diff')) |
(F.col('ts_diff').isNull())
)
.select(
'ts_event',
'event',
'ts_gps',
'gps_coordinates'
)
)

限制bin_size的情况:

  • bin_size >> 1 有效地导致完全交叉连接
  • bin_size = 1 有效地导致 ts_event == ts_gps
  • 左连接

FSU 方法

FSU 方法包括以下步骤:

  1. 联合 DataFrames,由以下人员实现:
def union(df1, df2):
cols = list(set(df1.columns).union(set(df2.columns)))
for col in cols:
if col not in df1.columns:
df1 = df1.withColumn(col, F.lit(None))
if col not in df2.columns:
df2 = df2.withColumn(col, F.lit(None))
return df1.select(cols).union(df2.select(cols))

ddf_event = ddf_event.withColumn('timestamp', F.col('ts_event'))
ddf_gps = ddf_gps.withColumn('timestamp', F.col('ts_gps'))
ddf = union(ddf_event, ddf_gps)
  1. 对生成的 DataFrame 进行排序并获取相邻的 GPS 时间戳,实现方式为:
from sys import maxsize

last_window = Window.orderBy(
F.col('timestamp').asc()).rowsBetween(-maxsize, 0)
first_window = Window.orderBy(
F.col('timestamp').asc()).rowsBetween(0, maxsize)

ddf = (
ddf.withColumn(
'prev_time',
F.last(F.col('ts_gps'), ignorenulls=True)
.over(last_window)
).withColumn(
'prev_coordinates',
F.last(F.col('gps_coordinates'), ignorenulls=True)
.over(last_window)
).withColumn(
'next_time',
F.first(F.col('ts_gps'), ignorenulls=True)
.over(first_window)
).withColumn(
'next_coordinates',
F.first(F.col('gps_coordinates'), ignorenulls=True)
.over(first_window)
)
)
  1. 过滤并选择相关的行和列,实现者:
condition = (F.col('timestamp') - F.col('prev_time')
< F.col('next_time') - F.col('timestamp'))

ddf = (
ddf
.where(F.col('event').isNotNull())
.withColumn(
'ts_gps',
F.when(condition | F.col('next_time').isNull(), F.col('prev_time')).otherwise(F.col('next_time'))
).withColumn(
'gps_coordinates',
F.when(condition | F.col('next_time').isNull(),
F.col('prev_coordinates'))
.otherwise(F.col('next_coordinates'))
).select(
'ts_event',
'event',
'ts_gps',
'gps_coordinates'
)
)

最佳答案

您正在寻找的是临时连接。查看时间序列Spark库Flint(原HuoHua,中文Spark): https://github.com/twosigma/flint

使用这个库,对于 2 个给定的时间序列数据帧(文档解释了这些对象),您可以在 PySpark(或 Scala Spark)中执行:

ddf_event = ...
ddf_gps = ...
result = ddf_event.leftJoin(ddf_gps, tolerance = "1day")

您的时间戳记不清楚,因此请根据您的需要设置公差。如果需要,您还可以进行“ future 加入”。

查看他们的 Spark Summit 演示文稿以获得更多解释和示例: https://youtu.be/g8o5-2lLcvQ

关于python - 在最近的关键条件下加入 Spark DataFrames,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58077181/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com