gpt4 book ai didi

python - 使用来自多处理器模块的管理器更新嵌套字典

转载 作者:太空宇宙 更新时间:2023-11-04 02:57:33 28 4
gpt4 key购买 nike

我一直在尝试使用多处理更新嵌套字典。

如果字典包含元素列表,我可以添加我想要的信息,但如果它是嵌套字典,我看不到任何变化。

我知道多处理模块说它是 dictproxy 而不是 dict,我已经尝试更改模块上的示例来实现它,但我没有任何运气。

import socket
import csv
from pprint import pprint
from multiprocessing import Pool,Process,current_process,Manager

def dns_lookup(aggregate,ip):
try:
hostname=socket.gethostbyaddr(ip)[0]
proc_name = current_process().name
#print(str(hostname) + " extracted from ip " + str(ip) + " by process id: " + str(proc_name) )
aggregate[ip]+=hostname
except Exception as e:
pass

if __name__=='__main__':
procs=[]
manager=Manager()
aggregate=manager.dict()
with open("ip_list.csv","r") as ipfile:
csv_reader=csv.reader(ipfile)
for index,row in enumerate(csv_reader):
if index == 0:
pass
else:
aggregate[row[0]]=row[1:]
#ips.append((row[0]))
proc = Process(target=dns_lookup, args=(aggregate,row[0],))
procs.append(proc)
proc.start()

for proc in procs:
proc.join()
pprint(dict(aggregate))

上面的代码有效,但如果我尝试将原始字典更改为

aggregate[row[0]]={'Other Items':row[1:]}

然后尝试将其更新为

d['hostname']=hostname
aggregate[ip]=d
#aggregate[ip]+=d

没有任何影响。

我需要实际列表有一个字典而不是一个元素列表。

当前文件很小,但我必须将其扩展到大约 10k 次查找,因此需要进行多处理。

非常感谢任何帮助。

谢谢,卡兰

最佳答案

是的,dict 的更新似乎没有传播到其他进程。即使将内部字典迁移到 manager.dict() 也不能解决问题。如果您从头开始创建一个新的字典并将其应用于 aggregate[ip],则有效:

aggregate[ip] = {"hostname": hostname, "Other Items": aggregate[ip]['Other Items']}

这可能是一个错误,但我建议您对代码进行更大的更改。有两个缺点:

  1. 您使用 aggregate 既作为仍需要查找的 IP“队列”,也作为进程写入的结果容器。如果您将其拆分为一个队列,而一个仅包含结果的字典可以避免您遇到的问题:那么,您只从队列中读取并且只写入结果容器 aggregate
  2. 如果您的 csv 文件中有 1,000 行,您最终会得到 1000 个进程,而您的计算机一次只能切断 number-of-cores 个进程。在 Linux 上,您会浪费大量不需要的内存。在 Windows 上,您将从头开始启动 1,000 个 python 程序。请改用 Pool,让 python 计算出核心数量并将您的工作分配到这些进程中。

我已经将你的代码重写成这样:

import socket
import csv
from pprint import pprint
from multiprocessing import Pool, Queue, Process, current_process, Manager
from time import sleep

def dns_lookup(aggregate,queue):
while not queue.empty(): # live as long there are items in the queue
row = queue.get()
ip = row[0]
other_items = row[1:]
hostname=socket.gethostbyaddr(ip)[0]
aggregate[ip] = {
"hostname": hostname,
"other items": other_items,
"process_name": current_process().name}

if __name__=='__main__':
procs=[]
manager=Manager()
aggregate=manager.dict()
queue = Queue()
with open("ip_list.csv","r") as ipfile:
csv_reader=csv.reader(ipfile)
next(csv_reader) # instead of the if index == 0; pass

for row in csv_reader: # fill queue before starting any processes
queue.put(row)

# start x processes, where None says to take x = the number of cpus returned by `cpu_count()`
pool = Pool(None, dns_lookup, (aggregate, queue))
pool.close() # signal that we won't submit any more tasks to pool
pool.join() # wait until all processes are done
pprint(dict(aggregate))

此外:您最好使用Threads 而不是multiprocessing,因为您的进程将被网络而非CPU 阻塞。多处理只有在您可以 100% 占用一个 CPU 内核时才有意义。

关于python - 使用来自多处理器模块的管理器更新嵌套字典,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41921808/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com