gpt4 book ai didi

具有异常处理的 Python 生产者/消费者

转载 作者:太空狗 更新时间:2023-10-30 01:30:10 33 4
gpt4 key购买 nike

我正在尝试编写一个看似简单的经典生产者 - 消费者成语的实现在 Python 中。对于多个较慢的消费者,有一个相对较快的生产者。原则上,这使用 Queue 很容易做到模块,以及库文档有一个示例,仅生成几行代码。

但是,我也希望代码能够在发生异常时正常工作。既是制作人如果发生以下任何情况,所有消费者都应停止:

  • 生产者因异常而失败
  • 任何消费者失败并出现异常
  • 用户停止程序(导致键盘中断)

在那之后,整个过程应该失败引发初始异常以通知来电询问出了什么问题。

主要的挑战似乎是在不结束的情况下干净地终止消费者线程在一个阻塞的 join() 中。设置 Thread.deamon=True 似乎很流行,但对我来说理解这一点会导致资源泄漏,以防生产者因异常而失败。

我设法编写了一个满足我的要求的实现(见下文)。然而我发现代码比预期的要复杂得多。

有没有更精简的方法来处理这些情况?

这里有几个示例调用和我当前的最终日志消息实现:

生产并消费 10 件元素:

$ python procon.py
INFO:root:processed all items

不产生任何元素:

$ python procon.py --items 0
INFO:root:processed all items

为 10 个消费者生产 5 个元素,因此只使用一些可用的消费者:

$ python procon.py --items 5 --consumers 10
INFO:root:processed all items

按 Control-C 中断:

$ python procon.py
^CWARNING:root:interrupted by user

无法生成项目 3:

$ python procon.py --producer-fails-at 3
ERROR:root:cannot produce item 3

第三项消费失败:

$ python procon.py --consumer-fails-at 3
ERROR:root:cannot consume item 3

最后一次消费失败:

$ python procon.py --items 10 --consumer-fails-at 9
ERROR:root:cannot consume item 9

这里是可能过于复杂的源代码:

"""
Consumer/producer to test exception handling in threads. Both the producer
and the consumer can be made to fail deliberately when processing a certain
item using command line options.
"""
import logging
import optparse
import Queue
import threading
import time

_PRODUCTION_DELAY = 0.1
_CONSUMPTION_DELAY = 0.3

# Delay for ugly hacks and polling loops.
_HACK_DELAY = 0.05

class _Consumer(threading.Thread):
"""
Thread to consume items from an item queue filled by a producer, which can
be told to terminate in two ways:

1. using `finish()`, which keeps processing the remaining items on the
queue until it is empty
2. using `cancel()`, which finishes consuming the current item and then
terminates
"""
def __init__(self, name, itemQueue, failedConsumers):
super(_Consumer, self).__init__(name=name)
self._log = logging.getLogger(name)
self._itemQueue = itemQueue
self._failedConsumers = failedConsumers
self.error = None
self.itemToFailAt = None
self._log.info(u"waiting for items to consume")
self._isFinishing = False
self._isCanceled = False

def finish(self):
self._isFinishing = True

def cancel(self):
self._isCanceled = True

def consume(self, item):
self._log.info(u"consume item %d", item)
if item == self.itemToFailAt:
raise ValueError("cannot consume item %d" % item)
time.sleep(_CONSUMPTION_DELAY)

def run(self):
try:
while not (self._isFinishing and self._itemQueue.empty()) \
and not self._isCanceled:
# HACK: Use a timeout when getting the item from the queue
# because between `empty()` and `get()` another consumer might
# have removed it.
try:
item = self._itemQueue.get(timeout=_HACK_DELAY)
self.consume(item)
except Queue.Empty:
pass
if self._isCanceled:
self._log.info(u"canceled")
if self._isFinishing:
self._log.info(u"finished")
except Exception, error:
self._log.error(u"cannot continue to consume: %s", error)
self.error = error
self._failedConsumers.put(self)


class Worker(object):
"""
Controller for interaction between producer and consumers.
"""
def __init__(self, itemsToProduceCount, itemProducerFailsAt,
itemConsumerFailsAt, consumerCount):
self._itemsToProduceCount = itemsToProduceCount
self._itemProducerFailsAt = itemProducerFailsAt
self._itemConsumerFailsAt = itemConsumerFailsAt
self._consumerCount = consumerCount
self._itemQueue = Queue.Queue()
self._failedConsumers = Queue.Queue()
self._log = logging.getLogger("producer")
self._consumers = []

def _possiblyRaiseConsumerError(self):
if not self._failedConsumers.empty():
failedConsumer = self._failedConsumers.get()
self._log.info(u"handling failed %s", failedConsumer.name)
raise failedConsumer.error

def _cancelAllConsumers(self):
self._log.info(u"canceling all consumers")
for consumerToCancel in self._consumers:
consumerToCancel.cancel()
self._log.info(u"waiting for consumers to be canceled")
for possiblyCanceledConsumer in self._consumers:
# In this case, we ignore possible consumer errors because there
# already is an error to report.
possiblyCanceledConsumer.join(_HACK_DELAY)
if possiblyCanceledConsumer.isAlive():
self._consumers.append(possiblyCanceledConsumer)

def work(self):
"""
Launch consumer thread and produce items. In case any consumer or the
producer raise an exception, fail by raising this exception
"""
self.consumers = []
for consumerId in range(self._consumerCount):
consumerToStart = _Consumer(u"consumer %d" % consumerId,
self._itemQueue, self._failedConsumers)
self._consumers.append(consumerToStart)
consumerToStart.start()
if self._itemConsumerFailsAt is not None:
consumerToStart.itemToFailAt = self._itemConsumerFailsAt

self._log = logging.getLogger("producer ")
self._log.info(u"producing %d items", self._itemsToProduceCount)

for itemNumber in range(self._itemsToProduceCount):
self._possiblyRaiseConsumerError()
self._log.info(u"produce item %d", itemNumber)
if itemNumber == self._itemProducerFailsAt:
raise ValueError("ucannot produce item %d" % itemNumber)
# Do the actual work.
time.sleep(_PRODUCTION_DELAY)
self._itemQueue.put(itemNumber)

self._log.info(u"telling consumers to finish the remaining items")
for consumerToFinish in self._consumers:
consumerToFinish.finish()
self._log.info(u"waiting for consumers to finish")
for possiblyFinishedConsumer in self._consumers:
self._possiblyRaiseConsumerError()
possiblyFinishedConsumer.join(_HACK_DELAY)
if possiblyFinishedConsumer.isAlive():
self._consumers.append(possiblyFinishedConsumer)


if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
parser = optparse.OptionParser()
parser.add_option("-c", "--consumer-fails-at", metavar="NUMBER",
type="long", help="number of items at which consumer fails (default: %default)")
parser.add_option("-i", "--items", metavar="NUMBER", type="long",
help="number of items to produce (default: %default)", default=10)
parser.add_option("-n", "--consumers", metavar="NUMBER", type="long",
help="number of consumers (default: %default)", default=2)
parser.add_option("-p", "--producer-fails-at", metavar="NUMBER",
type="long", help="number of items at which producer fails (default: %default)")
options, others = parser.parse_args()
worker = Worker(options.items, options.producer_fails_at,
options.consumer_fails_at, options.consumers)
try:
worker.work()
logging.info(u"processed all items")
except KeyboardInterrupt:
logging.warning(u"interrupted by user")
worker._cancelAllConsumers()
except Exception, error:
logging.error(u"%s", error)
worker._cancelAllConsumers()

最佳答案

您需要一个带有取消方法的队列,该方法清空内部队列,设置取消标志,然后唤醒所有人。工作人员将从 join() 中醒来,检查队列中的取消标志并采取适当的行动。消费者将从 get() 唤醒并检查队列中的取消标志并打印错误。然后您的消费者只需要在发生异常时调用 cancel() 方法。

不幸的是,Python 队列没有取消方法。我想到了几个选择:

  • 滚动你自己的队列(可能很难做到正确)
  • 扩展 python 队列并添加取消方法(将您的代码耦合到 Python 队列类的内部实现)
  • 代理队列类并使用您的繁忙等待逻辑重载加入/获取(仍然是一个繁忙等待 hack,但将其限制在一个位置并清理生产者/消费者代码)
  • 找到另一个队列实现/库

关于具有异常处理的 Python 生产者/消费者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/8692834/

33 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com