- ubuntu12.04环境下使用kvm ioctl接口实现最简单的虚拟机
- Ubuntu 通过无线网络安装Ubuntu Server启动系统后连接无线网络的方法
- 在Ubuntu上搭建网桥的方法
- ubuntu 虚拟机上网方式及相关配置详解
CFSDN坚持开源创造价值,我们致力于搭建一个资源共享平台,让每一个IT人在这里找到属于你的精彩世界.
这篇CFSDN的博客文章剖析后OpLog订阅MongoDB的数据变更就没那么难了由作者收集整理,如果你对这篇文章有兴趣,记得点赞哟.
我们开源了一个订阅分发mysql的binlog的项目,一直用的非常好,忽然有天开发说能不能支持MongoDB的数据订阅呢,MongoDB的使用度也挺广泛的。安排。经过简单的了解后发现MongoDB也有类似binlog的机制,最终花了两天时间把功能完成,并统一抽象集成到binlog开源项目中,使用和binlog同一套订阅分发模型管理MongoDB数据源。整个过程非常顺利,比整mysql的binlog要简单的多了.
先来聊聊MongoDB的主备机制,和mysql的binlog类似,在MongoDB中,有一个系统库“”Local”,库里有一个集合“oplog.rs”,这个集合类似于binlog文件,里面记录了MongoDB的所有操作。从节点通过读取oplog.rs里的数据做到数据同步.
和订阅mysql的binlog一样(模拟一个从节点mysql)。我们的订阅服务要像从节点那样读取解析oplog.rs里的数据。解析前先看下oplog.rs的Document的数据结构 。
上图是一个插入的数据的日志,可见oplog的doc中共有如下字段,含义分别如下:
ts:操作的时间戳(非常重要) 。
t:term最初在主数据库上生成操作的。(含义不明) 。
h:本次操作的唯一hashID 。
v: 版本号 。
op:操作类型,有六种类型,我们只需要关注其中的i(插入)、u(更新)、d(删除)即可 。
ns:库名和集合名称,中间使用“.”连接 。
o:本次操作的document内容 。
o2:只有op操作类型时u更新时,才会有这个字段,代表更新的条件语句 。
$set:o2获取后的文档里的属性,代表更新的字段 。
如上字段,完成一次oplog的解析,只需要ts、op、ns、o、o2、$set即可,其中ts非常重要,可以类比为binlog中的Position。同步mysql的数据时,通过记录消费binlog的位置,也就是Position,可以有效避免订阅服务停机后,消费记录丢失的问题。同步MongoDB时,通过记录ts的值,来记录消费的位置,可以到达和订阅binlog一样的效果。和mysql订阅不同的是,MongoDB的同步需要同步服务自己查询,而且oplog在MongoDB4.0之前的版本有大小限制,超过设置的容量后,老的数据就会被丢失,在4.0之后的版本已经解除了这个限制.
上面已经分析了oplog的结构以及订阅步骤,下面我们直接构建查询即可,需要注意,每次获取到的ts值,需要存储记录下来,已便重新订阅时,从上次断开的记录重新开始。下面直接看代码,重点逻辑都以注释详尽 。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
|
private
BsonTimestamp queryTs;
@Test
public
void
OpLogTest() {
MongoClient mongoClient =
new
MongoClient(
new
MongoClientURI(
"mongodb://admin:admin@127.0.0.1:3717"
));
MongoCollectioncollection = mongoClient.getDatabase(
"local"
)
.getCollection(
"oplog.rs"
);
//如果是首次订阅,需要使用自然排序查询,获取第最后一次操作的操作时间戳。如果是续订阅直接读取记录的值赋值给queryTs即可
FindIterabletsCursor = collection.find().sort(
new
BasicDBObject(
"$natural"
, -
1
))
.limit(
1
);
Document tsDoc = tsCursor.first();
queryTs = (BsonTimestamp) tsDoc.get(
"ts"
);
while
(
true
)
try
{
//构建查询语句,查询大于当前查询时间戳queryTs的记录
BasicDBObject query =
new
BasicDBObject(
"ts"
,
new
BasicDBObject(
"$gt"
, queryTs));
MongoCursordocCursor = collection.find(query)
.cursorType(CursorType.TailableAwait)
//没有数据时阻塞休眠
.noCursorTimeout(
true
)
//防止服务器在不活动时间(10分钟)后使空闲的游标超时。
.oplogReplay(
true
)
//结合query条件,获取增量数据,这个参数比较难懂,见:https://docs.mongodb.com/manual/reference/command/find/index.html
.maxAwaitTime(
1
, TimeUnit.SECONDS)
//设置此操作在服务器上的最大等待执行时间
.iterator();
while
(docCursor.hasNext()) {
Document document = docCursor.next();
//更新查询时间戳
queryTs = (BsonTimestamp) document.get(
"ts"
);
//TODO 在这里接收到数据后通过订阅数据路由分发
String op = document.getString(
"op"
);
String database = document.getString(
"ns"
);
Document context = (Document) document.get(
"o"
);
Document where =
null
;
if
(op.equals(
"u"
)) {
where = (Document) document.get(
"o2"
);
if
(context !=
null
) {
context = (Document) context.get(
"$set"
);
}
}
System.err.println(
"操作时间戳:"
+ queryTs.getTime());
System.err.println(
"操作类 型:"
+ op);
System.err.println(
"数据库.集合:"
+ database);
System.err.println(
"更新条件:"
+ JSON.toJSONString(where));
System.err.println(
"文档内容:"
+ JSON.toJSONString(context));
}
}
catch
(Exception e) { e.printStackTrace(); }
}
|
上面代码只是一个简单的测试用例,完整的应用还需要考虑ts的记录更新,事件的抽象,数据的分发等。我们已经开源的binlog订阅分发项目目前支持数据源在线管理,订阅数据(库、表)在线管理,如果能够使用同一套管理后台管理binlog和oplog的订阅在好不过。要实现和binlog统一管理模型,配置和分发方面基本不需要改动,然后从顶层数据源方面做区分实现即可.
目前我们整合管理的功能都已经开发好了,关于oplog部分的代码还没提交到github上,后面会和大家相见.
以上就是剖析后OpLog订阅MongoDB的数据变更就没那么难了的详细内容,更多关于OpLog订阅MongoDB的数据变更的资料请关注我其它相关文章! 。
原文链接:http://www.kailing.pub/article/index/arcid/274.html 。
最后此篇关于剖析后OpLog订阅MongoDB的数据变更就没那么难了的文章就讲到这里了,如果你想了解更多关于剖析后OpLog订阅MongoDB的数据变更就没那么难了的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
我已经在 kubernetes 中部署了一个 3 pod mongodb statefulset,并且我正在尝试使用新的 mongodb+srv 连接字符串 (mongodb 3.6) 连接到具有 S
我已经创建了 MongoDB Atlas 帐户,并尝试连接。但出现以下错误。 MongoDB 连接错误 MongoNetworkError: 首次连接时无法连接到服务器 [cluster0-shard
我正在使用 Node-WebKit 创建桌面应用程序。该应用程序基本上是创建文档(员工日常工作的详细信息),任何注册用户都可以对这些文档发表评论。我正在创建的文档将被分成几个部分。用户将对特定部分发表
我正在尝试使用官方网站上的安装程序在我的本地机器上安装 mongo DB。但是我不断收到这条消息,有人可以帮忙吗? 我试过提供的解决方案 here但没有帮助。 最佳答案 我建议执行以下操作: 按 Wi
我对 MongoDB 和 MongoDB Compass 非常陌生。 我的客户集合中有大约 1000 条记录。如何通过 MongoDB 指南针一次删除所有记录。 非常感谢, 最佳答案 您可以使用 Mo
当我尝试在我的 Ubuntu 机器中安装 mongodb 时,apt-get 会显示以下选项 mongodb mongodb-clients mongodb-dev mongodb-server 谁能
如何将 Robomongo(或任何其他 mongodb 客户端)连接到由本地 Meteor 应用程序创建的 mongodb 实例? 最佳答案 确保 Meteor 正在本地主机上运行。打开终端窗口并运行
我需要在 MongoDB 中生成一个简单的频率表。假设我在名为 books 的集合中有以下文档。 { "_id": 1, genre: [ "Fantasy", "Crime"
我如何在 mongos mapreduce 中指定一个条件,就像我们在 mongos group 函数中所做的那样。 我的数据是这样的 {lid:1000, age:23}, {lid:3000, a
我的 mongodb 数据库文档中有几个 ID。我需要通过脚本在这些 ID 上创建索引,这样我就不必一次又一次地运行 ensureIndex 命令。 db.getCollection("element
在我的数据库中,每个包含项目的文档中都有一个嵌套的元素数组,格式如下: elements:[ { "elem_id": 12, items: [ {"i_id": 1
我正在构建一个应用程序,其中用户可以位于不同的时区,并且我运行的查询对他们的时区很敏感。 我遇到的问题是 MongoDB 似乎在查询时忽略了时区! 这是日期字段“2019-09-29T23:52:13
我正在研究使用 mongodb 进行分片,我有以下结构: 1 个 Mongod 到我的 ConfigServer,在 ReplicaSet 中只有 1 个成员 2 个分片,每个分片在 ReplicaS
我正在尝试获取一个 mongoDB 对象,例如 Friend1 包含另一个 mongoDB 对象 Friend2,该对象又包含第一个对象 Friend1本质上使它成为一个循环对象引用。 要么这样,要么
关闭。这个问题是off-topic .它目前不接受答案。 想改进这个问题? Update the question所以它是on-topic对于堆栈溢出。 9年前关闭。 Improve this que
Mongo 版本 5.0.2。 Ubuntu 20.0 我在本地主机中启用了 MongoDB 连接的安全性。 我正在尝试通过以下命令使用身份验证详细信息连接我的本地主机 MongoDBmongo ad
我即将将分片的 MongoDB 环境从 2.0.7 升级到 2.2.9,最终我想升级到 2.4.9,但显然我需要通过 2.2 来完成。 2.2 的发行说明声明配置服务器应该首先升级其二进制文件,然后是
目前,我无法在我的虚拟 Ubuntu 机器上远程连接 mongodb 服务器。我无法使用在我的 Windows PC 上运行的 Robomongo 客户端连接,该 PC 也运行 vm。 这是两台电脑的
我创建了一个免费的 mongodb 集群。我创建了一个用户,设置了与 mongodb compass 的连接,复制了连接字符串,然后打开了我的 mongodb compass。将复制的字符串粘贴到那里
我使用 java 代码创建了 mongo 数据库集合索引 dbCollection.createIndex("accountNumber"); 当我看到索引使用 db.accounts.getInde
我是一名优秀的程序员,十分优秀!