Python daemon thread cleanup logic on abrupt sys.exit()


Using Linux and Python 2.7.6, I have a script that uploads a lot of files at once. I am using multithreading with the Queue and threading modules.

I implemented a handler for SIGINT to stop the script if the user presses Ctrl-C. I prefer to use daemon threads so I don't have to clear the queue, which would require a lot of rewriting to give the SIGINT handler access to the Queue object, since the handler does not accept arguments.

To make sure the daemon threads finish and clean up before sys.exit(), I am using threading.Event() and threading.clear() to make the threads wait. This code seems to work, since print threading.enumerate() shows only the main thread before the script terminates when I debug. Just to be sure, I wanted to know whether there is anything I might be missing in this cleanup implementation, even though it seems to work for me:

def signal_handler(signal, frame):
    global kill_received
    kill_received = True
    msg = (
        "\n\nYou pressed Ctrl+C!"
        "\nYour logs and their locations are:"
        "\n{}\n{}\n{}\n\n".format(debug, error, info))
    logger.info(msg)
    threads = threading.Event()
    threads.clear()

    while True:
        time.sleep(3)
        threads_remaining = len(threading.enumerate())
        print threads_remaining
        if threads_remaining == 1:
            sys.exit()

def do_the_uploads(file_list, file_quantity,
                   retry_list, authenticate):
    """The uploading engine"""
    value = raw_input(
        "\nPlease enter how many concurrent "
        "uploads you want at one time (example: 200)> ")
    value = int(value)
    logger.info('{} concurrent uploads will be used.'.format(value))

    confirm = raw_input(
        "\nProceed to upload files? Enter [Y/y] for yes: ").upper()
    if confirm == "Y":
        kill_received = False
        sys.stdout.write("\x1b[2J\x1b[H")  # clear screen, move cursor home
        q = CustomQueue()

        def worker():
            global kill_received
            while not kill_received:
                item = q.get()
                upload_file(item, file_quantity, retry_list, authenticate, q)
                q.task_done()

        for i in range(value):
            t = Thread(target=worker)
            t.setDaemon(True)
            t.start()

        for item in file_list:
            q.put(item)

        q.join()

        print "Finished. Cleaning up processes...",
        # Allow the threads to clean up
        time.sleep(4)



def upload_file(file_obj, file_quantity, retry_list, authenticate, q):
    """Uploads a file, one file per thread rather than in batches. This way,
    if one upload fails, no others are affected."""
    absolute_path_filename, filename, dir_name, token, url = file_obj
    url = url + dir_name + '/' + filename
    try:
        with open(absolute_path_filename) as f:
            r = requests.put(url, data=f, headers=header_collection, timeout=20)
    except requests.exceptions.ConnectionError as e:
        pass
    if src_md5 == r.headers['etag']:
        file_quantity.deduct()

Best Answer

If you want to handle Ctrl+C, handling the KeyboardInterrupt exception in the main thread is enough. Don't use global X in a function unless you assign X = some_value inside it. Using time.sleep(4) to allow the threads to clean up is a code smell; you don't need it.
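For example, a minimal sketch of catching KeyboardInterrupt in the main thread (the work loop here is a stand-in, not code from the original script):

import time

def main():
    try:
        while True:        # the main thread supervises the work
            time.sleep(1)  # stand-in for real work or joining worker threads
    except KeyboardInterrupt:
        # runs in the main thread when Ctrl+C is pressed
        print("Ctrl+C received, shutting down")

main()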

I am using threading.Event() and threading.clear() to make threads wait.

This code has no effect on your threads:

# create local variable
threads = threading.Event()
# clear internal flag in it (that is returned by .is_set/.wait methods)
threads.clear()
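For contrast, an Event only influences threads that hold a reference to the same object and call its methods; a minimal sketch of correct shared usage (the worker body is illustrative, not from the original script):

import threading
import time

stop = threading.Event()      # a single shared Event, created up front

def worker():
    while not stop.is_set():  # every worker checks the *same* object
        time.sleep(0.1)       # stand-in for one unit of work

t = threading.Thread(target=worker)
t.start()
time.sleep(1)
stop.set()                    # the worker observes the flag and returns
t.join()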

Don't call logger.info() from the signal handler in a multithreaded program. It may deadlock your program; only a limited set of functions may be called from a signal handler. The safe option is to set a global flag in it and exit:

def signal_handler(signal, frame):
    global kill_received
    kill_received = True
    # return (no more code)

Signals may be delayed until q.join() returns. And even if a signal were delivered immediately, q.get() blocks your child threads: they hang until the main thread exits. To fix both issues you could use a sentinel to signal the child threads that there is no more work, and drop the signal handler completely in that case:

def worker(stopped, queue, *args):
    for item in iter(queue.get, None):  # iterate until queue.get() returns None
        if not stopped.is_set():  # a simple global flag would also work here
            upload_file(item, *args)
        else:
            break  # exit prematurely
    # do child-specific cleanup here

# start threads
q = Queue.Queue()
stopped = threading.Event()  # set when threads should exit prematurely
threads = set()
for _ in range(number_of_threads):
    t = Thread(target=worker, args=(stopped, q) + other_args)
    threads.add(t)
    t.daemon = True
    t.start()

# provide work
for item in file_list:
    q.put(item)
for _ in threads:
    q.put(None)  # put sentinel to signal the end

while threads:  # until there are alive child threads
    try:
        for t in threads:
            t.join(.3)  # use a timeout to get KeyboardInterrupt sooner
            if not t.is_alive():
                threads.remove(t)  # remove dead
                break
    except (KeyboardInterrupt, SystemExit):
        print("got Ctrl+C (SIGINT) or exit() is called")
        stopped.set()  # signal threads to exit gracefully
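The two-argument form of iter() used above calls queue.get() repeatedly and stops as soon as it returns the sentinel; a standalone demonstration:

import Queue  # Python 2 name; the module is called "queue" on Python 3

q = Queue.Queue()
for item in ("a", "b", None):
    q.put(item)

for item in iter(q.get, None):  # stops once q.get() returns None
    print(item)  # prints "a", then "b"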

I've renamed value to number_of_threads and used an explicit set of threads. Note that if an individual upload_file() call blocks, the program won't exit on Ctrl-C.
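One possible mitigation, not part of the answer above: give the join loop a bounded grace period once stopped is set, and then fall through, letting the remaining daemon threads be discarded when the interpreter exits. A sketch under that assumption, reusing threads and stopped from the snippet above (GRACE_SECONDS is a made-up budget):

import time

GRACE_SECONDS = 30  # assumed shutdown budget; tune to your upload sizes
deadline = None
while threads:
    try:
        for t in list(threads):  # iterate over a copy so removal is safe
            t.join(.3)
            if not t.is_alive():
                threads.remove(t)
        if deadline is not None and time.time() > deadline:
            break  # give up waiting; daemon threads die with the process
    except (KeyboardInterrupt, SystemExit):
        stopped.set()  # ask threads to exit gracefully
        deadline = time.time() + GRACE_SECONDS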

Your case seems simple enough for the multiprocessing.Pool interface:

from multiprocessing.pool import ThreadPool
from functools import partial

def do_uploads(number_of_threads, file_list, **kwargs_for_upload_file):
    process_file = partial(upload_file, **kwargs_for_upload_file)
    pool = ThreadPool(number_of_threads)  # number of concurrent uploads
    try:
        for _ in pool.imap_unordered(process_file, file_list):
            pass  # you could report progress here
    finally:
        pool.close()  # no more additional work
        pool.join()   # wait until current work is done

It should exit gracefully on Ctrl-C, i.e., uploads that are in progress are allowed to complete, but new uploads are not started.
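A hypothetical call site for that sketch, assuming upload_file is reworked to drop the queue parameter (file_quantity, retry_list, and authenticate are the question's objects; 200 is just an example count):

try:
    do_uploads(200, file_list,
               file_quantity=file_quantity,
               retry_list=retry_list,
               authenticate=authenticate)
except KeyboardInterrupt:
    # the pool's finally block has already let in-flight uploads finish
    print("upload interrupted by Ctrl+C")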

Regarding Python daemon thread cleanup logic on abrupt sys.exit(), a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/20318859/
