gpt4 book ai didi

python - pyqt中的异步模式?或者更干净的后台调用模式?

转载 作者:太空狗 更新时间:2023-10-29 17:42:19 53 4
gpt4 key购买 nike

我正在尝试编写一个简短的(一个文件 pyqt)程序,它是响应式的(所以 python/lxml/qt 之外的依赖项,尤其是那些我不能只留在文件中的依赖项对于这个用例有一些缺点,但我可能仍然愿意尝试它们)。我正在尝试在工作线程上执行可能冗长(且可取消)的操作(实际上后台操作周围有一个锁以防止一次执行多个操作(因为它使用的库一次只能使用一个调用)和超时,因此产生多个线程也可以。

据我所知,使用 qt 执行此操作的“基本”方法是。(注意代码没有经过测试所以可能是错误的)

class MainWindow(QWidget):
#self.worker moved to background thread
def initUI(self):
...
self.cmd_button.clicked.connect(self.send)
...

@pyqtslot()
def send(self):
...
...#get cmd from gui
QtCore.QTimer.singleShot(0, lambda : self.worker(cmd))


@pyqtslot(str)
def end_send(self, result):
...
...# set some gui to display result
...



class WorkerObject(QObject):
def send_cmd(self, cmd):
... get result of cmd
QtCore.QTimer.singleShot(0, lambda: self.main_window.end_send())

(我是否正确使用了 QTimer(它在不同的线程上运行,对吧)?)

我真的更喜欢按照 c# 的异步方式使用更简单、更抽象的东西。(注意我没有使用 asyncio,所以我可能会弄错一些东西)

class MainWindow(QWidget):
...
@asyncio.coroutine
def send(self):
...
...#get cmd from gui
result = yield from self.worker(cmd)
#set gui textbox to result

class WorkerObject(QObject):
@asyncio.coroutine
def send_cmd(self, cmd):
... get result of cmd
yield from loop.run_in_executor(None, self.model.send_command, cmd)

我听说 python 3 具有类似的功能,并且有一个后向端口,但它是否可以与 qt 一起正常工作?

如果有人知道另一种更明智的模式。这也将是有用的/可接受的答案。

最佳答案

对您的问题(“有没有办法在 PyQt 中使用类似 asyncio 的模式?”)的简短回答是肯定的,但它非常复杂,对于一个小程序来说可能不值得.下面是一些原型(prototype)代码,允许您使用您描述的异步模式:

import types
import weakref
from functools import partial

from PyQt4 import QtGui
from PyQt4 import QtCore
from PyQt4.QtCore import QThread, QTimer

## The following code is borrowed from here:
# http://stackoverflow.com/questions/24689800/async-like-pattern-in-pyqt-or-cleaner-background-call-pattern
# It provides a child->parent thread-communication mechanism.
class ref(object):
"""
A weak method implementation
"""
def __init__(self, method):
try:
if method.im_self is not None:
# bound method
self._obj = weakref.ref(method.im_self)
else:
# unbound method
self._obj = None
self._func = method.im_func
self._class = method.im_class
except AttributeError:
# not a method
self._obj = None
self._func = method
self._class = None

def __call__(self):
"""
Return a new bound-method like the original, or the
original function if refers just to a function or unbound
method.
Returns None if the original object doesn't exist
"""
if self.is_dead():
return None
if self._obj is not None:
# we have an instance: return a bound method
return types.MethodType(self._func, self._obj(), self._class)
else:
# we don't have an instance: return just the function
return self._func

def is_dead(self):
"""
Returns True if the referenced callable was a bound method and
the instance no longer exists. Otherwise, return False.
"""
return self._obj is not None and self._obj() is None

def __eq__(self, other):
try:
return type(self) is type(other) and self() == other()
except:
return False

def __ne__(self, other):
return not self == other

class proxy(ref):
"""
Exactly like ref, but calling it will cause the referent method to
be called with the same arguments. If the referent's object no longer lives,
ReferenceError is raised.

If quiet is True, then a ReferenceError is not raise and the callback
silently fails if it is no longer valid.
"""

def __init__(self, method, quiet=False):
super(proxy, self).__init__(method)
self._quiet = quiet

def __call__(self, *args, **kwargs):
func = ref.__call__(self)
if func is None:
if self._quiet:
return
else:
raise ReferenceError('object is dead')
else:
return func(*args, **kwargs)

def __eq__(self, other):
try:
func1 = ref.__call__(self)
func2 = ref.__call__(other)
return type(self) == type(other) and func1 == func2
except:
return False

class CallbackEvent(QtCore.QEvent):
"""
A custom QEvent that contains a callback reference

Also provides class methods for conveniently executing
arbitrary callback, to be dispatched to the event loop.
"""
EVENT_TYPE = QtCore.QEvent.Type(QtCore.QEvent.registerEventType())

def __init__(self, func, *args, **kwargs):
super(CallbackEvent, self).__init__(self.EVENT_TYPE)
self.func = func
self.args = args
self.kwargs = kwargs

def callback(self):
"""
Convenience method to run the callable.

Equivalent to:
self.func(*self.args, **self.kwargs)
"""
self.func(*self.args, **self.kwargs)

@classmethod
def post_to(cls, receiver, func, *args, **kwargs):
"""
Post a callable to be delivered to a specific
receiver as a CallbackEvent.

It is the responsibility of this receiver to
handle the event and choose to call the callback.
"""
# We can create a weak proxy reference to the
# callback so that if the object associated with
# a bound method is deleted, it won't call a dead method
if not isinstance(func, proxy):
reference = proxy(func, quiet=True)
else:
reference = func
event = cls(reference, *args, **kwargs)

# post the event to the given receiver
QtGui.QApplication.postEvent(receiver, event)

## End borrowed code

## Begin Coroutine-framework code

class AsyncTask(QtCore.QObject):
""" Object used to manage asynchronous tasks.

This object should wrap any function that you want
to call asynchronously. It will launch the function
in a new thread, and register a listener so that
`on_finished` is called when the thread is complete.

"""
def __init__(self, func, *args, **kwargs):
super(AsyncTask, self).__init__()
self.result = None # Used for the result of the thread.
self.func = func
self.args = args
self.kwargs = kwargs
self.finished = False
self.finished_cb_ran = False
self.finished_callback = None
self.objThread = RunThreadCallback(self, self.func, self.on_finished,
*self.args, **self.kwargs)
self.objThread.start()

def customEvent(self, event):
event.callback()

def on_finished(self, result):
""" Called when the threaded operation is complete.

Saves the result of the thread, and
executes finished_callback with the result if one
exists. Also closes/cleans up the thread.

"""
self.finished = True
self.result = result
if self.finished_callback:
self.finished_ran = True
func = partial(self.finished_callback, result)
QTimer.singleShot(0, func)
self.objThread.quit()
self.objThread.wait()

class RunThreadCallback(QtCore.QThread):
""" Runs a function in a thread, and alerts the parent when done.

Uses a custom QEvent to alert the main thread of completion.

"""
def __init__(self, parent, func, on_finish, *args, **kwargs):
super(RunThreadCallback, self).__init__(parent)
self.on_finished = on_finish
self.func = func
self.args = args
self.kwargs = kwargs

def run(self):
try:
result = self.func(*self.args, **self.kwargs)
except Exception as e:
print "e is %s" % e
result = e
finally:
CallbackEvent.post_to(self.parent(), self.on_finished, result)


def coroutine(func):
""" Coroutine decorator, meant for use with AsyncTask.

This decorator must be used on any function that uses
the `yield AsyncTask(...)` pattern. It shouldn't be used
in any other case.

The decorator will yield AsyncTask objects from the
decorated generator function, and register itself to
be called when the task is complete. It will also
excplicitly call itself if the task is already
complete when it yields it.

"""
def wrapper(*args, **kwargs):
def execute(gen, input=None):
if isinstance(gen, types.GeneratorType):
if not input:
obj = next(gen)
else:
try:
obj = gen.send(input)
except StopIteration as e:
result = getattr(e, "value", None)
return result
if isinstance(obj, AsyncTask):
# Tell the thread to call `execute` when its done
# using the current generator object.
func = partial(execute, gen)
obj.finished_callback = func
if obj.finished and not obj.finished_cb_ran:
obj.on_finished(obj.result)
else:
raise Exception("Using yield is only supported with AsyncTasks.")
else:
print("result is %s" % result)
return result
result = func(*args, **kwargs)
execute(result)
return wrapper

## End coroutine-framework code

如果将上面的代码放入模块中(例如 qtasync.py),您可以将其导入脚本并像这样使用它来获得类似 asyncio 的行为:

import sys
import time
from qtasync import AsyncTask, coroutine
from PyQt4 import QtGui
from PyQt4.QtCore import QThread

class MainWindow(QtGui.QMainWindow):
def __init__(self):
super(MainWindow, self).__init__()
self.initUI()

def initUI(self):
self.cmd_button = QtGui.QPushButton("Push", self)
self.cmd_button.clicked.connect(self.send_evt)
self.statusBar()
self.show()

def worker(self, inval):
print "in worker, received '%s'" % inval
time.sleep(2)
return "%s worked" % inval

@coroutine
def send_evt(self, arg):
out = AsyncTask(self.worker, "test string")
out2 = AsyncTask(self.worker, "another test string")
QThread.sleep(3)
print("kicked off async task, waiting for it to be done")
val = yield out
val2 = yield out2
print ("out is %s" % val)
print ("out2 is %s" % val2)
out = yield AsyncTask(self.worker, "Some other string")
print ("out is %s" % out)


if __name__ == "__main__":
app = QtGui.QApplication(sys.argv)
m = MainWindow()
sys.exit(app.exec_())

输出(按下按钮时):

in worker, received 'test string'
in worker, received 'another test string'
kicked off async task, waiting for it to be done
out is test string worked
out2 is another test string worked
in worker, received 'Some other string'
out is Some other string worked

如您所见,worker 会在通过 AsyncTask 类调用时在线程中异步运行,但其返回值可以是 yield直接从 send_evt 编辑,无需使用回调。

该代码使用 Python 生成器的协程支持功能 (generator_object.send) 和 recipe I found on ActiveState它提供了一个子线程->主线程通信机制,以实现一些非常基本的协程。协程非常有限:您不能从它们返回任何内容,也不能将协程调用链接在一起。实现这两件事可能是可能的,但也可能不值得付出努力,除非你真的需要它们。我也没有对此进行太多负面测试,因此可能无法正确处理 worker 和其他地方的异常。不过,它的好处是允许您通过 AsyncTask 类在单独的线程中调用方法,然后yield线程准备就绪时,不会阻塞 Qt 事件循环。通常这种事情会通过回调来完成,这可能很难遵循,而且通常比将所有代码都放在一个函数中更难读。

如果您可以接受我提到的限制,欢迎您使用这种方法,但这实际上只是一个概念验证;在考虑将其投入生产之前,您需要进行大量测试。

正如您所提到的,Python 3.3 和 3.4 通过分别引入 yield fromasyncio 使异步编程变得更加容易。我认为 yield from 实际上在这里非常有用,可以允许链接协程(意味着让一个协程调用另一个协程并从中获取结果)。 asyncio 没有 PyQt4 事件循环集成,因此它的用处非常有限。

另一种选择是完全放弃协程部分,只使用 callback-based inter-thread communication mechanism直接:

import sys
import time

from qtasync import CallbackEvent # No need for the coroutine stuff
from PyQt4 import QtGui
from PyQt4.QtCore import QThread

class MyThread(QThread):
""" Runs a function in a thread, and alerts the parent when done.

Uses a custom QEvent to alert the main thread of completion.

"""
def __init__(self, parent, func, on_finish, *args, **kwargs):
super(MyThread, self).__init__(parent)
self.on_finished = on_finish
self.func = func
self.args = args
self.kwargs = kwargs
self.start()

def run(self):
try:
result = self.func(*self.args, **self.kwargs)
except Exception as e:
print "e is %s" % e
result = e
finally:
CallbackEvent.post_to(self.parent(), self.on_finished, result)


class MainWindow(QtGui.QMainWindow):
def __init__(self):
super(MainWindow, self).__init__()
self.initUI()

def initUI(self):
self.cmd_button = QtGui.QPushButton("Push", self)
self.cmd_button.clicked.connect(self.send)
self.statusBar()
self.show()

def customEvent(self, event):
event.callback()

def worker(self, inval):
print("in worker, received '%s'" % inval)
time.sleep(2)
return "%s worked" % inval

def end_send(self, cmd):
print("send returned '%s'" % cmd)

def send(self, arg):
t = MyThread(self, self.worker, self.end_send, "some val")
print("Kicked off thread")


if __name__ == "__main__":
app = QtGui.QApplication(sys.argv)
m = MainWindow()
sys.exit(app.exec_())

输出:

Kicked off thread
in worker, received 'some val'
send returned 'some val worked'

如果您要处理很长的回调链,这可能会有点笨拙,但它不依赖于未经证实的 coroutine 代码。

关于python - pyqt中的异步模式?或者更干净的后台调用模式?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24689800/

53 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com