gpt4 book ai didi

python - 在进程之间共享不断发展的字典

转载 作者:太空狗 更新时间:2023-10-29 21:57:55 38 4
gpt4 key购买 nike

问题呈现

我面临多处理问题。很大一部分multiprocessing stack overflow的问题没有我的情况复杂,就不答了。有些人投票可能与 this 重复问题,但我的情况不同,在我的情况下,共享 DICT 在进程作业之间修改:

我有一个程序遵循这个简化的生命周期:

A. Initialize DATA dict
B. Initialize 4 subprocess workers
C. Execute code in each workers (worker massively read DATA dict)
D. Wait workers job is done
E. Modify DATA dict content
F. Go to C

性能是问题的一个非常重要的方面。我试验了很多有正反两方面的解决方案:

简单的全局字典(不工作)

在步骤 B 中,DICT 变量被派生到子进程环境中。但是在步骤 E 之后子进程看不到变化。

使用 multiprocessing.Manager 字典

在步骤 A dict 中使用 multiprocessing.Manager 创建(参见“服务器进程”here)。

  • 优点:易于使用
  • 缺点:multiprocessing.Manager 使用序列化层(我不太了解它,但它能够与网络上的进程一起工作),这对表现。

使用多个 multiprocessing.Value 和 multiprocessing.Array 代替字典

multiprocessing.Valuemultiprocessing.Array 允许使用共享内存。我尝试用多个 multiprocessing.Valuemultiprocessing.Array 替换我的字典,如下所示:

用字典:

manager = multiprocessing.Manager()
dict = manager.dict()
dict['positions'] = [42, 165]
dict['on_position_42'] = 1555897
dict['on_position_165'] = 1548792

multiprocessing.Valuemultiprocessing.Array 替换了 dict:

positions = multiprocessing.Array('i', [42, 165])
on_position_42 = multiprocessing.Value('i', 1555897)
on_position_165 = multiprocessing.Value('i', 1548792)

但是在步骤 E 我需要创建新的 multiprocessing.Valuemultiprocessing.Array,例如:

positions.value = [42, 165, 322]
# create new multiprocessing.Value for 322
on_position_322 = multiprocessing.Value('i', 2258777)

然后在步骤 C 中,on_position_322 将对工作人员未知。如果我尝试通过管道将 multiprocessing.Valuemultiprocessing.Array 发送到子进程,将导致“同步对象只能通过继承在进程之间共享”错误。

  • 优点:性能
  • 缺点:如何“通知”子进程新的 multiprocessing.Valuemultiprocessing.Array 的存在?

使用memcache或redis等内存数据库

我知道这是可能的,但我必须将内存数据库与 multiprocessing.Manager dict 进行基准测试。

  • 优点:务实和工作
  • 缺点:性能?

问题结论

考虑到创建新的multiprocessing.Value多处理.Array ?

或者更一般地说,考虑到这个生命周期,什么是最有效的策略?

注意:我之前尝试过另一种策略,其中步骤 F 是“转到 B”(在每个周期重新创建新工作程序)。但是 worker 的 fork 环境太长了:最大的是 DICT,最长的是 fork。

最佳答案

由于您只是从字典中读取并在主进程中更新它,您可以使用 JoinableQueue 来传递字典并等待工作人员完成。例如

from multiprocessing import Process, JoinableQueue
import time

class Worker(Process):
def __init__(self, queue):
super(Worker, self).__init__()
self.queue = queue

def run(self):
for item in iter(self.queue.get, None):
print item
time.sleep(2)
print 'done'
self.queue.task_done()
self.queue.task_done()

if __name__ == '__main__':
request_queue = JoinableQueue()
num_workers = 4
workers = []
d = {} # A

for _ in range(num_workers):
p = Worker(request_queue) # B
workers.append(p)
p.start()


for i in range(5): # F
for _ in range(num_workers):
request_queue.put(d) # C
request_queue.join() # D
d[i] = i # E

for w in workers:
w.terminate()
w.join()

输出:

{}
{}
{}
{}
done
done
done
done
{0: 0}
{0: 0}
{0: 0}
{0: 0}
done
done
done
done
{0: 0, 1: 1}
{0: 0, 1: 1}
{0: 0, 1: 1}
{0: 0, 1: 1}
done
done
done
done
{0: 0, 1: 1, 2: 2}
{0: 0, 1: 1, 2: 2}
{0: 0, 1: 1, 2: 2}
{0: 0, 1: 1, 2: 2}
done
done
done
done
{0: 0, 1: 1, 2: 2, 3: 3}
{0: 0, 1: 1, 2: 2, 3: 3}
{0: 0, 1: 1, 2: 2, 3: 3}
{0: 0, 1: 1, 2: 2, 3: 3}
done
done
done
done

关于python - 在进程之间共享不断发展的字典,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45959222/

38 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com