gpt4 book ai didi

Python动态多处理和信号问题

转载 作者:太空狗 更新时间:2023-10-29 21:42:01 25 4
gpt4 key购买 nike

我有一个带有自定义信号处理的 python multiprocessing 设置(即 worker 进程),这会阻止 worker 干净地使用 multiprocessing 本身。 (请参阅下面的扩展问题描述)

设置

生成所有工作进程的 ma​​ster 类如下所示(一些部分被剥离以仅包含重要部分)。

在这里,它重新绑定(bind)了自己的signal,只是为了打印Master teardown;实际上,接收到的信号沿着进程树传播,必须由工作人员自己处理。这是通过 worker 生成后重新绑定(bind)信号来实现的。

class Midlayer(object):
def __init__(self, nprocs=2):
self.nprocs = nprocs
self.procs = []

def handle_signal(self, signum, frame):
log.info('Master teardown')
for p in self.procs:
p.join()
sys.exit()

def start(self):
# Start desired number of workers
for _ in range(nprocs):
p = Worker()
self.procs.append(p)
p.start()

# Bind signals for master AFTER workers have been spawned and started
signal.signal(signal.SIGINT, self.handle_signal)
signal.signal(signal.SIGTERM, self.handle_signal)

# Serve forever, only exit on signals
for p in self.procs:
p.join()

worker 类基于 multiprocessing.Process 并实现了它自己的 run() 方法。

在此方法中,它连接到分布式消息队列并轮询队列以永远 查找项目。 Forever 应该是:直到工作人员收到 SIGINTSIGTERM。 worker 不应立即辞职;相反,它必须完成它所做的任何计算并随后退出(一旦 quit_req 设置为 True)。

class Worker(Process):
def __init__(self):
self.quit_req = False
Process.__init__(self)

def handle_signal(self, signum, frame):
print('Stopping worker (pid: {})'.format(self.pid))
self.quit_req = True

def run(self):
# Set signals for worker process
signal.signal(signal.SIGINT, self.handle_signal)
signal.signal(signal.SIGTERM, self.handle_signal)

q = connect_to_some_distributed_message_queue()

# Start consuming
print('Starting worker (pid: {})'.format(self.pid))
while not self.quit_req:
message = q.poll()
if len(message):
try:
print('{} handling message "{}"'.format(
self.pid, message)
)
# Facade pattern: Pick the correct target function for the
# requested message and execute it.
MessageRouter.route(message)
except Exception as e:
print('{} failed handling "{}": {}'.format(
self.pid, message, e.message)
)

问题

到目前为止,对于基本设置,(几乎)一切正常:

  • 主进程产生所需数量的 worker
  • 每个worker连接到消息队列
  • 消息发布后,其中一名工作人员会收到消息
  • 外观模式(使用名为 MessageRouter 的类)将收到的消息路由到相应的函数并执行它

现在问题来了:目标函数(messageMessageRouter facade 指向)可能包含非常复杂的业务逻辑,因此可能需要多处理.

例如,如果目标函数包含如下内容:

nproc = 4
# Spawn a pool, because we have expensive calculation here
p = Pool(processes=nproc)
# Collect result proxy objects for async apply calls to 'some_expensive_calculation'
rpx = [p.apply_async(some_expensive_calculation, ()) for _ in range(nproc)]
# Collect results from all processes
res = [rpx.get(timeout=.5) for r in rpx]
# Print all results
print(res)

然后由 Pool 产生的进程也将它们对 SIGINTSIGTERM 的信号处理重定向到 worker 的 handle_signal 函数(因为信号传播到进程子树),本质上是打印 Stopping worker (pid: ...) 而根本没有停止。我知道,发生这种情况是因为我在工作人员生成其自己的子进程之前重新绑定(bind)了信号。

这就是我卡住的地方:我无法在 生成其子进程后设置工作人员的信号,因为我不知道它是否生成了一些(目标函数被屏蔽并且可能由其他人编写),并且因为工作人员(按设计)停留在其轮询循环中。同时,我不能指望使用 multiprocessing 将其自己的信号处理程序重新绑定(bind)到(无论)默认值的目标函数的实现。

目前,我觉得在 worker 的每个循环中恢复信号处理程序(在消息被路由到它的目标函数之前)并在函数返回后重置它们是唯一的选择,但它只是感觉不对。

我错过了什么吗?你有什么建议吗?如果有人能给我提示如何解决我的设计缺陷,我将非常高兴!

最佳答案

没有明确的方法可以按照您希望的方式解决问题。我经常发现自己必须在多处理环境中运行未知代码(表示为 Python 入口点函数,可能会陷入一些 C 怪异)。

这就是我处理问题的方式。

主循环

通常主循环非常简单,它从某个来源(HTTP、管道、Rabbit 队列等)获取任务并将其提交给工作池。我确保正确处理 KeyboardInterrupt 异常以关闭服务。

try:
while 1:
task = get_next_task()
service.process(task)
except KeyboardInterrupt:
service.wait_for_pending_tasks()
logging.info("Sayonara!")

worker

工作人员由来自 multiprocessing.Pool 或来自 concurrent.futures.ProcessPoolExecutor 的工作人员池管理。如果我需要更高级的功能,例如超时支持,我要么使用 billiardpebble .

每个工作人员都会按照建议忽略 SIGINT here . SIGTERM 保留为默认值。

服务

该服务由 systemd 或 supervisord 控制.无论哪种情况,我都会确保终止请求始终作为 SIGINT (CTL+C) 传递。

我想将 SIGTERM 保留为紧急关闭,而不是仅依赖 SIGKILL。 SIGKILL 不可移植,某些平台未实现它。

“我希望它这么简单”

如果事情更复杂,我会考虑使用框架,例如 LuigiCelery .

一般来说,在这些事情上重新发明轮子是非常有害的,并且不会带来什么满足感。特别是如果其他人必须查看该代码。

当然,如果您的目标是了解如何完成这些事情,则后一句话不适用。

关于Python动态多处理和信号问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40672264/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com