gpt4 book ai didi

python - 使用 Apache-Spark 分析时间序列

转载 作者:太空宇宙 更新时间:2023-11-03 11:00:30 25 4
gpt4 key购买 nike

我有非常大的时间序列数据,数据格式为:(arrival_time, key, value),时间单位是秒,例如:

0.01, k, v
0.03, k, v
....
1.00, k, v
1.10, k, v
1.20, k, v
1.99, k, v
2.00, k, v
...

我需要做的是获取整个数据的每秒行数。到目前为止,我使用 pySpark,我的代码如下:

linePerSec = []
lo = rdd.take(1)[0]
hi = lo + 1.0
end = rdd.collect()[-1][0]
while(hi < end):
number = rdd.filter(lambda (t, k, v): t >= lo and t < hi).count()
linePerSec.append(number)
lo = hi
hi = lo + 1.0

但它非常慢,甚至比在 for 循环中逐行遍历数据还要慢。我想这是因为 rdd.filter() 遍历整个 rdd 以找到满足过滤器条件的行。但是对于时间序列,我们不需要在我的代码中遍历 hi 边界之后的数据。在我的情况下,是否有任何解决方案可以让 spark 停止通过 rdd?谢谢!

最佳答案

首先让我们创建一些虚拟数据:

rdd = sc.parallelize(
[(0.01, "k", "v"),
(0.03, "k", "v"),
(1.00, "k", "v"),
(1.10, "k", "v"),
(1.20, "k", "v"),
(1.99, "k", "v"),
(2.00, "k", "v"),
(3.10, "k", "v"),
(4.50, "k", "v")])

从 RDD 中提取时间字段:

def get_time(x):
(start, _, _) = x
return start

times = rdd.map(get_time)

接下来我们需要一个从时间到键的函数映射:

def get_key_(start):
offset = start - int(start)
def get_key(x):
w = int(x) + offset
return w if x >= w else int(x - 1) + offset
return get_key

找到最小和最大时间

start = times.takeOrdered(1)[0]
end = times.top(1)[0]

生成一个实际的键函数:

get_key = get_key_(start)

并计算平均值

from operator import add

total = (times
.map(lambda x: (get_key(x), 1))
.reduceByKey(add)
.values()
.sum())

time_range = get_key(end) - get_key(start) + 1.0

mean = total / time_range

mean
## 1.8

快速检查:

  • [0.01, 1.01]: 3
  • [1.01, 2.01]: 4
  • [2.01, 3.01]: 0
  • [3.01, 4.01]: 1
  • [4.01, 5.01]: 1

它给出 9/5 = 1.8

等效数据框如下所示:

from pyspark.sql.functions import count, col, sum, lit, min, max

# Select only arrival times
arrivals = df.select("arrival_time")

# This is almost identical as before
start = df.agg(min("arrival_time")).first()[0]
end = df.agg(max("arrival_time")).first()[0]

get_key = get_key_(start)
time_range = get_key(end) - get_key(start) + 1.0

# But we'll need offset as well
offset = start - int(start)

# and define a bucket column
bucket = (col("arrival_time") - offset).cast("integer") + offset

line_per_sec = (df
.groupBy(bucket)
.agg(count("*").alias("cnt"))
.agg((sum("cnt") / lit(time_range)).alias("mean")))

line_per_sec.show()

## +----+
## |mean|
## +----+
## | 1.8|
## +----+

请注意,这与 the solution 非常相似由 Nhor 提供有两个主要区别:

  • 使用与您的代码相同的启动逻辑
  • 正确处理空区间

关于python - 使用 Apache-Spark 分析时间序列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33728994/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com