
Python multiprocessing and handling exceptions in workers

Reposted. Author: 太空狗. Updated: 2023-10-29 18:04:45

I am using the Python multiprocessing library for an algorithm in which a number of workers process some data and return results to the parent process. I use multiprocessing.Queue to pass jobs to the workers and then to collect the results.

Everything works fine until a worker fails to process some chunk of data. In the simplified example below, each worker has two phases:

  • Initialization - can fail, in which case the worker should be destroyed
  • Data processing - processing a chunk of data can fail, in which case the worker should skip that chunk and continue with the next one.

When either of these phases fails, I get a deadlock after the script completes. This code simulates my problem:

import multiprocessing as mp
import random

workers_count = 5
# Probability of failure, change to simulate failures
fail_init_p = 0.2
fail_job_p = 0.3


#========= Worker =========
def do_work(job_state, arg):
    if random.random() < fail_job_p:
        raise Exception("Job failed")
    return "job %d processed %d" % (job_state, arg)

def init(args):
    if random.random() < fail_init_p:
        raise Exception("Worker init failed")
    return args

def worker_function(args, jobs_queue, result_queue):
    # INIT
    # What to do when init() fails?
    try:
        state = init(args)
    except:
        print("!Worker %d init fail" % args)
        return
    # DO WORK
    # Process data in the jobs queue
    for job in iter(jobs_queue.get, None):
        try:
            # Can throw an exception!
            result = do_work(state, job)
            result_queue.put(result)
        except:
            print("!Job %d failed, skip..." % job)
        finally:
            jobs_queue.task_done()
    # Telling that we are done with processing the stop token
    jobs_queue.task_done()



#========= Parent =========
jobs = mp.JoinableQueue()
results = mp.Queue()
for i in range(workers_count):
    mp.Process(target=worker_function, args=(i, jobs, results)).start()

# Populate jobs queue
results_to_expect = 0
for j in range(30):
    jobs.put(j)
    results_to_expect += 1

# Collecting the results
# What if some workers failed to process the job and we have
# fewer results than expected?
for r in range(results_to_expect):
    result = results.get()
    print(result)

# Signal all workers to finish
for i in range(workers_count):
    jobs.put(None)

# Wait for them to finish
jobs.join()

I have two questions about this code:

  1. When init() fails, how do I detect that the worker is invalid instead of waiting for it to finish?
  2. When do_work() fails, how do I notify the parent process that fewer results should be expected in the results queue?

Thank you for your help!

Best answer

I changed your code slightly to make it work (see explanation below).

import multiprocessing as mp
import random

workers_count = 5
# Probability of failure, change to simulate failures
fail_init_p = 0.5
fail_job_p = 0.4


#========= Worker =========
def do_work(job_state, arg):
    if random.random() < fail_job_p:
        raise Exception("Job failed")
    return "job %d processed %d" % (job_state, arg)

def init(args):
    if random.random() < fail_init_p:
        raise Exception("Worker init failed")
    return args

def worker_function(args, jobs_queue, result_queue):
    # INIT
    # What to do when init() fails?
    try:
        state = init(args)
    except:
        print("!Worker %d init fail" % args)
        result_queue.put('init failed')
        return
    # DO WORK
    # Process data in the jobs queue
    for job in iter(jobs_queue.get, None):
        try:
            # Can throw an exception!
            result = do_work(state, job)
            result_queue.put(result)
        except:
            print("!Job %d failed, skip..." % job)
            result_queue.put('job failed')


#========= Parent =========
jobs = mp.Queue()
results = mp.Queue()
for i in range(workers_count):
    mp.Process(target=worker_function, args=(i, jobs, results)).start()

# Populate jobs queue
results_to_expect = 0
for j in range(30):
    jobs.put(j)
    results_to_expect += 1

init_failures = 0
job_failures = 0
successes = 0
while job_failures + successes < 30 and init_failures < workers_count:
    result = results.get()
    init_failures += int(result == 'init failed')
    job_failures += int(result == 'job failed')
    successes += int(result != 'init failed' and result != 'job failed')
    #print(init_failures, job_failures, successes)

for ii in range(workers_count):
    jobs.put(None)

My changes:

  1. jobs is now a plain Queue (instead of a JoinableQueue).
  2. Workers now report back the special result strings "init failed" and "job failed".
  3. The master process monitors for those special results for as long as the corresponding conditions hold.
  4. In the end, put in as many "stop" requests (i.e. None jobs) as you have workers, no matter what. Note that not all of them may be pulled from the queue (in case a worker failed to initialize).
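The magic-string protocol above can be made a bit more robust by tagging each result with an explicit status, so a legitimate result that happens to equal "job failed" cannot be miscounted, and the parent can also recover which job failed. Below is a minimal Python 3 sketch of that variant; the function names and the (tag, payload) tuple format are my own, not from the answer, and it assumes a POSIX platform where the "fork" start method is available:

```python
import multiprocessing as mp
import random

def do_work(state, arg, fail_job_p):
    if random.random() < fail_job_p:
        raise ValueError("Job failed")
    return "job %d processed %d" % (state, arg)

def worker(worker_id, jobs, results, fail_init_p, fail_job_p):
    random.seed()  # re-seed: forked children inherit the parent's RNG state
    # Simulated init phase: report the failure instead of dying silently.
    if random.random() < fail_init_p:
        results.put(("init_failed", worker_id))
        return
    for job in iter(jobs.get, None):
        try:
            results.put(("ok", do_work(worker_id, job, fail_job_p)))
        except ValueError:
            results.put(("job_failed", job))

def run(n_workers=4, n_jobs=20, fail_init_p=0.3, fail_job_p=0.3):
    ctx = mp.get_context("fork")  # assumes a POSIX platform
    jobs, results = ctx.Queue(), ctx.Queue()
    procs = [ctx.Process(target=worker,
                         args=(i, jobs, results, fail_init_p, fail_job_p))
             for i in range(n_workers)]
    for p in procs:
        p.start()
    for j in range(n_jobs):
        jobs.put(j)
    counts = {"ok": 0, "job_failed": 0, "init_failed": 0}
    # Same stop condition as the answer: every job is accounted for,
    # unless every single worker died during init.
    while (counts["ok"] + counts["job_failed"] < n_jobs
           and counts["init_failed"] < n_workers):
        tag, _payload = results.get()
        counts[tag] += 1
    for _ in range(n_workers):  # stop tokens; some may go unread
        jobs.put(None)
    for p in procs:
        p.join()
    return counts
```

The payload slot is what the plain-string protocol cannot carry: on "job_failed" it holds the failed job, so the parent could retry or log it instead of merely counting it.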

By the way, your original code was nice and easy to work with, and the random-probability failure simulation is a neat touch.

On the topic of Python multiprocessing and handling exceptions in workers, there is a similar question on Stack Overflow: https://stackoverflow.com/questions/16943404/
