Python producer/consumer with multiple threads

Reposted. Author: 行者123. Updated: 2023-11-30 23:12:58

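I wrote this small application to solve the classic producer/consumer problem in Python. I know I could use a thread-safe queue mechanism instead, but I wanted to solve it myself in order to learn.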
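For comparison, the thread-safe queue approach the question deliberately sets aside could look roughly like the sketch below, built on the standard library's `queue.Queue`. The function names, the task counts and the `SENTINEL` shutdown marker are illustrative assumptions; the original program has no shutdown logic and runs forever.

```python
import queue
import random
import threading

SENTINEL = object()  # hypothetical shutdown marker; the original loops forever


def producer(q, n_tasks):
    # Queue.put does its own locking, so no Condition is needed here.
    for _ in range(n_tasks):
        q.put(random.randint(0, 100))


def consumer(q, results):
    name = threading.current_thread().name
    while True:
        task = q.get()  # blocks until an item is available
        if task is SENTINEL:
            return
        results.append((name, task))  # list.append is thread-safe in CPython


def main(n_producers=2, n_consumers=3, tasks_per_producer=100):
    q = queue.Queue()
    results = []
    consumers = [threading.Thread(target=consumer, args=(q, results))
                 for _ in range(n_consumers)]
    producers = [threading.Thread(target=producer, args=(q, tasks_per_producer))
                 for _ in range(n_producers)]
    for t in consumers + producers:
        t.start()
    for t in producers:
        t.join()  # every real task is queued once the producers finish
    for _ in consumers:
        q.put(SENTINEL)  # one shutdown marker per consumer, after all tasks (FIFO)
    for t in consumers:
        t.join()
    return results


if __name__ == "__main__":
    print(len(main()), "tasks consumed")
```

Because the queue is FIFO, the sentinels are only dequeued after every real task, so each consumer exits exactly once and no task is lost.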

from threading import Thread, Condition
from collections import deque
import random
import time

tasks = deque()          # shared task queue
condition = Condition()  # guards `tasks`; wraps a reentrant RLock by default

class Consumer(Thread):

    def process_task(self, task):
        self.log("Completed task " + str(task))

    def get_task(self):
        # this is the only Consumer method that accesses the shared deque
        global tasks
        condition.acquire()
        if len(tasks) == 0:
            self.log("Nothing in the Queue... Sleeping!")
            condition.wait()
            # wait() re-acquires the lock before returning, and this path
            # never calls condition.release()
            return None
        task = tasks.popleft()
        condition.release()
        return task

    def sleep(self):
        self.log("Sleeping")
        time.sleep(1)

    def execute(self):
        while True:
            task = self.get_task()
            if task is not None:  # 0 is a valid task value
                self.process_task(task)

    def run(self):
        self.log("Started")
        self.execute()

    def log(self, msg):
        print("[ Consumer: {0} ] Consumed {1}".format(self.name, msg))

class Producer(Thread):

    def create_tasks(self):
        return [random.randint(0, 100) for _ in range(100)]

    def add_tasks(self, new_tasks):
        global tasks
        condition.acquire()
        tasks.extend(new_tasks)
        self.log("Produced " + str(len(new_tasks)) + "elements")
        self.log("Queue length : " + str(len(tasks)))
        self.notify_all()
        condition.release()

    def notify_all(self):
        # must be called while holding the condition's lock
        condition.notify_all()
        self.log("Aweking consumers")

    def sleep(self):
        self.log("Sleeping")
        time.sleep(1)

    def execute(self):
        while True:
            new_tasks = self.create_tasks()
            self.add_tasks(new_tasks)
            self.sleep()

    def run(self):
        self.log("Started")
        self.execute()

    def log(self, msg):
        print("[ Producer: {0} ] {1}".format(self.name, msg))


Producer().start()
Producer().start()

Consumer().start()
Consumer().start()
Consumer().start()

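The application seems to work: I don't get any deadlocks or strange exceptions. However, the results are not what I expected, because the work is not distributed among the consumers at all. I expected all three consumer threads to do some of the work in parallel, but that doesn't happen. Here is an excerpt from the log: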

[ Consumer: Thread-4 ] Consumed Completed task 42
... 98 more lines like the one above, each with a different task (always Thread-4) ...
[ Consumer: Thread-4 ] Consumed Completed task 22
[ Consumer: Thread-4 ] Consumed Nothing in the Queue... Sleeping!
[ Consumer: Thread-5 ] Consumed Nothing in the Queue... Sleeping!
[ Consumer: Thread-3 ] Consumed Nothing in the Queue... Sleeping!
[ Producer: Thread-1 ] Produced 100elements
[ Producer: Thread-1 ] Queue length : 100
[ Producer: Thread-1 ] Aweking consumers
[ Producer: Thread-1 ] Sleeping
[ Consumer: Thread-3 ] Consumed Completed task 87
... 98 more lines like the one above, each with a different task (always Thread-3) ...
[ Consumer: Thread-3 ] Consumed Completed task 20
[ Consumer: Thread-3 ] Consumed Nothing in the Queue... Sleeping!
[ Consumer: Thread-4 ] Consumed Nothing in the Queue... Sleeping!
[ Consumer: Thread-5 ] Consumed Nothing in the Queue... Sleeping!

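As the log shows, all of the work is done by a single thread while the other two do nothing. It looks as if they never get a chance to run. I tried making the consumer thread sleep after executing a task, but the behavior didn't change.

Is there a bug in my code that I'm missing? Could this behavior be related to the famous GIL?

Thanks for your help.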
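A note on the posted `get_task`: on the empty-queue branch, `condition.wait()` re-acquires the lock before returning, and the method then returns `None` without calling `condition.release()`. Because `Condition()` wraps a reentrant `RLock` by default, that consumer keeps holding the lock across its later calls, so the other consumers and the producers stay blocked in `acquire()` until it calls `wait()` again. A minimal sketch of the usual pattern follows, assuming a finite workload and a hypothetical `done` flag for shutdown; the module-level functions are illustrative, not the original classes. The lock is taken with `with condition:` so it is always released, and `wait_for` re-checks the predicate after every wake-up.

```python
import random
import threading
from collections import deque

tasks = deque()
condition = threading.Condition()
done = False  # hypothetical shutdown flag; the original program never stops


def produce(n_tasks):
    global done
    for _ in range(n_tasks):
        with condition:  # released automatically when the block exits
            tasks.append(random.randint(0, 100))
            condition.notify()  # wake one waiting consumer
    with condition:
        done = True
        condition.notify_all()  # every consumer must see the flag


def get_task():
    """Return the next task, or None once production is finished and the deque is empty."""
    with condition:
        # wait_for re-checks the predicate after every wake-up, so a consumer
        # that loses the race for the last task simply waits again.
        condition.wait_for(lambda: tasks or done)
        return tasks.popleft() if tasks else None


def consume(processed):
    while True:
        task = get_task()
        if task is None:  # 0 is a valid task, so compare with None
            return
        processed.append(task)  # "work" happens outside the lock


def run(n_tasks=300, n_consumers=3):
    global done
    tasks.clear()  # safe: no consumer threads are running yet
    done = False
    processed = []
    consumers = [threading.Thread(target=consume, args=(processed,))
                 for _ in range(n_consumers)]
    for t in consumers:
        t.start()
    produce(n_tasks)
    for t in consumers:
        t.join()
    return processed
```

`run()` returns every produced task exactly once; which consumer handled each one still depends on the scheduler.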

Best answer

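What you're seeing isn't really a coding problem; it's just that the same thread usually wins the race to re-acquire the condition's mutex. That thread has enough time to run process_task and still win the race, because process_task does almost nothing. I don't think this has anything to do with the GIL: I ran the code in Jython, which has no GIL, and the same behavior occurred.

There's definitely no bug, though. When I run your code in CPython, occasionally more than one thread ends up consuming: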

[ Producer: Thread-1 ] Started
[ Producer: Thread-1 ] Produced 100elements
[ Producer: Thread-1 ] Queue length : 100
[ Producer: Thread-1 ] Aweking consumers
[ Producer: Thread-1 ] Sleeping
[ Producer: Thread-2 ] Started
[ Consumer: Thread-3 ] Consumed Started
[ Producer: Thread-2 ] Produced 100elements
[ Producer: Thread-2 ] Queue length : 200
[ Consumer: Thread-5 ] Consumed Started
[ Producer: Thread-2 ] Aweking consumers
[ Consumer: Thread-4 ] Consumed Started
[ Producer: Thread-2 ] Sleeping
[ Consumer: Thread-5 ] Consumed Completed task 53
[ Consumer: Thread-4 ] Consumed Completed task 73
[ Consumer: Thread-5 ] Consumed Completed task 83
[ Consumer: Thread-4 ] Consumed Completed task 71
[ Consumer: Thread-5 ] Consumed Completed task 67
[ Consumer: Thread-4 ] Consumed Completed task 7
[ Consumer: Thread-5 ] Consumed Completed task 34
[ Consumer: Thread-4 ] Consumed Completed task 68
[ Consumer: Thread-5 ] Consumed Completed task 15
[ Consumer: Thread-4 ] Consumed Completed task 29
[ Consumer: Thread-5 ] Consumed Completed task 20
... (Thread-4 and Thread-5 keep alternating)

Also, if I call self.sleep() inside process_task, which simulates some real work taking place, the results look quite normal, with the consumers taking turns. A more realistic test:

[ Producer: Thread-1 ] Started
[ Producer: Thread-2 ] Started
[ Producer: Thread-1 ] Produced 100elements
[ Producer: Thread-1 ] Queue length : 100
[ Producer: Thread-1 ] Aweking consumers
[ Consumer: Thread-5 ] Consumed Started
[ Consumer: Thread-3 ] Consumed Started
[ Producer: Thread-1 ] Sleeping
[ Producer: Thread-2 ] Produced 100elements
[ Consumer: Thread-4 ] Consumed Started
[ Producer: Thread-2 ] Queue length : 200
[ Producer: Thread-2 ] Aweking consumers
[ Producer: Thread-2 ] Sleeping
[ Consumer: Thread-3 ] Consumed Sleeping
[ Consumer: Thread-5 ] Consumed Sleeping
[ Consumer: Thread-4 ] Consumed Sleeping
[ Producer: Thread-1 ] Produced 100elements
[ Consumer: Thread-3 ] Consumed Completed task 85
[ Consumer: Thread-5 ] Consumed Completed task 31
[ Producer: Thread-1 ] Queue length : 297
[ Consumer: Thread-4 ] Consumed Completed task 62
[ Producer: Thread-1 ] Aweking consumers
[ Producer: Thread-1 ] Sleeping
[ Producer: Thread-2 ] Produced 100elements
[ Producer: Thread-2 ] Queue length : 397
[ Producer: Thread-2 ] Aweking consumers
[ Producer: Thread-2 ] Sleeping
[ Consumer: Thread-3 ] Consumed Sleeping
[ Consumer: Thread-5 ] Consumed Sleeping
[ Consumer: Thread-4 ] Consumed Sleeping
[ Producer: Thread-1 ] Produced 100elements
[ Producer: Thread-1 ] Queue length : 494
[ Consumer: Thread-3 ] Consumed Completed task 99
[ Consumer: Thread-4 ] Consumed Completed task 58
[ Producer: Thread-1 ] Aweking consumers
[ Consumer: Thread-5 ] Consumed Completed task 18
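To see how the work is actually split, one could count the tasks each consumer handles. The sketch below is a hypothetical harness, not the answer's code: `distribution` drains a pre-filled deque with several threads under a plain `threading.Lock` (standing in for the condition's lock) and can optionally sleep outside the lock to simulate work, much like calling `self.sleep()` in `process_task`.

```python
import threading
import time
from collections import Counter, deque


def distribution(n_tasks=60, n_consumers=3, work_seconds=0.0):
    """Drain a pre-filled deque with several threads; return tasks handled per thread."""
    tasks = deque(range(n_tasks))
    lock = threading.Lock()  # stands in for the condition's underlying lock
    counts = Counter()

    def consumer():
        name = threading.current_thread().name
        while True:
            with lock:
                if not tasks:
                    return
                tasks.popleft()
                counts[name] += 1  # counted under the lock
            if work_seconds:
                time.sleep(work_seconds)  # simulated work, lock not held

    threads = [threading.Thread(target=consumer, name=f"Consumer-{i}")
               for i in range(n_consumers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return counts


if __name__ == "__main__":
    print(distribution(work_seconds=0.0))
    print(distribution(work_seconds=0.01))
```

Comparing the two calls shows whether simulated work spreads the tasks across consumers; the exact counts depend on the OS scheduler and will differ from run to run.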

For a similar question on Stack Overflow about a Python producer/consumer with multiple threads, see: https://stackoverflow.com/questions/29579447/
