gpt4 book ai didi

python - 用 Python 监听 mongoDB

转载 作者:行者123 更新时间:2023-12-05 06:40:20 27 4
gpt4 key购买 nike

我正在尝试收听 MongoDB 集合,对于字段 category 为空列表的所有文档,做一些事情来填充 category 字段,并且然后收听以下传入的文档。

使用这些(旧的):

How to listen for changes to a MongoDB collection?

with Python is there a way to listen for changes when insert or update is made in mongodb

我想出了以下内容:

client = pymongo.MongoClient(mongodb_uri_connexion_string)
db = client.get_default_database()
my_collection = db['my_collection']

cursor = my_collection.find({'category':[]},cursor_type=pymongo.cursor.CursorType.EXHAUST)

while True:
try:
doc = cursor.next()
# place where computation gets done to find the value of new_cat
my_collection.find_one_and_update({'_id':doc['_id']}, {'$push':{'category':{new_cat:'1'}}})
pprint.pprint(my_collection.find_one({'_id':doc['_id']})

except StopIteration:
print("end of cursor, wait")
time.sleep(0.2)
except Exception as e:
print("another problem came up, see below")
print(exception_to_string(e))

当我运行脚本时,它正在做它应该做的事情,打印更新了 categorydoc,直到它完成内容行运行时 cursor 的数量:它没有监听 my_collection 中的后续新 doc,我得到了无限数量的 光标结束,等待

如果我将 CursorType 更改为 TAILABLE_AWAIT,它似乎甚至不会在 try block 中进行计算,而是直接跳转到无限打印光标结束,等待

我做错了什么?谢谢

最佳答案

截至目前,在 mongodb 上执行监听选项的最佳方法是使用更改流,这反过来又需要使用副本集。副本集是一种冗余备份功能,它允许备份数据库在现有数据库发生故障时无缝接管现有数据库。

无论如何,Mongodb 文档对于这个问题的目的来说有点复杂,所以这里是简单的一步一步:

  1. 在已安装的 mongodb 上,打开配置文件 /etc/mongod.conf 并添加以下行
replication:
replSetName: "rs0"
  1. 现在打开 mongo shell 并输入

    > rs.initiate()

这应该给出下面的输出

   {
"info2" : "no configuration specified. Using a default configuration for the set",
"me" : "127.0.0.1:27017",
"ok" : 1,
"$clusterTime" : {
"clusterTime" : Timestamp(1613644807, 2),
"signature" : {
"hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
"keyId" : NumberLong(0)
}
},
"operationTime" : Timestamp(1613644807, 1)
}

它现在应该将您的 shell 显示为rs0:主要>

副本集现已安装,您可以使用更改流来监听 mongodb 插入。这在 Pymongo 中非常简单。以您的示例为例,以下代码将监听集合 "coll" 上的插入,如果没有“category”字段,它将添加字段“category”作为“无”

Python代码:

    import pymongo
mongoclient = pymongo.MongoClient("mongodb://127.0.0.1:27017")
db = mongoclient['My_DB']
coll = db['coll']
with coll.watch([{'$match': {'operationType': 'insert'}}]) as change_stream:
for insert_change in change_stream:
print(insert_change)
inserted_entry = insert_change['fullDocument']
if 'category' not in inserted_entry.keys():
coll.update_one(inserted_entry,{"$set":{"category":"None"}})

注意:您可以直接使用“inserted_entry”作为查询字典,因为它已经包含该记录中的所有字段,包括objectId。

Mongo 数据库:

rs0:PRIMARY> use My_DB
switched to db My_DB
rs0:PRIMARY> db.coll.insert({"Name":"Jack"})
WriteResult({ "nInserted" : 1 })
rs0:PRIMARY> db.coll.find()
{ "_id" : ObjectId("602e61bc0dc4aa57751b5e02"), "Name" : "Jack", "category" : "None" }

关于python - 用 Python 监听 mongoDB,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42895683/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com