gpt4 book ai didi

qt - PyQt:带有 QRunnables 的 QThreadPool 需要时间退出

转载 作者:行者123 更新时间:2023-12-04 13:16:07 24 4
gpt4 key购买 nike

我有一个创建 QRunnables 并在 QThreadPool 实例中启动它们的类。

我的线程运行良好,但如果用户想退出应用程序,应用程序需要很长时间才能停止。当然是因为发起的请求需要时间。

这是我如何使用 QThreadPool、QRunnables 的片段代码:

import sys
from PyQt5.Qt import QThreadPool, QApplication, QWidget, QVBoxLayout
from PyQt5.Qt import QTimer, QObject, QPushButton, QLabel
from PyQt5.Qt import QRunnable


class BackendQRunnable(QRunnable):
"""
Class who create a QThread to trigger requests
"""

def __init__(self, task):
super(BackendQRunnable, self).__init__()
self.task = task

def run(self):
"""
Run the QRunnable. Trigger actions depending on the selected task

"""

# Here I make long requests
if 'user' in self.task:
self.query_user_data()
elif 'host' in self.task:
self.query_hosts_data()
elif 'service' in self.task:
self.query_services_data()
elif 'alignakdaemon' in self.task:
self.query_daemons_data()
elif 'livesynthesis' in self.task:
self.query_livesynthesis_data()
elif 'history' in self.task:
self.query_history_data()
elif 'notifications' in self.task:
self.query_notifications_data()
else:
pass

@staticmethod
def query_user_data():
"""
Launch request for "user" endpoint

"""

print('Query user data')

@staticmethod
def query_hosts_data():
"""
Launch request for "host" endpoint

"""

print('Query hosts')

@staticmethod
def query_services_data():
"""
Launch request for "service" endpoint

"""

print("Query services")

@staticmethod
def query_daemons_data():
"""
Launch request for "alignakdaemon" endpoint

"""

print('Query daemons')

@staticmethod
def query_livesynthesis_data():
"""
Launch request for "livesynthesis" endpoint

"""

print('query livesynthesis')

@staticmethod
def query_history_data():
"""
Launch request for "history" endpoint but only for hosts in "data_manager"

"""

print('Query history')

@staticmethod
def query_notifications_data():
"""
Launch request for "history" endpoint but only for notifications of current user

"""

print('Query notifications')


class ThreadManager(QObject):
"""
Class who create BackendQRunnable to periodically request on a Backend
"""

def __init__(self, parent=None):
super(ThreadManager, self).__init__(parent)
self.backend_thread = BackendQRunnable(self)
self.pool = QThreadPool.globalInstance()
self.tasks = self.get_tasks()

def start(self):
"""
Start ThreadManager

"""

print("Start backend Manager...")

# Make a first request
self.create_tasks()

# Then request periodically
timer = QTimer(self)
timer.setInterval(10000)
timer.start()
timer.timeout.connect(self.create_tasks)

@staticmethod
def get_tasks():
"""
Return the tasks to run in BackendQRunnable

:return: tasks to run
:rtype: list
"""

return [
'notifications', 'livesynthesis', 'alignakdaemon', 'history', 'service', 'host', 'user',
]

def create_tasks(self):
"""
Create tasks to run

"""

for cur_task in self.tasks:
backend_thread = BackendQRunnable(cur_task)

# Add task to QThreadPool
self.pool.start(backend_thread)

def exit_pool(self):
"""
Exit all BackendQRunnables and delete QThreadPool

"""

# When trying to quit, the application takes a long time to stop
self.pool.globalInstance().waitForDone()
self.pool.deleteLater()

sys.exit(0)


if __name__ == '__main__':
app = QApplication(sys.argv)

thread_manager = ThreadManager()
thread_manager.start()

layout = QVBoxLayout()

label = QLabel("Start")
button = QPushButton("DANGER!")
button.pressed.connect(thread_manager.exit_pool)

layout.addWidget(label)
layout.addWidget(button)

w = QWidget()

w.setLayout(layout)
w.show()

sys.exit(app.exec_())

在函数中 exit_pool ,我等到线程完成并删除 QThreadPool 实例...

有没有办法不等待每个线程并直接停止一切?

编辑解决方案:

所以我以不同的方式接近这个主题。我更换了我的 QRunnableQThread .我删除了 QThreadPool我自己在列表中管理线程。我还添加了一个 pyqtSignal为了停止 QTimer 并通过 quit() 关闭正在运行的线程功能。

就像那样,我的所有线程都毫无问题地退出了。
import sys
from PyQt5.Qt import QThread, QApplication, QWidget, QVBoxLayout
from PyQt5.Qt import QTimer, QObject, QPushButton, QLabel, pyqtSignal


class BackendQThread(QThread):
"""
Class who create a QThread to trigger requests
"""

quit_thread = pyqtSignal(name='close_thread')

def __init__(self, task):
super(BackendQThread, self).__init__()
self.task = task

def run(self):
"""
Run the actions depending on the selected task

"""

# Here I make long requests
if 'user' in self.task:
self.query_user_data()
elif 'host' in self.task:
self.query_hosts_data()
elif 'service' in self.task:
self.query_services_data()
elif 'alignakdaemon' in self.task:
self.query_daemons_data()
elif 'livesynthesis' in self.task:
self.query_livesynthesis_data()
elif 'history' in self.task:
self.query_history_data()
elif 'notifications' in self.task:
self.query_notifications_data()
else:
pass

@staticmethod
def query_user_data():
"""
Launch request for "user" endpoint

"""

print('Query user data')

@staticmethod
def query_hosts_data():
"""
Launch request for "host" endpoint

"""

print('Query hosts')

@staticmethod
def query_services_data():
"""
Launch request for "service" endpoint

"""

print("Query services")

@staticmethod
def query_daemons_data():
"""
Launch request for "alignakdaemon" endpoint

"""

print('Query daemons')

@staticmethod
def query_livesynthesis_data():
"""
Launch request for "livesynthesis" endpoint

"""

print('query livesynthesis')

@staticmethod
def query_history_data():
"""
Launch request for "history" endpoint but only for hosts in "data_manager"

"""

print('Query history')

@staticmethod
def query_notifications_data():
"""
Launch request for "history" endpoint but only for notifications of current user

"""

print('Query notifications')


class ThreadManager(QObject):
"""
Class who create BackendQThread to periodically request on a Backend
"""

def __init__(self, parent=None):
super(ThreadManager, self).__init__(parent)
self.tasks = self.get_tasks()
self.timer = QTimer()
self.threads = []

def start(self):
"""
Start ThreadManager

"""

print("Start backend Manager...")

# Make a first request
self.create_tasks()

# Then request periodically
self.timer.setInterval(10000)
self.timer.start()
self.timer.timeout.connect(self.create_tasks)

@staticmethod
def get_tasks():
"""
Return the available tasks to run

:return: tasks to run
:rtype: list
"""

return [
'notifications', 'livesynthesis', 'alignakdaemon', 'history', 'service', 'host', 'user',
]

def create_tasks(self):
"""
Create tasks to run

"""

# Here I reset the list of threads
self.threads = []
for cur_task in self.tasks:
backend_thread = BackendQThread(cur_task)

# Add task to QThreadPool
backend_thread.start()
self.threads.append(backend_thread)

def stop(self):
"""
Stop the manager and close all QThreads

"""

print("Stop tasks")
self.timer.stop()
for task in self.threads:
task.quit_thread.emit()

print("Tasks finished")


if __name__ == '__main__':
app = QApplication(sys.argv)

layout = QVBoxLayout()
widget = QWidget()
widget.setLayout(layout)

thread_manager = ThreadManager()

start_btn = QPushButton("Start")
start_btn.clicked.connect(thread_manager.start)
layout.addWidget(start_btn)

stop_btn = QPushButton("Stop")
stop_btn.clicked.connect(thread_manager.stop)
layout.addWidget(stop_btn)

widget.show()

sys.exit(app.exec_())

最佳答案

您无法停止 QRunnable一旦开始。但是,您可以执行一些简单的操作来减少示例中的等待时间。

首先,您可以停止计时器,使其不再添加任何任务。其次,您可以clear the thread-pool以便它删除任何挂起的任务。第三,你可以试试setting a smaller maximum thread count看看它是否仍然达到可接受的性能。默认情况下,线程池将使用 QThread.idealThreadCount()设置最大线程数 - 这通常意味着系统上的每个处理器核心一个。

最后一个选择是提供一种方法来中断在您的可运行程序中执行的代码。只有当代码运行一个循环,该循环可以定期检查一个标志以查看它是否应该继续时,这才真正成为可能。在您的示例中,您似乎可以为标志使用单个共享类属性,因为所有任务都调用静态方法。但是,如果代码不能以这种方式中断,则您无能为力 - 您只需等待当前正在运行的任务完成即可。

关于qt - PyQt:带有 QRunnables 的 QThreadPool 需要时间退出,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46511239/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com