gpt4 book ai didi

python - Pymongo 多处理

转载 作者:可可西里 更新时间:2023-11-01 09:20:07 24 4
gpt4 key购买 nike

我必须在 MongoDB 上执行大量插入和更新操作。

我正在尝试测试多处理来完成这些任务。为此,我创建了这个简单的代码。我的虚拟数据是:

documents = [{"a number": i} for i in range(1000000)]

没有多处理:

time1s = time.time()
client = MongoClient()
db = client.mydb
col = db.mycol
for doc in documents:
col.insert_one(doc)
time1f = time.time()
print(time1f-time1s)

我有 150 秒。

对于多处理,我根据需要和 Pymongo's FAQs 中的描述定义了以下工作函数.

def insert_doc(document):
client = MongoClient()
db = client.mydb
col = db.mycol
col.insert_one(document)

但是,当我运行我的代码时:

time2s = time.time()
pool = mp.Pool(processes=16)
pool.map(insert_doc, documents)
pool.close()
pool.join()
time2f = time.time()
print(time2f - time2s)

我得到一个错误:

pymongo.errors.ServerSelectionTimeoutError: localhost:27017: [Errno 99] Cannot assign requested address

在出现错误之前,总共处理了 26447 个文档。这个错误解释here ,尽管遇到该错误的人没有使用多处理。那里的解决方案是只打开一个 MongoClient,但是当我想进行多处理时这是不可能的。有什么解决方法吗?感谢您的帮助。

最佳答案

您的代码为示例中的每百万个文档创建了一个新的 MongoClient(就像您链接到的问题一样)。这要求您为每个新查询打开一个新套接字。这会破坏 PyMongo 的连接池,除了速度极慢之外,它还意味着您打开和关闭套接字的速度快于您的 TCP 堆栈可以跟上的速度:您将太多套接字留在 TIME_WAIT 状态,因此您最终会耗尽端口。

如果您向每个客户端插入大量文档,则可以创建更少的客户端,因此打开更少的套接字:

import multiprocessing as mp
import time
from pymongo import MongoClient

documents = [{"a number": i} for i in range(1000000)]

def insert_doc(chunk):
client = MongoClient()
db = client.mydb
col = db.mycol
col.insert_many(chunk)

chunk_size = 10000

def chunks(sequence):
# Chunks of 1000 documents at a time.
for j in range(0, len(sequence), chunk_size):
yield sequence[j:j + chunk_size]

time2s = time.time()
pool = mp.Pool(processes=16)
pool.map(insert_doc, chunks(documents))
pool.close()
pool.join()
time2f = time.time()
print(time2f - time2s)

关于python - Pymongo 多处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41104582/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com