gpt4 book ai didi

Python sys.excepthook 仅适用于主进程但不适用于子进程

转载 作者:太空宇宙 更新时间:2023-11-04 05:36:13 28 4
gpt4 key购买 nike

我有一个应用程序正在运行一些子进程,我已经成功地为主进程设置了 sys.excepthook 异常处理。现在,我想为子进程上的同一个钩子(Hook)设置它。我希望它能像复制我在主进程中使用的确切代码行一样简单,仅此而已,但它没有用。

接下来是我的代码:

    class Consumer(multiprocessing.Process):

def __init__(self, codec_status_queue, logger_queue):
multiprocessing.Process.__init__(self)
self.codec_status_queue = codec_status_queue
self.logger_queue = logger_queue

def run(self):
# Set default unhandled exceptions handler
uncaughtErrorHandler = UncaughtErrorHandler(self.logger_queue)
sys.excepthook = uncaughtErrorHandler.error_handler

1/0


class UncaughtErrorHandler(object):

def __init__(self, logger_queue, child_processes=None):
self.logger_queue = logger_queue
self.child_processes = child_processes

def error_handler(self, type, value, trace_back):
trace_formatted = "".join(traceback.format_tb(trace_back))
exeption_message = "Unhandled Exception:\n Type: %s\n Value: %s\n Line: %s\n Traceback:\n %s" % (type, value.message, trace_back.tb_lineno, trace_formatted)
logger_queue.put((LoggerThread.CRITICAL, exeption_message))

if self.child_processes:
self.stop_children()

# Stopping this process
sys.exit()

def stop_children(self):
num_children = len(self.child_processes)
logger_queue.put((LoggerThread.DEBUG, "Terminating child processes (%s)" % num_children))
for process in self.child_processes:
log_message = "Terminating %s with PID %s" % (process.name, process.pid)
logger_queue.put((LoggerThread.DEBUG, log_message))
process.terminate()


if __name__ == '__main__':
...
# Create processes and communication queues
codec_status_queue = multiprocessing.Queue()

num_consumers = multiprocessing.cpu_count() * 2
print 'Creating %d consumers' % num_consumers
consumers = [ Consumer(codec_status_queue, logger_queue)
for i in xrange(num_consumers) ]

# Set default unhandled exceptions handler
uncaughtErrorHandler = UncaughtErrorHandler(logger_queue, consumers)
sys.excepthook = uncaughtErrorHandler.error_handler

# Start processes
for consumer in consumers:
consumer.daemon = True
consumer.start()

如果我将 1/0 放在 __main__ 部分,UncaughtErrorHandler 会捕获异常,但是当 1/0 被放置时,如图所示以上,它没有。

也许有人可以告诉我我做错了什么?

最佳答案

以下代码是为 Python 3.x 编写的,但可以改为与 Python 3.x 一起使用。它提供了一种替代解决方案来覆盖子进程中的 sys.excepthook。一个简单的修复涉及捕获所有异常并将数据从 sys.exc_info 移交给异常处理程序。主进程可以对异常使用类似的模式,但保留程序的原始设计。下面显示的示例应该是一个完整的工作演示,您可以试用并适应您的需求。

#! /usr/bin/env python3
import logging
import multiprocessing
import queue
import sys
import threading
import time
import traceback


def main():
"""Demonstrate exception handling and logging in several processes."""
logger_queue = multiprocessing.Queue()
logger_thread = LoggerThread(logger_queue)
logger_thread.start()
try:
# Create processes and communication queues
codec_status_queue = multiprocessing.Queue()
num_consumers = multiprocessing.cpu_count() * 2
print('Creating {} consumers'.format(num_consumers))
consumers = [Consumer(codec_status_queue, logger_queue)
for _ in range(num_consumers)]
# Set default unhandled exceptions handler
uncaught_error_handler = UncaughtErrorHandler(logger_queue, consumers)
sys.excepthook = uncaught_error_handler.error_handler
# Start processes
for consumer in consumers:
consumer.start()
time.sleep(2)
finally:
logger_thread.shutdown()


def get_message(value):
"""Retrieve an exception's error message and return it."""
if hasattr(value, 'message'):
return value.message
if hasattr(value, 'args') and value.args:
return value.args[0]


class LoggerThread(threading.Thread):

"""Handle logging messages coming from various sources via a queue."""

CRITICAL = logging.CRITICAL
DEBUG = logging.DEBUG

def __init__(self, logger_queue):
"""Initialize an instance of the LoggerThread class."""
super().__init__()
self.logger_queue = logger_queue
self.mutex = threading.Lock()
self.running = False

def run(self):
"""Process messages coming through the queue until shutdown."""
self.running = True
while self.running:
try:
while True:
self.handle_message(*self.logger_queue.get(True, 0.1))
except queue.Empty:
pass

def handle_message(self, level, message):
"""Show the message while ensuring a guaranteed order on screen."""
with self.mutex:
print('Level:', level)
print('Message:', message)
print('=' * 80, flush=True)

def shutdown(self):
"""Signal the thread to exit once it runs out of messages."""
self.running = False


class Consumer(multiprocessing.Process):

"""Simulate a consumer process that handles data from a queue."""

def __init__(self, codec_status_queue, logger_queue):
"""Initialize an instance of the Consumer class."""
super().__init__()
self.codec_status_queue = codec_status_queue
self.logger_queue = logger_queue
self.daemon = True

def run(self):
"""Begin working as a consumer while handling any exceptions."""
# Set default unhandled exceptions handler
uncaught_error_handler = UncaughtErrorHandler(self.logger_queue)
try:
self.do_consumer_work()
except:
uncaught_error_handler.error_handler(*sys.exc_info())

def do_consumer_work(self):
"""Pretend to be doing the work of a consumer."""
junk = 1 / 0
print('Process', self.ident, 'calculated', junk)


class UncaughtErrorHandler:

"""Organize error handling to automatically terminate child processes."""

def __init__(self, logger_queue, child_processes=None):
"""Initialize an instance of the UncaughtErrorHandler class."""
self.logger_queue = logger_queue
self.child_processes = child_processes

def error_handler(self, kind, value, trace_back):
"""Record errors as they happen and terminate the process tree."""
trace_formatted = ''.join(traceback.format_tb(trace_back))
exception_message = ('Unhandled Exception:\n'
' Type: {}\n'
' Value: {}\n'
' Line: {}\n'
' Traceback:\n{}').format(
kind, get_message(value), trace_back.tb_lineno, trace_formatted)
self.logger_queue.put((LoggerThread.CRITICAL, exception_message))
if self.child_processes:
self.stop_children()
# Stopping this process
sys.exit()

def stop_children(self):
"""Terminate all children associated with this error handler."""
num_children = len(self.child_processes)
log_message = 'Terminating child processes({})'.format(num_children)
self.logger_queue.put((LoggerThread.DEBUG, log_message))
for process in self.child_processes:
log_message = 'Terminating {} with PID {}'.format(
process.name, process.pid)
self.logger_queue.put((LoggerThread.DEBUG, log_message))
process.terminate()


if __name__ == '__main__':
main()

关于Python sys.excepthook 仅适用于主进程但不适用于子进程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35508371/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com