gpt4 book ai didi

python - 在 Python 队列中伪造一个插入

转载 作者:太空宇宙 更新时间:2023-11-04 06:14:03 24 4
gpt4 key购买 nike

我在 python 中使用队列,它的工作方式类似于 channel 。也就是说,无论何时进行插入,其他线程都在等待并获取插入的值。该值就是 yield 。

@classsynchronized('mutex')
def send(self, message):

# This might raise Full
if not self._is_closed:
self._queue.put(message)
return True

return False

@classsynchronized('mutex')
def close(self):

# Don't do what we don't need to
if self._is_closed:
return

# Make the _queue.get() request fail with an Empty exception
# This will cause the channel to stop listenning to messages
# First aquire the write lock, then notify the read lock and
# finally release the write lock. This is equivalent to an
# empty write, which will cause the Empty exception
print("ACQUIRING not_full")
self._queue.not_full.acquire()

# Close first. If the queue is empty it will raise Empty as fast as
# possible, instead of waiting for the timeout
self._is_closed = True

try:
print("NOTIFYING not_empty")
self._queue.not_empty.notify()
print("NOTIFIED not_empty")
finally:
self._queue.not_full.release()
print("RELEASED not_full")

def _yield_response(self):
try:
while True:

# Fetch from the queue, wait until available, or a timeout
timeout = self.get_timeout()
print("[WAITING]")
message = self._queue.get(True, timeout)
print("[DONE WAITING] " + message)
self._queue.task_done()

# Don't yield messages on closed queues, flush instead
# This prevents writting to a closed stream, but it's
# up to the user to close the queue before closing the stream
if not self._is_closed:
yield message

# The queue is closed, ignore all remaining messages
# Allow subclasses to change the way ignored messages are handled
else:
self.handle_ignored(message)

# This exception will be thrown when the channel is closed or
# when it times out. Close the channel, in case a timeout caused
# an exception
except Empty:
pass

# Make sure the channel is closed, we can get here by timeout
self.close()

# Finally, empty the queue ignoring all remaining messages
try:
while True:
message = self._queue.get_nowait()
self.handle_ignored(message)

except Empty:
pass

我只包含了相关的方法,但请注意这是一个类。问题是,这并不像我预期的那样。队列确实关闭了,所有打印都显示在控制台中,但是等待消息的线程没有得到通知。相反,它总是超时退出。

所有@classsynchronized('mutex') 注解在类方面同步具有相同标识符 ('mutex') 的方法,也就是说,具有相同 ID 的注解的类中的每个方法都相互同步。

我在关闭前获取not_full锁的原因是为了防止在关闭的 channel 中插入。只有这样我才会通知 not_empty 锁。

知道为什么这行不通吗?还有其他建议吗?

提前致谢。

编辑:

我对打印品做了一些改动。我创建 channel 并立即发送消息。然后我发送一个删除它的 HTTP 请求。这是输出:

[WAITING]
[DONE WAITING] message
[WAITING]
ACQUIRING not_full
NOTIFYING not_empty
NOTIFIED not_empty
RELEASE not_full

所以:

  1. 第一条消息得到处理并成功发送(我在客户端获取它,所以...)
  2. 然后队列正在等待。它应该在等待 not_empty 锁,对吧?
  3. 我对 channel 发出删除请求。它获取 not_full 锁(以防止写入),并通知 not_empty 锁

我真的不明白...如果线程收到通知为什么它不解除阻塞??

最佳答案

篡改 Queue 的内部锁似乎是个坏主意。只用Queue的官方接口(interface)来表述问题如何?

例如,要模拟关闭队列,您可以执行 self._queue.put(None) 或其他一些特殊值。得到这个特殊值的等待线程知道队列已经关闭。问题是特殊值不再在队列中以供更多线程查看;但这很容易解决:当线程获得特殊值时,它会立即将其再次放入队列中。

关于python - 在 Python 队列中伪造一个插入,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/16993565/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com