gpt4 book ai didi

Python:检测父线程是否调用了exit()

转载 作者:太空宇宙 更新时间:2023-11-04 04:21:44 25 4
gpt4 key购买 nike

我有这个线程在后台运行,可以从主线程更新:

import threading
from Queue import Queue
from time import sleep

class Animation(threading.Thread):

SIGNAL_STOP = 'stop'

def __init__(self, framerate):
threading.Thread.__init__(self)
self.queue = Queue()
self.framerate = framerate

def tell(self, data):
self.queue.put(data)

def stop(self):
self.tell(Animation.SIGNAL_STOP)

def loop(self):
# Override this method to implement animation loop
pass

def update(self, data):
# Override this method to implement the state update
pass

def cleanup(self):
# Override this method to implement what's done when the animation is stopped
pass

def run(self):
while True:
if not self.queue.empty():
data = self.queue.get()
if data == Animation.SIGNAL_STOP:
break;
self.update(data)
self.loop()
sleep(1. / self.framerate)

self.cleanup()


class TestAnimation(Animation):
def __init__(self, framerate):
super(TestAnimation, self).__init__(framerate)
self.num = 0

def loop(self):
print 'num =', self.num
self.num = self.num + 1

def update(self, data):
print 'update:', data

def cleanup(self):
print 'graceful exit'

print 'start'
ta = TestAnimation(1)
ta.start()
sleep(3)
ta.update(123)
sleep(3)
#ta.stop() # I'd like the animation thread to feel that the parent wants to exit and carry out stopping itself
print 'end'
exit()

我想实现一些方法来检测父线程何时要退出,然后所有正在运行的线程都会自行终止。我更喜欢这样,而不是显式调用正在运行的线程的 stop() 方法。

最佳答案

我相信你可以将其设为 daemon thread,所以当主线程退出时,所有守护线程都会随之退出。

因此您可以在构造函数中默认为 daemon = True 或在 start 之前设置它

ta = TestAnimation(1)
ta.daemon = True
ta.start()

编辑。由于需要清理线程。

您可以使用 thread.Event 的组合和 atexit用于向守护进程线程发出信号以在退出前进行清理。

这是一个简单的例子:

import time
import threading
import atexit

signal_to_threads = threading.Event() # Global signal for threads to stop.

registered_threads = [] # Register all threads into here.

@atexit.register
def signal_threads():
print('signaling threads to stop')
signal_to_threads.set()
for thread in registered_threads:
thread.signal.wait()


class TestExit(threading.Thread):
def __init__(self, *args, **kw):
registered_threads.append(self)
self.signal = threading.Event()
super(TestExit, self).__init__(*args, **kw)
self.daemon = True # Ensure is a daemon thread.

def run(self):
while True:
time.sleep(3)
print('Hi from child thread')
if signal_to_threads.is_set():
print('stopping child thread and doing cleanup.')
# self.do_cleanup()
self.signal.set()
break


t = TestExit()
t.start()
print('sleeping for 10 secs.')
time.sleep(10)
print('exiting main thread')

演示:

python test_thread.py
sleeping for 10 secs.
Hi from child thread
Hi from child thread
Hi from child thread
exiting main thread
signaling threads to stop
Hi from child thread
stopping child thread and doing clean up.

因为 atexit 应该在退出时运行注册的函数,所以不需要跟上每个退出点来清理线程。

关于Python:检测父线程是否调用了exit(),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54394188/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com