gpt4 book ai didi

python - multiprocessing.Queue 和 queue.Queue 的实现

转载 作者:IT王子 更新时间:2023-10-29 00:24:06 31 4
gpt4 key购买 nike

我正在寻找比我在文档中找到的更多关于 Python 队列实现的见解。

根据我的理解,如果我在这方面有误,请原谅我的无知:

queue.Queue():通过内存中的基本数组实现,因此不能在多个进程之间共享,但可以在线程之间共享。到目前为止,还不错。

multiprocessing.Queue():通过具有大小限制的管道(man 2 pipes)实现(相当小:在 Linux 上,man 7 pipe 表示 65536 未调整):

Since Linux 2.6.35, the default pipe capacity is 65536 bytes, but the capacity can be queried and set using the fcntl(2) F_GETPIPE_SZ and F_SETPIPE_SZ operations

但是,在 Python 中,每当我尝试将大于 65536 字节的数据写入管道时,它都会无一异常(exception)地工作——我可以这样淹没我的内存:

import multiprocessing
from time import sleep

def big():
result = ""
for i in range(1,70000):
result += ","+str(i)
return result # 408888 bytes string

def writequeue(q):
while True:
q.put(big())
sleep(0.1)

if __name__ == '__main__':
q = multiprocessing.Queue()
p = multiprocessing.Process(target=writequeue, args=(q,))
p.start()
while True:
sleep(1) # No pipe consumption, we just want to flood the pipe

所以这是我的问题:

  • Python 会调整管道限制吗?如果是,多少?欢迎使用 Python 源代码。

  • Python 管道通信是否可以与其他非 Python 进程互操作?如果是,欢迎提供工作示例(最好是 JS)和资源链接。

最佳答案

为什么 q.put() 没有阻塞?

mutiprocessing.Queue 创建一个管道,如果管道已满则阻塞。当然写入超过管道容量会导致write调用阻塞,直到读取端清除了足够的数据。好的,如果管道在达到其容量时阻塞,为什么 q.put() 不会在管道满时阻塞?即使是示例中对 q.put() 的第一次调用也应该填满管道,并且所有内容都应该阻塞在那里,不是吗?

不,它不会阻塞,因为 multiprocessing.Queue 实现将 .put() 方法与写入管道分离。 .put() 方法将传递给它的数据排入内部缓冲区,并且有一个单独的线程负责从该缓冲区读取数据并写入管道。当管道已满时,该线程将阻塞,但它不会阻止 .put() 将更多数据排入内部缓冲区。

执行.put()将数据保存到 self._buffer 并注意如果没有一个线程已经在运行,它是如何启动一个线程的:

def put(self, obj, block=True, timeout=None):
assert not self._closed
if not self._sem.acquire(block, timeout):
raise Full

with self._notempty:
if self._thread is None:
self._start_thread()
self._buffer.append(obj)
self._notempty.notify()

._feed()方法是从 self._buffer 中读取并将数据提供给管道的方法。和 ._start_thread()是什么设置了运行 ._feed() 的线程。

如何限制队列大小?

如果你想限制可以写入队列的数据量,我看不到通过指定字节数来实现的方法,但你可以限制存储在内部缓冲区中的项目数通过将数字传递给 multiprocessing.Queue 来任意一次:

q = multiprocessing.Queue(2)

当我使用上述参数并使用您的代码时,q.put() 会将两个项目入队,并在第三次尝试时阻塞。

Python 管道通信是否可以与其他非 Python 进程互操作?

这取决于。 multiprocessing 模块提供的功能不容易与其他语言互操作。我希望 可能 使 multiprocessing 与其他语言互操作,但实现这一目标将是一项重大任务。编写该模块时期望所涉及的进程正在运行 Python 代码。

如果您查看更通用的方法,那么答案是肯定的。您可以使用套接字作为两个不同进程之间的通信管道。例如,从命名套接字读取的 JavaScript 进程:

var net = require("net");
var fs = require("fs");

sockPath = "/tmp/test.sock"
try {
fs.unlinkSync(sockPath);
}
catch (ex) {
// Don't care if the path does not exist, but rethrow if we get
// another error.
if (ex.code !== "ENOENT") {
throw ex;
}
}

var server = net.createServer(function(stream) {
stream.on("data", function(c) {
console.log("received:", c.toString());
});

stream.on("end", function() {
server.close();
});
});

server.listen(sockPath);

还有一个写入它的 Python 进程:

import socket
import time

sockfile = "/tmp/test.sock"

conn = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
conn.connect(sockfile)

count = 0
while True:
count += 1
conn.sendall(bytes(str(count), "utf-8"))
time.sleep(1)

如果你想尝试上面的方法,你需要先启动JavaScript端,这样Python端才有东西可写。这是一个概念验证。完整的解决方案需要更多改进。

为了将复杂的结构从 Python 传递到其他语言,您必须找到一种方法以一种双方都可以读取的格式序列化您的数据。不幸的是,泡菜是特定于 Python 的。每当我需要在不同语言之间进行序列化时,我通常会选择 JSON,或者如果 JSON 无法做到这一点,我会使用临时格式。

关于python - multiprocessing.Queue 和 queue.Queue 的实现,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45148271/

31 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com