gpt4 book ai didi

python - 如何正确使用倒计时线程,如何过早停止它?

转载 作者:太空宇宙 更新时间:2023-11-04 03:37:13 27 4
gpt4 key购买 nike

线程未按我预期的方式工作。

我有一个可行的解决方案,我可以使用 Raspberry Pi 和簧片开关监控冰箱何时打开和关闭(声音的播放未暂停和暂停)。我现在想添加一个计时器,以便在门打开时间过长时执行某些操作。我想启动一个线程,它会在警报操作之前休眠 x 秒是个好主意。当开关再次关闭时,我会用信号杀死线程。

我的方法失败了。 CountDown 运行线程已启动,但终止信号命令已执行但没有效果。此外,不会执行 c.terminate() 之后的命令。我查看了线程示例,但它们似乎适用于更复杂的情况。我错过了什么?

代码:

#!/usr/bin/env python2.7

import threading, subprocess, sys, time, syslog
import RPi.GPIO as GPIO

sound = "/home/pi/sounds/fridge_link.mp3" # sound to play while switch is open
cmd = ['mplayer', '-nolirc', '-noconsolecontrols', '-slave', '-quiet', sound] # command to play sound
lim = 10 # seconds until warning

# thread for countdown (should be interruptable)
# based on http://chimera.labs.oreilly.com/books/1230000000393/ch12.html#_solution_197
class CountdownTask:
def __init__(self):
self._running = True

def terminate(self):
self._running = False
print("thread killed")

def run(self, n):
print("start timer")
time.sleep(n)
## action when timer isup
print("timer ended")


c = CountdownTask()
t = threading.Thread(target=c.run, args=(lim,))
t.daemon = True

p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stdin=subprocess.PIPE)
p.stdin.write('\npausing_keep pause\n')

REED = 27 # data pin of reed sensor (in)

# GPIO setup
GPIO.setwarnings(False)
GPIO.setmode(GPIO.BCM)
GPIO.setup(REED,GPIO.IN, pull_up_down=GPIO.PUD_DOWN)

def edge(channel):
if GPIO.input(REED):
print("detect close")
c.terminate()
p.stdin.write('\npause\n')
pass
else:
print("detect open")
t.start()
p.stdin.write('\npausing_toggle pause\n')

def main():
GPIO.add_event_detect(REED, GPIO.BOTH,callback=edge,bouncetime=1000)
while True:
time.sleep(0.2)
pass

#------------------------------------------------------------

if __name__ == "__main__": main()

新版本:

#!/usr/bin/env python2.7

import threading, subprocess, sys, time, syslog
import RPi.GPIO as GPIO

sound = "/home/pi/sounds/fridge_link.mp3" # sound to play while switch is open
cmd = ['mplayer', '-nolirc', '-noconsolecontrols', '-slave', '-quiet', sound] # command to play sound
lim = 10 # seconds until warning

# thread for countdown (should be interruptable)
class CountdownTask:
global dooropen
def __init__(self):
self._running = True

def terminate(self):
self._running = False
print("thread killed")

def run(self, n):
while self._running and dooropen == False:
time.sleep(0.2)
pass
while self._running and dooropen:
print("start timer")
time.sleep(n)
## action when timer isup
print("timer ended")


c = CountdownTask()
t = threading.Thread(target=c.run, args=(lim,))
t.daemon = True

p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stdin=subprocess.PIPE)
p.stdin.write('\npausing_keep pause\n')

REED = 27 # data pin of reed sensor (in)

# GPIO setup
GPIO.setwarnings(False)
GPIO.setmode(GPIO.BCM)
GPIO.setup(REED,GPIO.IN, pull_up_down=GPIO.PUD_DOWN)

dooropen = False # assuming door's closed when starting


def edge(channel):
global dooropen
if GPIO.input(REED): # * no longer reached
if dooropen == False: # catch fridge compressor spike
print("false close alert")
return
p.stdin.write('\npause\n')
dooropen = False
pass
else:
print("detect open")
if dooropen == True:
print("false open alert")
return
p.stdin.write('\npausing_toggle pause\n')
dooropen = True

def main():
GPIO.add_event_detect(REED, GPIO.BOTH,callback=edge,bouncetime=1000)
t.start()
while True:
time.sleep(0.2)
pass

#------------------------------------------------------------

if __name__ == "__main__": main()

调整后的部分,现在工作:

    def run(self, n):
while self._running and dooropen == False:
time.sleep(0.2)
pass
while self._running and dooropen:
time.sleep(n)
if dooropen:
## action when timer isup

最佳答案

您通过 self._running 编写的线程终止机制不起作用,因为您没有在 run( ) 胎面的方法(实际上是在您所指的示例中完成的)。

定期轮询增加了这里不需要的复杂性。您应该以一种简单可靠的不同方式构建您的逻辑。示例代码:

import threading
import time


dooropen = True


def warnafter(timeout):
time.sleep(timeout)
if dooropen:
print("Warning!")


t = threading.Thread(target=warnafter, args=(2,))
t.start()
time.sleep(1)
dooropen = False
t.join()

time.sleep(1) 更改为 time.sleep(3) 并打印警告。为什么这样做有效?这如何转化为您的用例?

首先,让我们给事物命名。你有你的主线程和“警告线程”。这些是我的示例代码中架构的基石:

  • 在两个线程之间共享一个状态,指示门是否打开,转化为是否应发出警告的事实。我们称此状态为 dooropen,它可以是 TrueFalse。它是一个可在主线程范围和警告线程可访问范围内访问的变量。也就是说,它存在于共享内存中。

  • 这是您的约定:dooropen 仅从主线程编写。警告线程只读取它。

  • 只要您认为合适的时间,就生成您的警告线程。使其休眠(确切的休眠时间可能不可靠,尤其是在嵌入式系统上)。

  • 关键部分:在 warn 线程发出警报之前,让它检查 dooropen 状态。如果没有开门,就不要发出警报!

您看到这两种不同的范例了吗?

您的范例是放置一个武装炸弹,该炸弹被编程为在给定的时间后爆炸。这个炸弹不会再和你顶嘴了。你希望你能够在炸弹爆炸前拆除/摧毁炸弹,如果你不再需要它爆炸的话。

我提议的范例运送的炸弹实际上在需要时才配备。在你的炸弹即将爆炸的时间点,这枚炸弹询问是否真的应该这样做,然后才自行武装并爆炸。

鉴于后一种范例,如果警告线程被告知不要执行其操作,它会静默自行退出。不需要“从外部终止线程”的概念!

在实践中,您需要更高级的概念,其中警告线程有它自己的 active 开关。也就是说,您的主线程可以以受控方式停用单个警告线程。看这个例子:

import threading
import time


class WarnThread(threading.Thread):
def __init__(self, timeout, name):
threading.Thread.__init__(self)
self._timeout = timeout
self.active = True
self.name = name
self.start()

def run(self):
self._warnafter()

def _warnafter(self):
time.sleep(self._timeout)
if self.active:
print("WarnThread %s: Warning after timeout" % self.name)


ws = [WarnThread(2, i) for i in range(5)]

# Simulate spending some time doing other things,
# such as responding to external events.
time.sleep(1)

# Selectively deactivate some of the warn threads.
ws[0].active = False
ws[2].active = False

for w in ws:
w.join()

输出:

WarnThread 4: Warning after timeout
WarnThread 1: Warning after timeout
WarnThread 3: Warning after timeout

关于python - 如何正确使用倒计时线程,如何过早停止它?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28459996/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com