
python - How to use a multiprocessing Pool with pymongo in Python 2.7


I am running 10 processes with a multiprocessing Pool and pymongo, fetching data from an API and inserting the output into MongoDB.

I think I have written the code the wrong way, because Python opens twice as many connections as expected; for example, if I run 10 processes, MongoDB reports 20 or more established connections, and at startup I get the following warning:

UserWarning: MongoClient opened before fork. Create MongoClient with connect=False, or create client after forking. See PyMongo's documentation for details: http://api.mongodb.org/python/current/faq.html#using-pymongo-with-multiprocessing

even though I passed connect=False to the MongoClient. Below is sample code showing how I send requests in the pool with pymongo and the requests API:

#!/usr/bin/python
# -*- coding: utf-8 -*-

import json                          # to decode and encode json
import requests                      # web POST and GET requests
from pymongo import MongoClient      # the mongo driver / connector
from bson import ObjectId            # to generate bson objects for MongoDB
from multiprocessing import Pool     # process pool for the parallel workers

# Create the MongoDB client, declare collections
client = MongoClient('mongodb://192.168.0.1:27017,192.168.0.2:27017/?replicaSet=rs0', maxPoolSize=20, connect=False)
index = client.database.index
users = client.database.users

def get_user(userid):
    params = {"userid": userid}
    r = requests.get("https://exampleapi.com/getUser", params=params)
    j = json.loads(r.content)
    return j

def process(index_line):
    user = get_user(index_line["userid"])
    if user:
        users.insert(user)

def main():
    # batch size: number of index documents per loop
    limited = 100
    # number of documents to skip, advanced after each batch
    skipped = 0
    while True:
        # get a cursor over the next batch from the index collection
        cursor = index.find({}, no_cursor_timeout=True).skip(skipped).limit(limited)
        # prepare the pool with 10 worker processes
        p = Pool(10)
        # fan the batch out over the pool
        p.map(process, cursor)
        # after the pool finished, kill it with fire
        p.close()
        p.terminate()
        p.join()
        # after finishing this batch, go for another round, infinite
        skipped = skipped + limited
        print "[-] Skipping %s " % skipped

if __name__ == '__main__':
    main()

Is there a problem with my algorithm? Is there a way to make it more efficient, work better, and give me more control over the pool?

I have been looking into this for a while and cannot find a better way to do what I want, so I would really appreciate some help.

Thank you.

Best Answer

It is recommended to create one MongoClient per process rather than sharing the same client across all of them: MongoClient maintains its own connection pool for a process's connections and is not fork-safe.
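A common way to honor this with a Pool is a worker initializer: each forked worker builds its own MongoClient exactly once, instead of once per task. A minimal sketch, reusing the connection string and the get_user helper from the question (init_worker is a hypothetical name):

from multiprocessing import Pool
from pymongo import MongoClient

client = None  # filled in per worker process, after the fork

def init_worker():
    # runs once inside every freshly forked worker
    global client
    client = MongoClient('mongodb://192.168.0.1:27017,192.168.0.2:27017/?replicaSet=rs0', maxPoolSize=20)

def process(index_line):
    # client was created in this process by init_worker, so it is fork-safe
    users = client.database.users
    user = get_user(index_line["userid"])
    if user:
        users.insert(user)

pool = Pool(10, initializer=init_worker)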

First, you need to make sure the while loop breaks once every document to be processed in the collection has been consumed. It is not a very precise condition, but you can break out of the loop when skipped exceeds the document count.

Second, initialize the process Pool outside the loop and map the work inside it. multiprocessing.Pool.map waits for the worker processes to finish before returning, so closing and joining the pool inside the loop and then reusing it raises an exception. If you want to run the workers asynchronously, consider multiprocessing.Pool.map_async.
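For reference, map_async returns an AsyncResult immediately instead of blocking, so the main loop can keep going while a batch is processed. A minimal sketch, reusing the pool, consumer, and cursor from the code below:

# dispatch the batch without blocking the main loop
async_result = pool.map_async(consumer, cursor)

# ... do other work here, e.g. build the cursor for the next batch ...

# block only when the results are actually needed;
# get() also re-raises any exception from a worker
async_result.wait()
async_result.get()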

You can implement this more explicitly with a multiprocessing.Queue, a producer process, and consumer processes: the producer puts tasks on the queue for the consumers to execute. The benefit of implementing the solution this way is less obvious, though, since the multiprocessing Pool uses queues internally as well.
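A minimal sketch of that explicit layout, reusing the create_connect and get_user helpers from the code below; the None sentinels (one per consumer) are a hypothetical shutdown signal:

from multiprocessing import Process, Queue

NUM_CONSUMERS = 10

def producer(queue, index):
    # push every index document onto the queue as a task
    for index_line in index.find({}):
        queue.put(index_line)
    # one sentinel per consumer so every worker knows when to stop
    for _ in range(NUM_CONSUMERS):
        queue.put(None)

def queue_consumer(queue):
    client = create_connect()
    users = client.database.users
    while True:
        index_line = queue.get()
        if index_line is None:
            break
        user = get_user(index_line["userid"])
        if user:
            users.insert(user)

def run():
    queue = Queue()
    client = create_connect()
    workers = [Process(target=queue_consumer, args=(queue,)) for _ in range(NUM_CONSUMERS)]
    for w in workers:
        w.start()
    producer(queue, client.database.index)
    for w in workers:
        w.join()

Putting the earlier points together, here is the revised version of the original code: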

import requests                      # web POST and GET requests
from pymongo import MongoClient      # the mongo driver / connector
from bson import ObjectId            # to generate bson objects for MongoDB
from multiprocessing import Pool     # process pool for the parallel workers


def get_user(userid):
    params = {"userid": userid}
    rv = requests.get("https://exampleapi.com/getUser", params=params)
    payload = rv.json()
    return payload['content']


def create_connect():
    return MongoClient(
        'mongodb://192.168.0.1:27017,192.168.0.2:27017/?replicaSet=rs0', maxPoolSize=20
    )

def consumer(index_line):
    # each task builds its own client, so no client crosses a fork
    client = create_connect()
    users = client.database.users

    user = get_user(index_line["_id"])
    if user:
        users.insert(user)

def main():
    # batch size: number of index documents per loop
    limited = 100
    # number of documents to skip, advanced after each batch
    skipped = 0
    client = create_connect()
    index = client.database.index
    pool = Pool(10)

    count = index.count()

    while True:

        if skipped > count:
            break

        cursor = index.find({}).skip(skipped).limit(limited)

        pool.map(consumer, cursor)

        skipped = skipped + limited
        print("[-] Skipping {}".format(skipped))

    # all batches done; shut the pool down cleanly
    pool.close()
    pool.join()

if __name__ == '__main__':
    main()

Regarding "python - How to use a multiprocessing Pool with pymongo in Python 2.7", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/48055354/
