- android - 多次调用 OnPrimaryClipChangedListener
- android - 无法更新 RecyclerView 中的 TextView 字段
- android.database.CursorIndexOutOfBoundsException : Index 0 requested, 光标大小为 0
- android - 使用 AppCompat 时,我们是否需要明确指定其 UI 组件(Spinner、EditText)颜色
我正在以设定的频率(例如8hz)收集数据,该数据会被修改,存储并偶尔发送出去以进行写入。
由于流式传输/写入数据,我遇到了时序问题。程序写入数据时(每5秒一次)花费的时间超过1 / 8hz(0.125s)。这会延迟我的数据采集时间。
我想做的是调用我的写函数并使它运行,但也允许我的主程序继续运行,以使计时没有延迟。
我尝试使用几种不同的方法,但是运气不好:线程,多处理和异步。我很可能会错误地使用它们。
我正在做的一个非常简化的版本:
def main():
while True:
curTime = datetime.datetime.now()
while curTime < nextTime:
continue
data = collectData() #collect data (serial port, tcp, etc.)
pdata = processData(data) #process data
hdata = holdData(hdata) #store data stream for occasional writing
if len(hdata) > 8*5:
writeData(hdata) #send data to be written - takes too long and causes delay in next sample > 0.125s from previous.
nextTime = curTime + datetime.timedelta(microsecond = 125000) #adjust next time for measurement - 0.125s after last time data was collected.
最佳答案
您正在尝试使用异步编程来解决问题的正确方法。 Python中的异步编程本身就很棘手,因为使用线程(threading
),进程(multiprocessing
)或协程(asyncio
)实现的并发性存在主要差异。没有“正确”的方法,您选择最适合当前用例的方法。
您的问题既有IO绑定(数据获取和写入)任务,也有CPU绑定(数据处理)任务,它们可以并行运行。这是您的操作方法。也许这不是最优雅的解决方案,但是它将向您展示如何解决此类问题的想法。
在我们的解决方案中,我们将线程用于IO绑定任务,而进程则用于CPU绑定任务。就个人而言,我更愿意使用线程来完成所有任务,但是在这种情况下,由于GIL的原因,我们将无法释放现代多核CPU的所有功能来并行化数据处理。
首先,让我们在可执行脚本中导入所需的模块:
import time
import random
import signal
from threading import Thread
from multiprocessing.pool import Pool
from queue import Queue, Empty
WORKERS = 4
FETCH_INTERVAL = 1
FETCH_INTERVAL
秒获取数据的主线程:
def main():
raw_data = Queue()
processor = Thread(target=process, args=(raw_data,))
processor.start()
i = 0
try:
while True:
t_fetch = time.time()
# Simulate the data fetching:
time.sleep(0.5)
data = i, random.random()
print("[main] Fetched raw data:", data)
raw_data.put(data)
t_elapsed = time.time() - t_fetch
if t_elapsed < FETCH_INTERVAL:
time.sleep(FETCH_INTERVAL - t_elapsed)
else:
print("[error] The fetch interval is too short!")
i = i + 1
except KeyboardInterrupt:
print("shutting down...")
finally:
raw_data.put(None)
processor.join()
if __name__ == "__main__":
main()
raw_data
队列,该队列将存储获取的数据,然后开始一个
processor
线程,该线程运行一个
process
函数,该函数将
raw_data
队列作为其参数。请注意,我们不仅在每次获取数据后都在
FETCH_INTERVAL
秒内休眠,而且还要考虑到由于数据获取而导致的延迟,因为这也是一项与IO绑定的任务。该脚本将无限期运行,直到按下
Ctrl-C
为止。一旦中断,我们将
None
放入队列以向线程发送信号,表明处理已结束,并等待
processor
线程完成。现在,我们添加由
process
线程运行的
processor
函数的定义:
def process(raw_data):
proc_data = Queue()
writer = Thread(target=write, args=(proc_data,))
writer.start()
with Pool(WORKERS, init_worker) as pool:
while True:
data_batch = dequeue_data(raw_data, batch_size=WORKERS)
if not data_batch:
time.sleep(0.5)
continue
results = pool.map(process_data, data_batch)
print("[processor] Processed raw data:", results)
for r in results:
proc_data.put(r)
if None in data_batch:
break
print("joining the writer thread...")
writer.join()
proc_data
队列,该队列将保存
writer
线程的数据处理结果。
writer
线程运行一个
write
函数,稍后我们将对其进行定义。一旦启动
writer
线程,我们将创建
pool
个
WORKERS
进程。在这里,我们使用
init_worker
函数作为
Pool
进程初始化程序,以忽略工作进程在主线程中处理时的键盘中断:
def init_worker():
signal.signal(signal.SIGINT, signal.SIG_IGN)
raw_data
函数不断地使
dequeue_data
队列中的数据批次出队。然后将数据批处理提交给工作池进行处理。
process_data
函数将在下面定义。然后,我们收集结果并将其放入
proc_data
队列,该队列由
writer
线程读取。如果数据批中有
None
,则处理会中断,我们等待
writer
线程完成。
dequeue_data
函数的定义如下:
def dequeue_data(data_queue, batch_size):
items = []
for _ in range(batch_size):
try:
item = data_queue.get(block=False)
except (KeyboardInterrupt, Empty):
break
items.append(item)
return items
batch_size
获取并返回最多
data_queue
数据点。如果没有数据,则返回一个空列表。
process_data
函数什么都不做,只能休眠1-5秒:
def process_data(data):
if data is None:
return
# Simulate the data processing:
time.sleep(random.randint(1, 5))
return data
write
线程中运行的
writer
函数:
def write(proc_data):
while True:
data = proc_data.get()
if data is None:
break
# Simulate the data writing:
time.sleep(random.randint(1, 2))
print("[writer] Wrote processed data:", data)
None
队列获取
proc_data
,无限循环就会停止。现在,我们将所有提供的代码保存在一个脚本中,然后运行并检查其输出:
[main] Fetched raw data: (0, 0.8092310624924178)
[main] Fetched raw data: (1, 0.8594148294409398)
[main] Fetched raw data: (2, 0.9059856675215566)
[main] Fetched raw data: (3, 0.5653361157057876)
[main] Fetched raw data: (4, 0.8966396309003691)
[main] Fetched raw data: (5, 0.5772344067614918)
[processor] Processed raw data: [(0, 0.8092310624924178)]
[main] Fetched raw data: (6, 0.4614411399877961)
^Cshutting down...
[writer] Wrote processed data: (0, 0.8092310624924178)
[processor] Processed raw data: [(1, 0.8594148294409398), (2, 0.9059856675215566), (3, 0.5653361157057876), (4, 0.8966396309003691)]
[writer] Wrote processed data: (1, 0.8594148294409398)
[writer] Wrote processed data: (2, 0.9059856675215566)
[processor] Processed raw data: [(5, 0.5772344067614918), (6, 0.4614411399877961), None]
joining the writer thread...
[writer] Wrote processed data: (3, 0.5653361157057876)
[writer] Wrote processed data: (4, 0.8966396309003691)
[writer] Wrote processed data: (5, 0.5772344067614918)
[writer] Wrote processed data: (6, 0.4614411399877961)
main
线程以固定的时间间隔获取数据,而
processor
并行地批量处理数据,而
writer
保存结果。一旦我们点击
Ctrl-C
,
main
线程就停止获取数据,然后
processor
线程就完成了对其余获取数据的处理,并开始等待
writer
线程完成将数据写入磁盘。
关于python - Python-调用函数和继续的主程序,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56959038/
我正在处理一组标记为 160 个组的 173k 点。我想通过合并最接近的(到 9 或 10 个组)来减少组/集群的数量。我搜索过 sklearn 或类似的库,但没有成功。 我猜它只是通过 knn 聚类
我有一个扁平数字列表,这些数字逻辑上以 3 为一组,其中每个三元组是 (number, __ignored, flag[0 or 1]),例如: [7,56,1, 8,0,0, 2,0,0, 6,1,
我正在使用 pipenv 来管理我的包。我想编写一个 python 脚本来调用另一个使用不同虚拟环境(VE)的 python 脚本。 如何运行使用 VE1 的 python 脚本 1 并调用另一个 p
假设我有一个文件 script.py 位于 path = "foo/bar/script.py"。我正在寻找一种在 Python 中通过函数 execute_script() 从我的主要 Python
这听起来像是谜语或笑话,但实际上我还没有找到这个问题的答案。 问题到底是什么? 我想运行 2 个脚本。在第一个脚本中,我调用另一个脚本,但我希望它们继续并行,而不是在两个单独的线程中。主要是我不希望第
我有一个带有 python 2.5.5 的软件。我想发送一个命令,该命令将在 python 2.7.5 中启动一个脚本,然后继续执行该脚本。 我试过用 #!python2.7.5 和http://re
我在 python 命令行(使用 python 2.7)中,并尝试运行 Python 脚本。我的操作系统是 Windows 7。我已将我的目录设置为包含我所有脚本的文件夹,使用: os.chdir("
剧透:部分解决(见最后)。 以下是使用 Python 嵌入的代码示例: #include int main(int argc, char** argv) { Py_SetPythonHome
假设我有以下列表,对应于及时的股票价格: prices = [1, 3, 7, 10, 9, 8, 5, 3, 6, 8, 12, 9, 6, 10, 13, 8, 4, 11] 我想确定以下总体上最
所以我试图在选择某个单选按钮时更改此框架的背景。 我的框架位于一个类中,并且单选按钮的功能位于该类之外。 (这样我就可以在所有其他框架上调用它们。) 问题是每当我选择单选按钮时都会出现以下错误: co
我正在尝试将字符串与 python 中的正则表达式进行比较,如下所示, #!/usr/bin/env python3 import re str1 = "Expecting property name
考虑以下原型(prototype) Boost.Python 模块,该模块从单独的 C++ 头文件中引入类“D”。 /* file: a/b.cpp */ BOOST_PYTHON_MODULE(c)
如何编写一个程序来“识别函数调用的行号?” python 检查模块提供了定位行号的选项,但是, def di(): return inspect.currentframe().f_back.f_l
我已经使用 macports 安装了 Python 2.7,并且由于我的 $PATH 变量,这就是我输入 $ python 时得到的变量。然而,virtualenv 默认使用 Python 2.6,除
我只想问如何加快 python 上的 re.search 速度。 我有一个很长的字符串行,长度为 176861(即带有一些符号的字母数字字符),我使用此函数测试了该行以进行研究: def getExe
list1= [u'%app%%General%%Council%', u'%people%', u'%people%%Regional%%Council%%Mandate%', u'%ppp%%Ge
这个问题在这里已经有了答案: Is it Pythonic to use list comprehensions for just side effects? (7 个答案) 关闭 4 个月前。 告
我想用 Python 将两个列表组合成一个列表,方法如下: a = [1,1,1,2,2,2,3,3,3,3] b= ["Sun", "is", "bright", "June","and" ,"Ju
我正在运行带有最新 Boost 发行版 (1.55.0) 的 Mac OS X 10.8.4 (Darwin 12.4.0)。我正在按照说明 here构建包含在我的发行版中的教程 Boost-Pyth
学习 Python,我正在尝试制作一个没有任何第 3 方库的网络抓取工具,这样过程对我来说并没有简化,而且我知道我在做什么。我浏览了一些在线资源,但所有这些都让我对某些事情感到困惑。 html 看起来
我是一名优秀的程序员,十分优秀!