gpt4 book ai didi

python - python多处理的生产者/消费者问题

转载 作者:太空狗 更新时间:2023-10-29 17:18:04 24 4
gpt4 key购买 nike

我正在编写一个有一个生产者和多个消费者的服务器程序,让我感到困惑的是只有第一个放入队列的任务生产者得到消耗,之后排队的任务不再被消耗,它们仍然存在永远在队列中。

from multiprocessing import Process, Queue, cpu_count
from http import httpserv
import time

def work(queue):
while True:
task = queue.get()
if task is None:
break
time.sleep(5)
print "task done:", task
queue.put(None)

class Manager:
def __init__(self):
self.queue = Queue()
self.NUMBER_OF_PROCESSES = cpu_count()

def start(self):
self.workers = [Process(target=work, args=(self.queue,))
for i in xrange(self.NUMBER_OF_PROCESSES)]
for w in self.workers:
w.start()

httpserv(self.queue)

def stop(self):
self.queue.put(None)
for i in range(self.NUMBER_OF_PROCESSES):
self.workers[i].join()
queue.close()

Manager().start()

生产者是一个 HTTP 服务器,它在收到任务后将任务放入队列中来自用户的请求。消费流程貌似还是当队列中有新任务时阻塞,这很奇怪。

附言另外两个与上述无关的问题,我不确定是否最好将 HTTP 服务器放在主进程以外的自己的进程中进程,如果是,我怎样才能让主进程在所有之前保持运行子进程结束。第二个问题,什么是停止的最好方法优雅的 HTTP 服务器?

编辑:添加生产者代码,它只是一个简单的python wsgi服务器:

import fapws._evwsgi as evwsgi
from fapws import base

def httpserv(queue):
evwsgi.start("0.0.0.0", 8080)
evwsgi.set_base_module(base)

def request_1(environ, start_response):
start_response('200 OK', [('Content-Type','text/html')])
queue.put('task_1')
return ["request 1!"]

def request_2(environ, start_response):
start_response('200 OK', [('Content-Type','text/html')])
queue.put('task_2')
return ["request 2!!"]

evwsgi.wsgi_cb(("/request_1", request_1))
evwsgi.wsgi_cb(("/request_2", request_2))

evwsgi.run()

最佳答案

我认为 Web 服务器部分一定有问题,因为它工作得很好:

from multiprocessing import Process, Queue, cpu_count
import random
import time


def serve(queue):
works = ["task_1", "task_2"]
while True:
time.sleep(0.01)
queue.put(random.choice(works))


def work(id, queue):
while True:
task = queue.get()
if task is None:
break
time.sleep(0.05)
print "%d task:" % id, task
queue.put(None)


class Manager:
def __init__(self):
self.queue = Queue()
self.NUMBER_OF_PROCESSES = cpu_count()

def start(self):
print "starting %d workers" % self.NUMBER_OF_PROCESSES
self.workers = [Process(target=work, args=(i, self.queue,))
for i in xrange(self.NUMBER_OF_PROCESSES)]
for w in self.workers:
w.start()

serve(self.queue)

def stop(self):
self.queue.put(None)
for i in range(self.NUMBER_OF_PROCESSES):
self.workers[i].join()
self.queue.close()


Manager().start()

示例输出:

starting 2 workers
0 task: task_1
1 task: task_2
0 task: task_2
1 task: task_1
0 task: task_1

关于python - python多处理的生产者/消费者问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/914821/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com