gpt4 book ai didi

multithreading - 如何在wxPython GUI应用程序中实现线程

转载 作者:行者123 更新时间:2023-12-03 13:14:10 25 4
gpt4 key购买 nike

我无法正确实现线程,以防止我的应用程序锁定并发生怪异行为。该应用程序旨在登录到基于ubuntu的服务器或ubuntu嵌入式服务器,并搜索可能没有问题的日志文件。嵌入式服务器可以工作,但是在进行搜索时,该应用程序仍会锁定。站点服务器将不会处理。我尚未编写本地文件搜索代码。一旦弄清楚如何实现线程,我想添加一个进度条。自从我学习和使用Python几个月以来,我认为这将是直截了当的,但是使用GUI却存在挑战。我仍然是新手,对所有批评持开放态度。它只会帮助我成为一个更好的程序员。任何帮助是极大的赞赏。这是下面的代码:

#!c:\python27

import wx
import os
import re
import paramiko
import string
import fileinput
import os.path
import dircache
import sys
import time
import datetime, time
import wx

from wxGui import *



class MyApp(wx.App):
def OnInit(self):
frame = MyFrame("SecureTool v2.0.0", (50, 60), (458, 332))
frame.Show()
self.SetTopWindow(frame)
return True



class MyFrame(wx.Frame):
def __init__(self, title, pos, size):
wx.Frame.__init__(self, None, -1, title, pos, size)

toolbar = self.CreateToolBar()
toolbar.Realize()
menuFile = wx.Menu()
menuFile.Append(1, "&About...")
menuFile.AppendSeparator()
menuFile.Append(2, "E&xit")
menuBar = wx.MenuBar()
menuBar.Append(menuFile, "&File")
menu2 = wx.Menu()
menu2.Append(wx.NewId(), "&Copy", "Copy in status bar")
menu2.AppendSeparator()
menu2.Append(wx.NewId(), "C&ut", "")
menu2.AppendSeparator()
menu2.Append(wx.NewId(), "Paste", "")
menu2.AppendSeparator()
menu2.Append(wx.NewId(), "&Options...", "Display Options")
menuBar.Append(menu2, "&Edit")

self.SetMenuBar(menuBar)
self.CreateStatusBar()
self.SetStatusText("Welcome to SecureTool!")
self.Bind(wx.EVT_MENU, self.OnAbout, id=1)
self.Bind(wx.EVT_MENU, self.OnQuit, id=2)
panel = wx.Panel(self)
panel.SetBackgroundColour('LIGHT GREY')
#Close button
button = wx.Button(panel, label="EXIT", pos=(229, 160), size=(229, 80))
self.Bind(wx.EVT_BUTTON, self.OnQuit, button)
self.Bind(wx.EVT_CLOSE, self.OnCloseWindow)
#Embed Server button
button2 = wx.Button(panel, label="Embed Server", pos=(0, 160), size=(229, 80))
self.Bind(wx.EVT_BUTTON, self.OnIP, button2)
#Site Server
button3 = wx.Button(panel, label="SITESERVER", pos=(0, 80), size=(229, 80))
self.Bind(wx.EVT_BUTTON, self.OnUsrPswd, button3)
#Local Search
button4 = wx.Button(panel, label="LOCAL SEARCH", pos=(229, 80), size=(229, 80))
self.Bind(wx.EVT_BUTTON, self.OnOpen, button4)

EVT_RESULT(self, self.OnResult)
self.worker = None

def OnIP(self, event):
ip_address = 0
result = ''
dlg = wx.TextEntryDialog(None, "Enter the IP Address.",
'Embed Server Connect', 'xxx.xxx.xxx.xxx')
if dlg.ShowModal() == wx.ID_OK:
ip_address = dlg.GetValue()
if ip_address:
cmsg = wx.MessageDialog(None, 'Do you want to connect to: ' + ip_address,
'Connect', wx.YES_NO | wx.ICON_QUESTION)
result = cmsg.ShowModal()

if result == wx.ID_YES:
self.DispConnect(ip_address)

cmsg.Destroy()
dlg.Destroy()
return True

def OnUsrPswd(self, event):
passwrd = 0
result = ''
result = wx.TextEntryDialog(None, 'Enter Weekly Password', 'Site Server login','')
if result.ShowModal() == wx.ID_OK:
passwrd = result.GetValue()
if passwrd:
psmsg = wx.MessageDialog(None, 'Do you want to connect to the Siteserver?', 'Connect',
wx.YES_NO | wx.ICON_QUESTION)
result = psmsg.ShowModal()
if result == wx.ID_YES:
self.SiteserverConnect(passwrd)

psmsg.Destroy()
result.Destroy()
return True

def ErrMsg(self):
ermsg = wx.MessageDialog(None, 'Invalid Entry!', 'ConnectionDialog', wx.ICON_ERROR)
ermsg.ShowModal()
ermsg.Destroy()

def GoodConnect(self):
gdcnt = wx.MessageDialog(None, 'You are connected!', 'ConnectionStatus', wx.ICON_INFORMATION)
gdcnt.ShowModal()
#if gdcnt.ShowModal() == wx.ID_OK:
gdcnt.Destroy()

def OnFinish(self):
finish = wx.MessageDialog(None, 'Job is finished!', 'WorkStatus', wx.ICON_INFORMATION)
finish.ShowModal()
finish.Destroy()


def DispConnect(self, address):
pattern = r"\b(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\b"
port = 22
user = 'root'
password ='******'
if re.match(pattern, address):
ssh = paramiko.SSHClient()
ssh.load_system_host_keys()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(address,port,user,password)
Ssh = ssh
self.GoodConnect()
self.OnSearch(Ssh)
else:
self.ErrMsg()

def SiteserverConnect(self, password):
port = 22
user = 'root2'
address = '10.5.48.2'
if password:
ssh = paramiko.SSHClient()
ssh.load_system_host_keys()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(address,port,user,password)
Ssh = ssh
self.GoodConnect()
self.OnSiteSearch(Ssh)
else:
self.ErrMsg()

def startWorker(self,a, b, c):
self.button2.Disable()
self.thread = Thread(target=self.LongRunningSearch)
self.thread.start()

def OnSearch(self, sssh):
self.startWorker(self.OnFinish, self.LongRunningSearch, wargs=[sssh])
self.OnFinish()

def LongRunningSearch(sssh):
ssh = sssh
apath = '/'
apattern = '"*.txt" -o -name "*.log"'
rawcommand = 'find {path} -name "*.txt" -o -name "*.log"'
command1 = rawcommand.format(path=apath, pattern=apattern)
stdin, stdout, stderr = ssh.exec_command(command1)
filelist = stdout.read().splitlines()
ftp = ssh.open_sftp()
for afile in filelist:
(head, filename) = os.path.split(afile)

paths = '/dispenser_result.log'
temp = ftp.file(paths, 'w')
from time import strftime
temp.write("{0:^75}".format("Company -Security Report" ) + strftime(" %Y-%m-%d %H:%M:%S") + "\n\n")
ustring = wx.TextEntryDialog(None, 'Enter a search string below:', 'Search', 'String Name')
if ustring.ShowModal() == wx.ID_OK:
userstring = ustring.GetValue()

if userstring:
userStrHEX = userstring.encode('hex')
userStrASCII = ''.join(str(ord(char)) for char in userstring)
regex = re.compile(r"(%s|%s|%s)" % ( re.escape( userstring ), re.escape( userStrHEX ), re.escape( userStrASCII )))
else:
sys.exit('You Must Enter A String!!!')

count = 0
for afile in filelist:
(head, filename) = os.path.split(afile)
if afile.endswith(".log") or afile.endswith(".txt"):
f=ftp.open(afile, 'r')
for i, line in enumerate(f.readlines()):
result = regex.search(line)
if result:
count += 1
ln = str(i)
pathname = os.path.join(afile)
template = "\n\nLine: {0}\nFile: {1}\nString Type: {2}\n\n"
output = template.format(ln, pathname, result.group())
ftp.get(afile, 'c:\\Extracted\\' + filename)
temp.write(output)
break
else:
#print "No Match in: " + os.path.join(afile)
temp.write("\nNo Match in: " + os.path.join(afile))
f.close()

for fnum in filelist:
#print "\nFiles Searched: ", len(filelist)
#print "Files Matched: ", count
num = len(filelist)

temp.write("\n\nFiles Searched:" + '%s\n' % (num))
temp.write("Files Matched:"+ '%s\n' % (count))
temp.write("Search String:"+ '%s\n' % (userstring))
break
temp.close()
defaultFolder = "DispenserLogResults"
if not defaultFolder.endswith(':') and not os.path.exists('c:\\Extracted\\DispenserLogResults'):
os.mkdir('c:\\Extracted\\DispenserLogResults')
else:
pass
ftp.get(paths, 'c:\\Extracted\\DispenserLogResults\\dispenser_result.log')

ftp.remove(paths)

re.purge()
ftp.close()
ssh.close()

def OnSiteSearch(self, sssh):
ssh = sssh
apath = '/var/log/apache2 /var/opt/smartmerch/log/'
apattern = '"*.log"'
rawcommand = 'find {path} -type f -name "*.log"'
command1 = rawcommand.format(path=apath, pattern=apattern)
stdin, stdout, stderr = ssh.exec_command(command1)
filelist = stdout.read().splitlines()
ftp = ssh.open_sftp()
for afile in filelist:
(head, filename) = os.path.split(afile)

paths = '/var/tmp/siteserver_result.log'
temp = ftp.file(paths, 'w')
from time import strftime
temp.write("{0:^75}".format("Gilbarco - SQA Security Report" ) + strftime(" %Y-%m-%d %H:%M:%S") + "\n\n")
temp.write("\n{0:^75}".format("SiteServer Logs" ))
ustring = wx.TextEntryDialog(None, 'Enter a search string below:', 'Search', 'String Name')
if ustring.ShowModal() == wx.ID_OK:
userstring = ustring.GetValue()

if userstring:
userStrHEX = userstring.encode('hex')
userStrASCII = ''.join(str(ord(char)) for char in userstring)
regex = re.compile(r"(%s|%s|%s)" % ( re.escape( userstring ), re.escape( userStrHEX ), re.escape( userStrASCII )))
else:
sys.exit('You Must Enter A String!!!')

count = 0
for afile in filelist:
(head, filename) = os.path.split(afile)
if afile.endswith(".log") or afile.endswith(".txt"):
f=ftp.open(afile, 'r')
for i, line in enumerate(f.readlines()):
result = regex.search(line)
if result:
count += 1
ln = str(i)
pathname = os.path.join(afile)
template = "\n\nLine: {0}\nFile: {1}\nString Type: {2}\n\n"
output = template.format(ln, pathname, result.group())
ftp.get(afile, 'c:\\Extracted\\' + filename)
temp.write(output)
break
else:
temp.write("\nNo Match in: " + os.path.join(afile))
f.close()

for fnum in filelist:
num = len(filelist)

temp.write("\n\nFiles Searched:" + '%s\n' % (num))
temp.write("Files Matched:"+ '%s\n' % (count))
temp.write("Search String:"+ '%s\n' % (userstring))
break
temp.close()
defaultFolder = "SiteServerLogResults"
if not defaultFolder.endswith(':') and not os.path.exists('c:\\Extracted\\SiteServerLogResults'):
os.mkdir('c:\\Extracted\\SiteServerLogResults')
else:
pass
ftp.get(paths, 'c:\\Extracted\\SiteServerLogResults\\siteserver_result.log')

ftp.remove(paths)

re.purge()
ftp.close()
ssh.close()
self.OnFinish()

def OnOpen(self,e):
self.dirname = ''
dlg = wx.FileDialog(self, "Choose a file", self.dirname, "", "*.*", wx.OPEN)
if dlg.ShowModal() == wx.ID_OK:
self.filename = dlg.GetFilename()
self.dirname = dlg.GetDirectory()
f = open(os.path.join(self.dirname, self.filename), 'r')
self.control.SetValue(f.read())
f.close()
dlg.Destroy()

def OnQuit(self, event):
self.Close(True)

def OnAbout(self, event):
wx.MessageBox("This is sQAST v2.0.0",
"About secureTool", wx.OK | wx.ICON_INFORMATION, self)



def OnCloseWindow(self, event):
self.Destroy()

if __name__ == '__main__':
app = MyApp(False)
app.MainLoop()

运行后出现回溯错误:
Traceback (most recent call last):
File "C:\SQA_log\wxGui.py", line 87, in OnIP
self.DispConnect(ip_address)
File "C:\SQA_log\wxGui.py", line 143, in DispConnect
self.OnSearch(Ssh)
File "C:\SQA_log\wxGui.py", line 169, in OnSearch
self.startWorker(self.OnFinish, self.LongRunningSearch, wargs=[sssh])

最佳答案

在您的特定情况下,我会做类似的事情:

1)封装长任务并将其与GUI react 分开,简化您的方法:

def OnSearch(self, sssh):
self.LongRunningSearch(sssh) # Move all the blocking code here,
# just NOT the GUI reaction !
# Meaning self.OnFinish()...
self.OnFinish()

2)确认它仍然可以正常运行。然后修改方法以添加线程:
def OnSearch(self, sssh):
startWorker(self.OnFinish, self.LongRunningSearch, wargs=[sssh])
self.OnSearch将立即结束,并且在运行 self.OnFinish的线程完成后将调用 self.LongRunningSearch。由于我无法在计算机上运行您的代码,因此可能仍需要进行一些调整。

关于multithreading - 如何在wxPython GUI应用程序中实现线程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/7666368/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com