gpt4 book ai didi

python - wxpython中的线程安全

转载 作者:太空宇宙 更新时间:2023-11-03 17:45:47 25 4
gpt4 key购买 nike

我正在使用 wxpython 为分析和处理音频文件的命令行工具构建前端 GUI。文件被加载到 GUI 中;然后启动执行分析和调整步骤的线程;最后这些过程的结果显示在主窗口中。

我一直在努力编写线程安全的代码;但是,某些线程仍然会随机地无法运行完成(需要注意的是,当我第二次手动启动它们时,它们通常能够运行完成)。下面我提供了程序的删节版本,其中包含 AnalysisThread、AdjustThread 和 MainWindow 这三个类。主窗口中的按钮绑定(bind)到函数“OnAnalyze”和“OnAdjust”,这两个函数会创建相应线程类的实例。线程本身通过 wx.CallAfter 和 Publisher 与 GUI 进行通信。据我了解,这应该允许数据在主进程和线程之间安全地来回传递。如果有人能指出我下面的代码哪里出了问题,我将不胜感激。

如果我无法解决线程安全问题,我的后备计划是以某种方式检测线程的死亡,并尝试在后台“复活”它,而不让用户察觉到发生过故障。这看起来合理吗?如果合理,我非常欢迎大家就如何实现这一点提出建议。

非常感谢。

#!/usr/bin/python

import collections
import os, sys, re, subprocess, shutil
import time
from threading import Thread

import wx
from wx.lib.pubsub import setuparg1
from wx.lib.pubsub import pub as Publisher


#Start a thread that analyzes audio files.
class AnalysisThread(Thread):
    """Background thread that runs ffmpeg's ``ebur128`` filter on one file.

    The thread starts itself from ``__init__``.  Once ffmpeg's output has been
    read to the end, the "Summary" part of that output is published on the
    "update" topic through ``wx.CallAfter`` as ``(file, summary, index)``.
    """

    def __init__(self, args):
        """Store the job parameters and start the thread.

        args: sequence whose first item is the file handed to ``ffmpeg -i``
            and whose second item is an index that is sent back unchanged
            together with the result.
        """
        Thread.__init__(self)
        self.file = args[0]
        self.index = args[1]
        self.daemon = True
        self.start()

    @staticmethod
    def _collect_summary(lines):
        """Return the text from the first line containing 'Summary' onward.

        lines: iterable of text lines (each normally ending in a newline).
        Returns an empty string when no line contains 'Summary'.
        """
        collected = []
        in_summary = False
        for line in lines:
            if not in_summary and 'Summary' in line:
                in_summary = True
            if in_summary:
                collected.append(line)
        return "".join(collected)

    def run(self):
        """Run ffmpeg, extract its summary and publish it to the GUI."""
        proc = subprocess.Popen(
            ['ffmpeg', '-nostats', '-i', self.file,
             '-filter_complex', 'ebur128=peak=true+sample',
             '-f', 'null', '-'],
            bufsize=1,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            # Text mode: lines are str on Python 2 and 3, so the 'Summary'
            # search and the '' end-of-file sentinel below both work.
            universal_newlines=True)
        try:
            # Read until end-of-file instead of looping on proc.poll(): the
            # process can exit while output is still buffered in the pipe,
            # and the summary is the last thing ffmpeg prints.
            summary = self._collect_summary(iter(proc.stdout.readline, ''))
        finally:
            proc.stdout.close()
            proc.wait()

        wx.CallAfter(Publisher.sendMessage, "update",
                     (self.file, summary, self.index))

#Start a thread that adjusts audio files so that they conform to EBU loudness standards.
class AdjustThread(Thread):
    """Background thread for the "adjust" step of one audio file.

    The thread starts itself from ``__init__``.  In this abridged listing
    only the final measurement is visible: ffmpeg's ``ebur128`` filter is run
    and the "Summary" part of its output is published on the "update" topic
    through ``wx.CallAfter`` as ``(file, summary, index)``.
    """

    def __init__(self, args):
        """Store the job parameters and start the thread.

        args: sequence of five items stored, in order, as ``file``,
            ``index``, ``IL``, ``TP`` and ``SP``.
        """
        Thread.__init__(self)
        self.file = args[0]
        self.index = args[1]
        # NOTE(review): IL/TP/SP are stored but not used by the code visible
        # here -- presumably consumed by the omitted adjustment step.
        self.IL = args[2]
        self.TP = args[3]
        self.SP = args[4]
        self.daemon = True
        self.start()

    @staticmethod
    def _collect_summary(lines):
        """Return the text from the first line containing 'Summary' onward.

        lines: iterable of text lines (each normally ending in a newline).
        Returns an empty string when no line contains 'Summary'.
        """
        collected = []
        in_summary = False
        for line in lines:
            if not in_summary and 'Summary' in line:
                in_summary = True
            if in_summary:
                collected.append(line)
        return "".join(collected)

    def run(self):
        """Measure the adjusted file with ffmpeg and publish the summary."""
        # NOTE(review): `adjusted_file` is not defined anywhere in the visible
        # code; the step that produces it appears to have been cut from this
        # listing and must be restored before this method can run.
        proc = subprocess.Popen(
            ['ffmpeg', '-nostats', '-i', adjusted_file,
             '-filter_complex', 'ebur128=peak=true+sample',
             '-f', 'null', '-'],
            bufsize=1,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            # Text mode: lines are str on Python 2 and 3, so the 'Summary'
            # search and the '' end-of-file sentinel below both work.
            universal_newlines=True)
        try:
            # Read until end-of-file instead of looping on proc.poll(): the
            # process can exit while output is still buffered in the pipe,
            # and the summary is the last thing ffmpeg prints.
            summary = self._collect_summary(iter(proc.stdout.readline, ''))
        finally:
            proc.stdout.close()
            proc.wait()

        wx.CallAfter(Publisher.sendMessage, "update",
                     (self.file, summary, self.index))


class MainWindow(wx.Frame):
    """Main frame with "Analyze" and "Adjust" buttons and a result list.

    Worker threads report back on the "update" pubsub topic; the frame
    subscribes ``UpdateDisplay`` to that topic and writes the parsed values
    into ``self.list`` and ``self.fileList``.

    NOTE(review): this is an abridged listing.  ``self.list``, ``hbox``,
    ``toAnalyze``, ``toAdjust``, ``item``, ``width``, ``intloud``,
    ``truepeak`` and ``samplepeak`` are used below but never defined in the
    visible code.
    """

    # Per-file records, keyed by file name.  Declared on the class, so it is
    # shared by all MainWindow instances.
    fileList = collections.OrderedDict()

    # One entry per value parsed from the ffmpeg summary:
    # (regex with one capture group, list column, index in the file record).
    _SUMMARY_FIELDS = (
        (r'LRA:\s(.+?) LU', 7, 6),
        (r'I:\s(.+?) LUFS', 4, 3),
        (r'True peak:\s+Peak:\s(.+?) dBFS', 5, 4),
        (r'Sample peak:\s+Peak:\s(.+?) dBFS', 6, 5),
    )

    def __init__(self, parent, id, title):
        """Build the frame, subscribe to worker results and show the window."""
        wx.Frame.__init__(self, parent, id, title, size=(900, 400))

        Publisher.subscribe(self.UpdateDisplay, "update")

        # Add "Analyze" and "Adjust" buttons to the main frame.
        panel = wx.Panel(self, -1)
        vbox = wx.BoxSizer(wx.VERTICAL)
        self.ana = wx.Button(panel, -1, 'Analyze', size=(100, -1))
        self.adj = wx.Button(panel, -1, 'Adjust', size=(100, -1))
        self.Bind(wx.EVT_BUTTON, self.OnAnalyze, id=self.ana.GetId())
        self.Bind(wx.EVT_BUTTON, self.OnAdjust, id=self.adj.GetId())
        vbox.Add(self.ana, 0, wx.ALL, 10)
        vbox.Add(self.adj, 0, wx.ALL, 10)
        # NOTE(review): self.list is not created in the visible code.
        vbox.Add(self.list, 1, wx.EXPAND | wx.TOP, 3)
        vbox.Add((-1, 10))
        # NOTE(review): `hbox` is not defined in the visible code; confirm
        # whether an outer sizer was cut or `vbox` was intended.
        panel.SetSizer(hbox)

        self.Centre()
        self.Show(True)

    def OnAnalyze(self, event):
        """Handle the "Analyze" button: start one AnalysisThread per file."""
        # NOTE(review): `toAnalyze` is not defined in the visible code.
        for (path, index) in toAnalyze:

            # Add a progress bar
            item = self.list.GetItem(index, 2)
            gauge = item.GetWindow()
            gauge.Pulse()

            # Launch the analysis thread
            AnalysisThread(args=(path, index,))

    def OnAdjust(self, event):
        """Handle the "Adjust" button: start one AdjustThread per file."""
        # NOTE(review): `toAdjust`, `width`, `item`, `intloud`, `truepeak`
        # and `samplepeak` are not defined in the visible code.
        for (path, index) in toAdjust:
            gauge = wx.Gauge(self.list, -1, range=50, size=(width, 15),
                             style=wx.GA_HORIZONTAL | wx.GA_SMOOTH)
            gauge.Pulse()  # shouldn't start this right away...
            item.SetWindow(gauge, wx.ALIGN_CENTRE)
            self.list.SetItem(item)

            # Launch the adjust thread
            AdjustThread(args=(path, index, intloud, truepeak, samplepeak))

    def UpdateDisplay(self, msg):
        """Pubsub listener for "update": show one worker's result.

        msg: pubsub message whose ``data`` is ``(file, summary, index)``.
        """
        payload = msg.data
        path, summary, row = payload[0], payload[1], payload[2]
        self.ProcessSummary(path, summary, row)
        # Fill the row's gauge to its full range (50) to mark completion.
        gauge = self.list.GetItem(row, 2).GetWindow()
        gauge.SetValue(50)
        self.fileList[path][1] = True

    def ProcessSummary(self, file, summary, i):
        """Parse the loudness values out of *summary* and display them.

        file: key of the record in ``self.fileList`` to update.
        summary: text of ffmpeg's summary output.
        i: row of ``self.list`` to update.

        Each value that cannot be found in *summary* is shown and stored
        as "n/a".
        """
        for pattern, column, slot in self._SUMMARY_FIELDS:
            found = re.search(pattern, summary)
            value = found.group(1) if found is not None else "n/a"
            self.list.SetStringItem(i, column, value)
            self.fileList[file][slot] = value


# Script entry: create the wx application object, construct the main window
# (MainWindow.__init__ centres and shows it), then enter the event loop.
app = wx.App()
MainWindow(None, -1, 'Leveler')
app.MainLoop()

最佳答案

这里是来自 2.8 系列 wxPython 演示的延迟结果示例代码。我修改了这段代码以满足我的需要。

顺便说一句,wxPython demo 是学习 wx 的宝贵资源。我强烈推荐。通过演示,wxPython 学起来很有趣!

看来版本 3 现已正式发布,因此应该是稳定的。但如果您使用的是旧版本,您可以在此处找到对应的演示:http://sourceforge.net/projects/wxpython/files/wxPython/

import wx
import wx.lib.delayedresult as delayedresult


class FrameSimpleDelayedBase(wx.Frame):
    """Frame that builds the widgets shared by the demo frames.

    It creates a disabled "Using delayedresult" checkbox, "Get" and "Abort"
    buttons, a slider and a read-only text field.  The two buttons are bound
    to ``self.handleGet`` and ``self.handleAbort``, which this class does not
    define; subclasses must provide them.
    """

    def __init__(self, *args, **kwds):
        wx.Frame.__init__(self, *args, **kwds)
        panel = wx.Panel(self)

        # Controls
        self.checkboxUseDelayed = wx.CheckBox(panel, -1, "Using delayedresult")
        self.buttonGet = wx.Button(panel, -1, "Get")
        self.buttonAbort = wx.Button(panel, -1, "Abort")
        self.slider = wx.Slider(
            panel, -1, 0, 0, 10,
            size=(100, -1),
            style=wx.SL_HORIZONTAL | wx.SL_AUTOTICKS)
        self.textCtrlResult = wx.TextCtrl(panel, -1, "", style=wx.TE_READONLY)

        # Initial state: checkbox ticked but read-only, nothing to abort yet.
        self.checkboxUseDelayed.SetValue(1)
        self.checkboxUseDelayed.Enable(False)
        self.buttonAbort.Enable(False)

        # Layout: the checkbox on top, the remaining controls in one row.
        column = wx.BoxSizer(wx.VERTICAL)
        row = wx.BoxSizer(wx.HORIZONTAL)
        column.Add(self.checkboxUseDelayed, 0, wx.ALL, 10)
        for control in (self.buttonGet, self.buttonAbort,
                        self.slider, self.textCtrlResult):
            row.Add(control, 0, wx.ALL, 5)
        column.Add(row, 0, wx.ALL, 5)
        panel.SetSizer(column)
        column.SetSizeHints(self)

        # Button handlers are supplied by the subclass.
        self.Bind(wx.EVT_BUTTON, self.handleGet, self.buttonGet)
        self.Bind(wx.EVT_BUTTON, self.handleAbort, self.buttonAbort)




class FrameSimpleDelayed(FrameSimpleDelayedBase):
    """This demos simplistic use of delayedresult module.

    "Get" starts ``_resultProducer`` through ``delayedresult.startWorker``;
    its result is handed to ``_resultConsumer``.  "Abort" sets the shared
    abort event that the producer polls.
    """

    def __init__(self, *args, **kwargs):
        FrameSimpleDelayedBase.__init__(self, *args, **kwargs)
        self.jobID = 0
        self.abortEvent = delayedresult.AbortEvent()
        self.Bind(wx.EVT_CLOSE, self.handleClose)

    def setLog(self, log):
        """Set the callable used for log output; call before using the frame."""
        self.log = log

    def handleClose(self, event):
        """Only needed because in demo, closing the window does not kill the
        app, so worker thread continues and sends result to dead frame; normally
        your app would exit so this would not happen."""
        # The Abort button is enabled only while a job is running.
        if self.buttonAbort.IsEnabled():
            self.log("Exiting: Aborting job %s" % self.jobID)
            self.abortEvent.set()
        self.Destroy()

    def handleGet(self, event):
        """Compute result in separate thread, doesn't affect GUI response."""
        self.buttonGet.Enable(False)
        self.buttonAbort.Enable(True)
        self.abortEvent.clear()
        self.jobID += 1

        self.log("Starting job %s in producer thread: GUI remains responsive"
                 % self.jobID)
        delayedresult.startWorker(self._resultConsumer, self._resultProducer,
                                  wargs=(self.jobID, self.abortEvent),
                                  jobID=self.jobID)

    def _resultProducer(self, jobID, abortEvent):
        """Pretend to be a complex worker function or something that takes
        long time to run due to network access etc. GUI will freeze if this
        method is not called in separate thread.

        Sleeps in 0.1 s steps, at most 50 times, stopping early once
        ``abortEvent()`` is true.  Returns *jobID*.
        """
        import time
        count = 0
        while not abortEvent() and count < 50:
            time.sleep(0.1)
            count += 1
        return jobID

    def handleAbort(self, event):
        """Abort the result computation."""
        self.log("Aborting result for job %s" % self.jobID)
        self.buttonGet.Enable(True)
        self.buttonAbort.Enable(False)
        self.abortEvent.set()

    def _resultConsumer(self, delayedResult):
        """Receive the worker's result, log it and show it in the text field.

        If fetching the result raises, the exception is logged and the
        buttons are left unchanged.
        """
        jobID = delayedResult.getJobID()
        assert jobID == self.jobID
        try:
            result = delayedResult.get()
        # "as" form is valid on Python 2.6+ and Python 3; the old
        # "except Exception, exc" spelling is a syntax error on Python 3.
        except Exception as exc:
            self.log("Result for job %s raised exception: %s" % (jobID, exc))
            return

        # output result
        self.log("Got result for job %s: %s" % (jobID, result))
        self.textCtrlResult.SetValue(str(result))

        # get ready for next job:
        self.buttonGet.Enable(True)
        self.buttonAbort.Enable(False)


class FrameSimpleDirect(FrameSimpleDelayedBase):
    """Counter-example frame: the job runs directly in the event handler,
    without delayedresult, so the GUI is blocked until the job returns."""

    def __init__(self, *args, **kwargs):
        self.jobID = 1
        FrameSimpleDelayedBase.__init__(self, *args, **kwargs)
        self.checkboxUseDelayed.SetValue(False)

    def setLog(self, log):
        """Set the callable used for log output."""
        self.log = log

    def handleGet(self, event):
        """Run the producer synchronously and pass its result to the consumer.

        No worker thread is involved, so this handler does not return until
        the producer has finished.
        """
        self.buttonGet.Enable(False)
        self.buttonAbort.Enable(True)

        message = "Doing job %s without delayedresult (same as GUI thread): GUI hangs (for a while)" % self.jobID
        self.log(message)
        self._resultConsumer(self._resultProducer(self.jobID))

    def _resultProducer(self, jobID):
        """Stand-in for slow work: sleep five seconds, then return *jobID*."""
        from time import sleep
        sleep(5)
        return jobID

    def handleAbort(self, event):
        """Never invoked in practice; present because the base class binds it."""

    def _resultConsumer(self, result):
        """Log and display *result*, then reset the buttons for the next job."""
        # output result
        text = str(result)
        self.log("Got result for job %s: %s" % (self.jobID, result))
        self.textCtrlResult.SetValue(text)

        # get ready for next job:
        self.buttonGet.Enable(True)
        self.buttonAbort.Enable(False)
        self.jobID += 1


#---------------------------------------------------------------------------
#---------------------------------------------------------------------------

class TestPanel(wx.Panel):
    """Demo panel with two buttons, one per demo frame.

    The first button opens a ``FrameSimpleDelayed``, the second a
    ``FrameSimpleDirect``; both frames log through ``log.WriteText``.
    """

    def __init__(self, parent, log):
        self.log = log
        wx.Panel.__init__(self, parent, -1)

        launchers = (
            ("Long-running function in separate thread", self.OnButton1),
            ("Long-running function in GUI thread", self.OnButton2),
        )
        buttons = wx.BoxSizer(wx.VERTICAL)
        for label, handler in launchers:
            button = wx.Button(self, -1, label)
            buttons.Add(button, 0, wx.ALL, 5)
            self.Bind(wx.EVT_BUTTON, handler, button)

        # Wrap the button column in an outer sizer to get a 50px border.
        border = wx.BoxSizer()
        border.Add(buttons, 0, wx.ALL, 50)
        self.SetSizer(border)
        self.Layout()

    def _open_frame(self, frame_class, title):
        """Create *frame_class* as a child of this panel, wire its log, show it."""
        frame = frame_class(self, title=title)
        frame.setLog(self.log.WriteText)
        frame.Show()

    def OnButton1(self, evt):
        """Open the frame that runs the job in a separate thread."""
        self._open_frame(FrameSimpleDelayed,
                         "Long-running function in separate thread")

    def OnButton2(self, evt):
        """Open the frame that runs the job in the GUI thread."""
        self._open_frame(FrameSimpleDirect,
                         "Long-running function in GUI thread")

关于python - wxpython中的线程安全,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29821256/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com