gpt4 book ai didi

python - 使用多处理/线程通过 Tkinter 读取串行端口和实时图形数据

转载 作者:行者123 更新时间:2023-12-01 02:52:36 25 4
gpt4 key购买 nike

我需要以高速率实时显示图形数据。我需要不断查询串行端口以在可用时检索数据。然后,数据会修改我正在绘制的实例变量。我在两个子图上绘制 4 条不同的线,因此总共 8 条线。我收到了 12 个不同的变量,但只绘制了 8 个变量。我需要该进程在运行该函数一次后不会消失,这样我就可以继续接收新数据。我希望绘图在另一个进程中完成,这样就不会阻止读取串行端口。我找到了示例,并获得了有关如何完成此操作的帮助,如果我已经拥有所有可用的数据,但如果它不断接收新数据,并且我是多处理/线程的新手,因此很难理解如何修改它以连续读取从串行端口。我可以以 10Hz 的速率完成我需要的任务,但我需要以 20Hz 的速率采样,似乎唯一的方法是通过多处理/线程,除非我错过了一些可以优化图形或读取时间的东西。这是一个 Tkinter 窗口,按下 go 按钮时将查询串行端口并绘制图表:

import Tkinter
import serial
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
from matplotlib.figure import Figure
from matplotlib import pyplot as plt
import matplotlib.animation as animation
from collections import deque
import random
import time

class App:
def __init__(self, master):
self.t = 0
self.arduinoData = serial.Serial('com5', 250000, timeout=None)

frame = Tkinter.Frame(master)

self.running = False
self.ani = None

self.run = Tkinter.LabelFrame(frame, text="Testing", borderwidth=10, relief=Tkinter.GROOVE, padx=10, pady=10)
self.run.grid(row=0, column=0, padx=20, pady=20)

self.run_respiration = Tkinter.Button(self.run, text="RUN",bd=10, height=5, width=10, command=self.getData)
self.run_respiration.grid(row=0, column=0, padx=5, pady=5)

self.test_options = Tkinter.LabelFrame(frame, text="Test Options", borderwidth=10, relief=Tkinter.GROOVE, padx=10, pady=10 )
self.test_options.grid(row=0, column=1, padx=20, pady=20)

self.stop = Tkinter.Button(self.test_options, text="STOP", bd=10, height=5, width=10, command=self.stopTest)
self.stop.grid(row=0, column=0, padx=5, pady=5)


self.fig = plt.Figure()
self.ax1 = self.fig.add_subplot(211)
self.line0, = self.ax1.plot([], [], lw=2)
self.line1, = self.ax1.plot([], [], lw=2)
self.line2, = self.ax1.plot([], [], lw=2)
self.line3, = self.ax1.plot([], [], lw=2)
self.ax2 = self.fig.add_subplot(212)
self.line4, = self.ax2.plot([], [], lw=2)
self.line5, = self.ax2.plot([], [], lw=2)
self.line6, = self.ax2.plot([], [], lw=2)
self.line7, = self.ax2.plot([], [], lw=2)
self.canvas = FigureCanvasTkAgg(self.fig,master=master)
self.canvas.show()
self.canvas.get_tk_widget().grid(row=0, column=4, padx=20, pady=20)
frame.grid(row=0, column=0, padx=20, pady=20)

def getData(self):
if self.ani is None:
self.k = 0
self.arduinoData.flushInput()
self.arduinoData.write("<L>")
return self.start()
else:
self.arduinoData.write("<L>")
self.arduinoData.flushInput()
self.ani.event_source.start()
self.running = not self.running

def stopTest(self):
self.arduinoData.write("<H>")
if self.running:
self.ani.event_source.stop()
self.running = not self.running

def start(self):
self.xdata = []
self.pressure1 = []
self.displacement1 = []
self.cycle1 = []
self.pressure2 = []
self.displacement2 = []
self.cycle2 = []
self.pressure3 = []
self.displacement3 = []
self.cycle3 = []
self.pressure4 = []
self.displacement4 = []
self.cycle4 = []
self.k = 0
self.limit = 300
self.arduinoData.flushInput()
self.ani = animation.FuncAnimation(
self.fig,
self.update_graph,
interval=1,
repeat=True)
self.arduinoData.write("<L>")
self.running = True
self.ani._start()

def update_graph(self, i):
if (self.arduinoData.inWaiting()>0):
self.xdata.append(self.k)
x = self.arduinoData.readline()
self.setData(x)
strip_data = x.strip()
split_data = x.split("|")
actuator1 = split_data[0].split(".")
actuator2 = split_data[1].split(".")
actuator3 = split_data[2].split(".")
actuator4 = split_data[3].split(".")
self.pressure1.append(int(actuator1[0]))
self.displacement1.append(int(actuator1[1]))
self.cycle1 = int(actuator1[2])
self.pressure2.append(int(actuator2[0]))
self.displacement2.append(int(actuator2[1]))
self.cycle2 = int(actuator2[2])
self.pressure3.append(int(actuator3[0]))
self.displacement3.append(int(actuator3[1]))
self.cycle3 = int(actuator3[2])
self.pressure4.append(int(actuator4[0]))
self.displacement4.append(int(actuator4[1]))
self.cycle4 = int(actuator4[2])
self.line0.set_data(self.xdata, self.pressure1)
self.line1.set_data(self.xdata, self.pressure2)
self.line2.set_data(self.xdata, self.pressure3)
self.line3.set_data(self.xdata, self.pressure4)
self.line4.set_data(self.xdata, self.displacement1)
self.line5.set_data(self.xdata, self.displacement2)
self.line6.set_data(self.xdata, self.displacement3)
self.line7.set_data(self.xdata, self.displacement4)
if self.k < 49:
self.ax1.set_ylim(min(self.pressure1)-1, max(self.pressure4) + 1)
self.ax1.set_xlim(0, self.k+1)
self.ax2.set_ylim(min(self.displacement1)-1, max(self.displacement4) + 1)
self.ax2.set_xlim(0, self.k+1)
elif self.k >= 49:
self.ax1.set_ylim(min(self.pressure1[self.k-49:self.k])-1, max(self.pressure4[self.k-49:self.k]) + 1)
self.ax1.set_xlim(self.xdata[self.k-49], self.xdata[self.k-1])
self.ax2.set_ylim(min(self.displacement1[self.k-49:self.k])-1, max(self.displacement4[self.k-49:self.k]) + 1)
self.ax2.set_xlim(self.xdata[self.k-49], self.xdata[self.k-1])
if self.cycle1 >= self.limit:
self.running = False
self.ani = None
self.ani.event_source.stop()
self.running = not self.running
self.k += 1



root = Tkinter.Tk()
app = App(root)
root.mainloop()

如果有人可以帮助我理解多处理或提供一个例子,那就太好了。谢谢

最佳答案

如果您提供计时器,您就可以拥有一个为您管理事件的功能。

例如:

import time
import tkinter as tk

root = tk.Tk()

def getData():
print("getData")

def update_graph():
print("update_graph")

def timed_events():
getData()
update_graph()
root.after(1000, timed_events) # you can change the number to anything you need

timed_events()

root.mainloop()

这将每隔 1 秒按顺序调用每个函数并打印:

getData
update_graph

这只是一个简单的示例,旨在说明如何在计时器上进行数据处理。

更新:

这是您的代码的另一个可能选项。在 start(self): 方法中,在末尾添加一条 if 语句以重复该方法 if self.x == True:。然后您需要做的就是创建一个按钮来切换变量 self.x,以便您可以在需要时停止循环。

self.x == True

def start(self):
# all your code so far
if self.x == True:
self.start

关于python - 使用多处理/线程通过 Tkinter 读取串行端口和实时图形数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44574223/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com