- android - 多次调用 OnPrimaryClipChangedListener
- android - 无法更新 RecyclerView 中的 TextView 字段
- android.database.CursorIndexOutOfBoundsException : Index 0 requested, 光标大小为 0
- android - 使用 AppCompat 时,我们是否需要明确指定其 UI 组件(Spinner、EditText)颜色
我有非常大的时间序列数据,数据格式为:(arrival_time, key, value),时间单位是秒,例如:
0.01, k, v
0.03, k, v
....
1.00, k, v
1.10, k, v
1.20, k, v
1.99, k, v
2.00, k, v
...
我需要做的是获取整个数据的每秒行数。到目前为止,我使用 pySpark,我的代码如下:
linePerSec = []
lo = rdd.take(1)[0]
hi = lo + 1.0
end = rdd.collect()[-1][0]
while(hi < end):
number = rdd.filter(lambda (t, k, v): t >= lo and t < hi).count()
linePerSec.append(number)
lo = hi
hi = lo + 1.0
但它非常慢,甚至比在 for 循环中逐行遍历数据还要慢。我想这是因为 rdd.filter() 遍历整个 rdd 以找到满足过滤器条件的行。但是对于时间序列,我们不需要在我的代码中遍历 hi 边界之后的数据。在我的情况下,是否有任何解决方案可以让 spark 停止通过 rdd?谢谢!
最佳答案
首先让我们创建一些虚拟数据:
rdd = sc.parallelize(
[(0.01, "k", "v"),
(0.03, "k", "v"),
(1.00, "k", "v"),
(1.10, "k", "v"),
(1.20, "k", "v"),
(1.99, "k", "v"),
(2.00, "k", "v"),
(3.10, "k", "v"),
(4.50, "k", "v")])
从 RDD 中提取时间字段:
def get_time(x):
(start, _, _) = x
return start
times = rdd.map(get_time)
接下来我们需要一个从时间到键的函数映射:
def get_key_(start):
offset = start - int(start)
def get_key(x):
w = int(x) + offset
return w if x >= w else int(x - 1) + offset
return get_key
找到最小和最大时间
start = times.takeOrdered(1)[0]
end = times.top(1)[0]
生成一个实际的键函数:
get_key = get_key_(start)
并计算平均值
from operator import add
total = (times
.map(lambda x: (get_key(x), 1))
.reduceByKey(add)
.values()
.sum())
time_range = get_key(end) - get_key(start) + 1.0
mean = total / time_range
mean
## 1.8
快速检查:
它给出 9/5 = 1.8
等效数据框如下所示:
from pyspark.sql.functions import count, col, sum, lit, min, max
# Select only arrival times
arrivals = df.select("arrival_time")
# This is almost identical as before
start = df.agg(min("arrival_time")).first()[0]
end = df.agg(max("arrival_time")).first()[0]
get_key = get_key_(start)
time_range = get_key(end) - get_key(start) + 1.0
# But we'll need offset as well
offset = start - int(start)
# and define a bucket column
bucket = (col("arrival_time") - offset).cast("integer") + offset
line_per_sec = (df
.groupBy(bucket)
.agg(count("*").alias("cnt"))
.agg((sum("cnt") / lit(time_range)).alias("mean")))
line_per_sec.show()
## +----+
## |mean|
## +----+
## | 1.8|
## +----+
请注意,这与 the solution 非常相似由 Nhor 提供有两个主要区别:
关于python - 使用 Apache-Spark 分析时间序列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33728994/
我刚刚继承了一个旧的 PostgreSQL 安装,需要进行一些诊断以找出该数据库运行缓慢的原因。在 MS SQL 上,您可以使用 Profiler 等工具来查看正在运行的查询,然后查看它们的执行计划。
将目标从Analytics(分析)导入到AdWords中,然后在Analytics(分析)中更改目标条件时,是否可以通过更改将目标“重新导入”到AdWords,还是可以自动选择? 最佳答案 更改目标值
我正在使用google analytics api来获取数据。我正在获取数据,但我想验证两个参数,它们在特定日期范围内始终为0。我正在获取['ga:transactions']和['ga:goalCo
我使用Google API从Google Analytics(分析)获取数据,但指标与Google Analytics(分析)的网络界面不同。 即:我在2015年3月1日获得数据-它返回综合浏览量79
我在我的Web应用程序中使用sammy.js进行剔除。我正在尝试向其中添加Google Analytics(分析)。我很快找到了following plugin来实现页面跟踪。 我按照步骤操作,页面如
当使用 Xcode 分析 (product>analyze) 时,有没有办法忽略给定文件中的任何错误? 例如编译指示之类的? 我们只想忽略第三方代码的任何警告,这样当我们的代码出现问题时,它对我们
目录 EFK 1. 日志系统 2. 部署ElasticSearch 2.1 创建handless服务 2.2 创建s
关闭。这个问题不满足Stack Overflow guidelines .它目前不接受答案。 想改善这个问题吗?更新问题,使其成为 on-topic对于堆栈溢出。 7年前关闭。 Improve thi
GCC/G++ 是否有可用于输出分析的选项? 能够比较以前的代码与新代码之间的差异(大小、类/结构的大小)将很有用。然后可以将它们与之前的输出进行比较以进行比较,这对于许多目的都是有用的。 如果没有此
我正在浏览 LYAH,并一直在研究处理列表时列表理解与映射/过滤器的使用。我已经分析了以下两个函数,并包含了教授的输出。如果我正确地阅读了教授的内容,我会说 FiltB 的运行速度比 FiltA 慢很
在 MySQL 中可以使用 SET profiling = 1; 设置分析 查询 SHOW PROFILES; 显示每个查询所用的时间。我想知道这个时间是只包括服务器的执行时间还是还包括将结果发送到前
我用 Python 编写了几个用于生成阶乘的模块,我想测试运行时间。我找到了一个分析示例 here我使用该模板来分析我的模块: import profile #fact def main():
前几天读了下mysqld_safe脚本,个人感觉还是收获蛮大的,其中细致的交代了MySQL数据库的启动流程,包括查找MySQL相关目录,解析配置文件以及最后如何调用mysqld程序来启动实例等,有着
上一篇:《人工智能大语言模型起源篇,低秩微调(LoRA)》 (14)Rae 和同事(包括78位合著者!)于2022年发表的《Scaling Language Models: Methods, A
1 内网基础 内网/局域网(Local Area Network,LAN),是指在某一区域内有多台计算机互联而成的计算机组,组网范围通常在数千米以内。在局域网中,可以实现文件管理、应用软件共享、打印机
1 内网基础 内网/局域网(Local Area Network,LAN),是指在某一区域内有多台计算机互联而成的计算机组,组网范围通常在数千米以内。在局域网中,可以实现文件管理、应用软件共享、打印机
我有四列形式的数据。前三列代表时间,value1,value 2。第四列是二进制,全为 0 或 1。当第四列中对应的二进制值为0时,有没有办法告诉excel删除时间、值1和值2?我知道这在 C++ 或
我正在运行一个进行长时间计算的 Haskell 程序。经过一些分析和跟踪后,我注意到以下内容: $ /usr/bin/time -v ./hl test.hl 9000045000050000 Com
我有一个缓慢的 asp.net 程序正在运行。我想分析生产服务器以查看发生了什么,但我不想显着降低生产服务器的速度。 一般而言,配置生产盒或仅本地开发盒是标准做法吗?另外,您建议使用哪些程序来实现这一
我目前正在尝试分析 Haskell 服务器。服务器永远运行,所以我只想要一个固定时间的分析报告。我尝试只运行该程序 3 分钟,然后礼貌地要求它终止,但不知何故,haskell 分析器不遵守术语信号,并
我是一名优秀的程序员,十分优秀!