gpt4 book ai didi

python - 防止多进程 Python 应用程序中的线程重复

转载 作者:行者123 更新时间:2023-12-01 05:20:25 25 4
gpt4 key购买 nike

我有一个 Python 应用程序,它在由 multiprocessing.Process 启动的子进程中运行多个作业。父应用程序还启动一个线程来向数据库报告进度。但是,我注意到,如果任何作业启动自己的子进程,它们就会复制该线程,从而导致数据库中的数据损坏。例如当孙子进程完成时,其线程在数据库中将父作业标记为已完成,因为它认为自己是父进程,即使父进程仍在运行。

我将如何使用multiprocess.Process,以便它不会复制任何当前正在运行的线程?是在我的线程中记录原始 PID 的最简单选项,如果“当前”PID 与该值不匹配,则立即退出?

我看到了这个similar question去年发布的,但似乎被忽略了。

最佳答案

您对问题的描述表明父进程中的后台线程继续存在并在子进程中执行。那是不可能的;至少,在 POSIX 系统上这是不可能的。你的情况发生的是另一回事。我将在下面对此进行一些猜测,然后提出如何避免该问题的建议。依次考虑这些点...

<强>1。只有一个线程能够在 fork 中幸存下来。

只有调用fork()的线程在 fork 后仍然存活。下面是一个小示例,演示其他线程不会在子进程中继续执行:

def output():
time.sleep(3)
print "Thread executing in process: %d" % os.getpid()

thread = threading.Thread(target=output)
thread.start()
os.fork()
print "Pid: %d" % os.getpid()

您将看到父线程和子线程都将其 pid 打印到标准输出,但第二个线程将仅在父线程中生成输出。

因此,让监视器线程检查其 pid 或以其他方式检查其正在运行的进程不会产生任何影响;该线程仅在一个进程中执行。

<强>2。 fork 在某些方面可能会导致像您所看到的问题。

fork 可能会以多种方式导致程序状态损坏。例如:

  • 因 fork 而终止的线程中引用的对象可能会超出范围,从而调用其终结器。例如,如果这样的对象表示一种网络资源,并且调用其 del 方法会导致连接的一端意外关闭,则这可能会导致问题。
  • 任何缓冲 IO 都会导致问题,因为缓冲区在子进程中重复。

请注意,第二点甚至不需要线程。考虑以下因素:

f = open("testfile", "w", 1024)
f.write("a")
os.fork()

我们向 testfile 写入了一个字符,并且在 fork 之前在父级中执行了此操作。但我们在该内容尚未刷新时进行了 fork ,因此:

alp:~ $ wc -c testfile
2 testfile

该文件包含两个字符,因为输出缓冲区已复制到子级,并且父级和子级最终都刷新了它们的缓冲区。

我怀疑您的问题是由第二个问题引起的(尽管我很高兴地承认这纯粹是猜测)。

<强>3。重新架构以避免此类问题。

您在评论中提到,在生成工作人员后无法启动监视器线程,因为您需要重复创建新工作人员。重组你正在做的事情以避免这种情况可能比你想象的更容易。不要为每个新的工作单元生成一个进程,而是创建一组由控制进程管理的长期工作进程: Controller 向队列提供需要处理的作业的规范;它在闲暇时这样做。每个工作线程无限循环,当作业到达时从队列中提取作业并执行它们。 (多处理的队列实现将保证每个作业描述仅由一个工作人员绘制。)因此,您只需要尽早生成工作人员一次,并且可以在所有 fork 完成后创建监视器线程.

以下是此类组织的示意性示例:

from multiprocessing import Process, Queue

def work(q):
while True:
job = q.get()
if job is None:
# We've been signaled to stop.
break
do_something_with(job)

queue = Queue()
NUM_WORKERS = 3
NUM_JOBS = 20

# Start workers.
for _ in range(NUM_WORKERS):
p = Process(target=work, args=(queue,))
p.start()

# Create your monitor thread here.

# Put work in the queue. This continues as long as you want.
for i in range(NUM_JOBS):
queue.put(i)

# When there's no more work, put sentinel values in the queue so workers
# know to gracefully exit.
for _ in range(NUM_WORKERS):
queue.put(None)

关于python - 防止多进程 Python 应用程序中的线程重复,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/22510625/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com