gpt4 book ai didi

python - set() 可以在 Python 进程之间共享吗?

转载 作者:太空宇宙 更新时间:2023-11-03 12:57:02 30 4
gpt4 key购买 nike

我在 Python 2.7 中使用多处理来处理非常大的数据集。随着每个进程的运行,它会将整数添加到共享的 mp.Manager.Queue() 中,但前提是其他进程尚未添加相同的整数。由于您不能对队列执行“in”式成员资格测试,因此我这样做的方法是检查每个 int 是否属于共享 mp.Manager.list() 中的成员资格。该列表最终将有大约 3000 万个条目,因此成员资格测试将非常缓慢,从而抵消了多处理的优势。

这是我正在做的事情的简化版本:

import multiprocessing as mp

def worker(shared_list, out_q, lock):
# Do some processing and get an integer
result_int = some_other_code()

# Use a lock to ensure nothing is added to the list in the meantime
lock.acquire()
# This lookup can take forever when the list is large
if result_int not in shared_list:
out_q.put(result_int)
shared_list.append(result_int)
lock.release()

manager = mp.Manager()
shared_list = manager.list()
lock = manager.lock()
out_q = manager.Queue()

for i in range(8):
p = mp.Process(target=worker, args=(shared_list, out_q, lock))
p.start()

我之前尝试使用 set() 而不是 mp.Manager.list(),但似乎每个进程都有自己的内存空间,因此当我更新集合时,它没有跨进程同步。因此,我改用了当前的方法。

以下是我之前尝试使用 set() 的大致方式:将多处理导入为 mp

def worker(shared_set, out_q, lock):
# Do some processing and get an integer
result_int = some_other_code()

# Use a lock to ensure nothing is added to the set in the meantime
lock.acquire()
# This lookup is fast, but the set doesn't reflect additions made by other processes.
if result_int not in shared_set:
out_q.put(result_int)
shared_set.add(result_int)
lock.release()

manager = mp.Manager()
lock = manager.lock()
out_q = manager.Queue()

# This set will NOT synchronize between processes
shared_set = set()


for i in range(8):
p = mp.Process(target=worker, args=(shared_set, out_q, lock))
p.start()

注意:这些示例未经测试,仅代表我的代码的相关部分。

有没有办法跨进程共享集合,或者以其他方式进行更快的成员查找?

编辑:更多信息:out_q 被另一个将数据写入单个输出文件的进程使用。不能有重复。如果我生成一个整数并且发现它是重复的,则该过程需要返回并生成下一个最佳整数。

最佳答案

一个明显的调整是使用 mp.Manager.dict() 而不是集合,并使用任意值(例如,设置 the_dict[result_int] = 1以指示集合中的成员资格)。顺便说一句,在 Python 添加 set 类型之前,“每个人”都是这样实现集合的,即使现在,dicts 和 sets 也是通过基本相同的代码在幕后实现的。

稍后添加:我承认我不明白为什么您在原始代码中同时使用集合和列表,因为集合的键与列表的内容相同。如果输入顺序不重要,为什么不完全忘记列表呢?然后,您还可以删除原始文件中所需的锁定层,以保持集合和列表同步。

根据 dict 的建议充实它,整个函数将变成:

def worker(shared_dict):
# Do some processing and get an integer
result_int = some_other_code()
shared_dict[result_int] = 1

其他进程可以执行 shared_dict.pop() 然后一次获取一个值(虽然,不,他们不能等待 .pop() 因为他们为队列的 .get() 做。

还有一个:考虑使用本地(进程本地)集合?他们会跑得更快。然后每个工作人员都不会添加任何知道的重复项,但进程可能存在重复项。您的代码没有给出任何关于 out_q 消费者做什么的提示,但如果只有一个,那么其中的本地集也可以清除跨进程重复项。或者也许内存负担太重了?无法从这里猜测 ;-)

大编辑

我将建议一种不同的方法:根本不要使用 mp.Manager。大多数时候我看到人们使用它,他们后悔了,因为它没有按照他们认为的方式做。他们的想法:它提供物理上共享的对象。它在做什么:它提供语义共享对象。在物理上,它们存在于 Yet Another 中,在幕后,进程和对象上的操作被转发到后一个进程,在那里它们由该进程在其自己的地址空间中执行。它根本不是物理上共享的。因此,虽然它可能非常方便,但即使是最简单的操作也会产生大量的进程间开销。

所以我建议在一个进程中使用单个普通集合,这将是唯一与清除重复项有关的代码。工作进程产生整数而不关心重复——它们只是传递整数。 mp.Queue 就可以了(同样,不需要 mp.Manager.Queue)。

像这样,这是一个完整的可执行程序:

N = 20

def worker(outq):
from random import randrange
from time import sleep
while True:
i = randrange(N)
outq.put(i)
sleep(0.1)

def uniqueifier(inq, outq):
seen = set()
while True:
i = inq.get()
if i not in seen:
seen.add(i)
outq.put(i)

def consumer(inq):
for _ in range(N):
i = inq.get()
print(i)

if __name__ == "__main__":
import multiprocessing as mp
q1 = mp.Queue()
q2 = mp.Queue()
consume = mp.Process(target=consumer, args=(q2,))
consume.start()
procs = [mp.Process(target=uniqueifier, args=(q1, q2))]
for _ in range(4):
procs.append(mp.Process(target=worker, args=(q1,)))
for p in procs:
p.start()
consume.join()
for p in procs:
p.terminate()

传递给 uniqueifier 的第二个队列扮演原始队列的角色:它只提供唯一的整数。不会尝试“共享内存”,因此不会支付任何费用。唯一的进程间通信是通过简单、显式的 mp.Queue 操作。只有一组,并且由于它不以任何方式共享,所以它会尽可能快地运行。

实际上,这只是设置了一个简单的管道,尽管有多个输入。

关于python - set() 可以在 Python 进程之间共享吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37714680/

30 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com