- VisualStudio2022插件的安装及使用-编程手把手系列文章
- pprof-在现网场景怎么用
- C#实现的下拉多选框,下拉多选树,多级节点
- 【学习笔记】基础数据结构:猫树
mongodb的变更流解释
变更流(Change Streams)允许应用程序访问实时数据变更,从而避免事先手动追踪 oplog 的复杂性和风险。应用程序可使用变更流来订阅针对单个集合、数据库或整个部署的所有数据变更,并立即对它们做出响应。由于变更流采用聚合框架,因此,应用程序还可对特定变更进行过滤,或是随意转换通知。(Change Streams - MongoDB Manual v5.0) 。
使用场景,需要websocket推送实时数据的时候,我们把数据写入mongo的同时,websocket实时监听mongo数据,拿到后推送到订阅组用户.
这里只做一端新增另一端服务监听测试,及windows下副本集快速搭建流程.
。
sub端代码 。
package main import ( "context" "fmt" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" "log" ) func main() { // 设置 MongoDB 客户端mongo单机模式不支持这种监听 单机报错 2024/08/10 11:18:54 (Location40573) The $changeStream stage is only supported on replica sets clientOptions := options.Client().ApplyURI("mongodb://localhost:27017") client, err := mongo.Connect(context.TODO(), clientOptions) if err != nil { log.Fatal(err) } defer client.Disconnect(context.TODO()) // 获取数据库和集合 collection := client.Database("testdb").Collection("items") // 设置 Change Stream pipeline := mongo.Pipeline{} changeStreamOptions := options.ChangeStream().SetFullDocument(options.UpdateLookup) changeStream, err := collection.Watch(context.TODO(), pipeline, changeStreamOptions) if err != nil { log.Fatal(err) } defer changeStream.Close(context.TODO()) fmt.Println("开始监听 Change Stream...") // 读取 Change Stream for changeStream.Next(context.TODO()) { var changeEvent bson.M if err := changeStream.Decode(&changeEvent); err != nil { log.Fatal(err) } fmt.Printf("检测到更改: %+v\n", changeEvent) } if err := changeStream.Err(); err != nil { log.Fatal(err) } }
。
pub端代码 。
package main import ( "context" "fmt" "time" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" ) func main() { // 设置 MongoDB 客户端 clientOptions := options.Client().ApplyURI("mongodb://localhost:27017") client, err := mongo.Connect(context.TODO(), clientOptions) if err != nil { fmt.Println("连接 MongoDB 失败:", err) return } defer client.Disconnect(context.TODO()) // 获取数据库和集合 collection := client.Database("testdb").Collection("items") // 插入数据 for i := 1; i <= 5; i++ { item := bson.D{{"name", fmt.Sprintf("item%d", i)}, {"value", i}} _, err := collection.InsertOne(context.TODO(), item) if err != nil { fmt.Println("插入数据失败:", err) return } //fmt.Printf("插入数据: %+v\n", item) fmt.Printf("插入数据第 %d 条", i) time.Sleep(2 * time.Second) // 模拟一些延迟 } }
执行结果 pub端 。
执行结果 sub端 。
。
数据库不用新建集合,自动生成很方便 。
。
。
。
下面是windows下安装副本集步骤一字不拉 。
https://www.mongodb.com/try/download/community 下载zip包解压 bin目录同级创建data-data4(data内部需要创建好db目录),log-log4 MongoDB shell version v5.0.28 注意 data目录下没有db文件夹net start MongoDB执行服务起不来 192.168.2.6 本机ip mongod.exe --config "E:\mongodb\mongod.conf" --serviceName "MongoDB" --serviceDisplayName "MongoDB" --install mongod.exe --config "E:\mongodb\mongod1.conf" --serviceName "MongoDB1" --serviceDisplayName "MongoDB1" --install mongod.exe --config "E:\mongodb\mongod2.conf" --serviceName "MongoDB2" --serviceDisplayName "MongoDB2" --install mongod.exe --config "E:\mongodb\mongod3.conf" --serviceName "MongoDB3" --serviceDisplayName "MongoDB3" --install net start MongoDB net start MongoDB1 net start MongoDB2 net start MongoDB3 bin目录下打开cmd执行mongo.exe rs_conf={_id:"rs", members:[ {_id:0,host:"192.168.2.6:27017",priority:1}, {_id:1,host:"192.168.2.6:27018",priority:2}, {_id:2,host:"192.168.2.6:27019",priority:3}, {_id:4,host:"192.168.2.6:27020", arbiterOnly:true} ]} 返回这个代表成功: { "_id" : "rs", "members" : [ { "_id" : 0, "host" : "192.168.2.6:27017", "priority" : 1 }, { "_id" : 1, "host" : "192.168.2.6:27018", "priority" : 2 }, { "_id" : 2, "host" : "192.168.2.6:27019", "priority" : 3 }, { "_id" : 4, "host" : "192.168.2.6:27020", "arbiterOnly" : true } ] } rs.initiate(rs_conf) 执行配置 {"ok":1} rs.status() 查看状态 { "set" : "rs", "date" : ISODate("2024-08-10T02:40:20.391Z"), "myState" : 2, "term" : NumberLong(2), "syncSourceHost" : "192.168.2.6:27019", "syncSourceId" : 2, "heartbeatIntervalMillis" : NumberLong(2000), "majorityVoteCount" : 3, "writeMajorityCount" : 3, "votingMembersCount" : 4, "writableVotingMembersCount" : 3, "optimes" : { "lastCommittedOpTime" : { "ts" : Timestamp(1723257616, 1), "t" : NumberLong(2) }, "lastCommittedWallTime" : ISODate("2024-08-10T02:40:16.003Z"), "readConcernMajorityOpTime" : { "ts" : Timestamp(1723257616, 1), "t" : NumberLong(2) }, "appliedOpTime" : { "ts" : Timestamp(1723257616, 1), "t" : NumberLong(2) }, "durableOpTime" : { "ts" : Timestamp(1723257616, 1), "t" : NumberLong(2) }, "lastAppliedWallTime" : ISODate("2024-08-10T02:40:16.003Z"), "lastDurableWallTime" : ISODate("2024-08-10T02:40:16.003Z") }, "lastStableRecoveryTimestamp" : Timestamp(1723257586, 1), "electionParticipantMetrics" : { "votedForCandidate" : true, "electionTerm" : NumberLong(2), "lastVoteDate" : ISODate("2024-08-10T02:39:15.909Z"), "electionCandidateMemberId" : 2, "voteReason" : "", "lastAppliedOpTimeAtElection" : { "ts" : Timestamp(1723257547, 5), "t" : NumberLong(1) }, "maxAppliedOpTimeInSet" : { "ts" : Timestamp(1723257547, 5), "t" : NumberLong(1) }, "priorityAtElection" : 1, "newTermStartDate" : ISODate("2024-08-10T02:39:15.997Z"), "newTermAppliedDate" : ISODate("2024-08-10T02:39:16.928Z") }, "members" : [ { "_id" : 0, "name" : "192.168.2.6:27017", "health" : 1, "state" : 2, "stateStr" : "SECONDARY", "uptime" : 2677, "optime" : { "ts" : Timestamp(1723257616, 1), "t" : NumberLong(2) }, "optimeDate" : ISODate("2024-08-10T02:40:16Z"), "lastAppliedWallTime" : ISODate("2024-08-10T02:40:16.003Z"), "lastDurableWallTime" : ISODate("2024-08-10T02:40:16.003Z"), "syncSourceHost" : "192.168.2.6:27019", "syncSourceId" : 2, "infoMessage" : "", "configVersion" : 1, "configTerm" : 2, "self" : true, "lastHeartbeatMessage" : "" }, { "_id" : 1, "name" : "192.168.2.6:27018", "health" : 1, "state" : 2, "stateStr" : "SECONDARY", "uptime" : 85, "optime" : { "ts" : Timestamp(1723257616, 1), "t" : NumberLong(2) }, "optimeDurable" : { "ts" : Timestamp(1723257616, 1), "t" : NumberLong(2) }, "optimeDate" : ISODate("2024-08-10T02:40:16Z"), "optimeDurableDate" : ISODate("2024-08-10T02:40:16Z"), "lastAppliedWallTime" : ISODate("2024-08-10T02:40:16.003Z"), "lastDurableWallTime" : ISODate("2024-08-10T02:40:16.003Z"), "lastHeartbeat" : ISODate("2024-08-10T02:40:19.059Z"), "lastHeartbeatRecv" : ISODate("2024-08-10T02:40:20.083Z"), "pingMs" : NumberLong(0), "lastHeartbeatMessage" : "", "syncSourceHost" : "192.168.2.6:27017", "syncSourceId" : 0, "infoMessage" : "", "configVersion" : 1, "configTerm" : 2 }, { "_id" : 2, "name" : "192.168.2.6:27019", "health" : 1, "state" : 1, "stateStr" : "PRIMARY", "uptime" : 85, "optime" : { "ts" : Timestamp(1723257616, 1), "t" : NumberLong(2) }, "optimeDurable" : { "ts" : Timestamp(1723257616, 1), "t" : NumberLong(2) }, "optimeDate" : ISODate("2024-08-10T02:40:16Z"), "optimeDurableDate" : ISODate("2024-08-10T02:40:16Z"), "lastAppliedWallTime" : ISODate("2024-08-10T02:40:16.003Z"), "lastDurableWallTime" : ISODate("2024-08-10T02:40:16.003Z"), "lastHeartbeat" : ISODate("2024-08-10T02:40:19.060Z"), "lastHeartbeatRecv" : ISODate("2024-08-10T02:40:20.022Z"), "pingMs" : NumberLong(0), "lastHeartbeatMessage" : "", "syncSourceHost" : "", "syncSourceId" : -1, "infoMessage" : "", "electionTime" : Timestamp(1723257555, 1), "electionDate" : ISODate("2024-08-10T02:39:15Z"), "configVersion" : 1, "configTerm" : 2 }, { "_id" : 4, "name" : "192.168.2.6:27020", "health" : 1, "state" : 7, "stateStr" : "ARBITER", "uptime" : 85, "lastHeartbeat" : ISODate("2024-08-10T02:40:19.059Z"), "lastHeartbeatRecv" : ISODate("2024-08-10T02:40:20.092Z"), "pingMs" : NumberLong(0), "lastHeartbeatMessage" : "", "syncSourceHost" : "", "syncSourceId" : -1, "infoMessage" : "", "configVersion" : 1, "configTerm" : 2 } ], "ok" : 1, "$clusterTime" : { "clusterTime" : Timestamp(1723257616, 1), "signature" : { "hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="), "keyId" : NumberLong(0) } }, "operationTime" : Timestamp(1723257616, 1) }
。
demo代码链接 。
go/mongochangestreamsdemo/demo at main · liuzhixin405/go (github.com) 。
mongo配置链接 。
config/mongo windows集群 at main · liuzhixin405/config (github.com) 。
最后此篇关于mongo变更流使用及windows下副本集五分钟搭建的文章就讲到这里了,如果你想了解更多关于mongo变更流使用及windows下副本集五分钟搭建的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
为了构建 CentOS 6.5 OSM 切片服务器,我正在寻找一些文档和/或教程。 我试过this one正如我在我的 previous post 中所说的那样但它适用于 Ubuntu 14.04,而
我正在寻找可用于集成任何源代码控制管理系统的通用 git 桥(如 git-svn、git-p4、git-tfs)模板。 如果没有这样的模板,至少有一些关于如何在 git 端集成基本操作的说明(对于其他
1、前言 redis在我们企业级开发中是很常见的,但是单个redis不能保证我们的稳定使用,所以我们要建立一个集群。 redis有两种高可用的方案: High availabilit
简介 前提条件: 确保本机已经安装 VS Code。 确保本机已安装 SSH client, 并且确保远程主机已安装 SSH server。 VSCode 已经安装了插件 C/
为什么要用ELK ELK实际上是三个工具,Elastricsearch + Logstash + Kibana,通过ELK,用来收集日志还有进行日志分析,最后通过可视化UI进行展示。一开始业务量比
在日常办公当中,经常会需要一个共享文件夹来存放一些大家共享的资料,为了保证文件数据的安全,最佳的方式是公司内部服务器搭建FTP服务器,然后分配多个用户给相应的人员。今天给大家分享FileZilla搭
最近由于业务需要,开始进行 Flutter 的研究,由于 Flutter 的环境搭建在官网上有些细节不是很清楚,笔者重新整理输出 1. 配置镜像 由于在国内访问 Flutter
目录 1. 安装go软件包 2. 配置系统变量 3. 安装git 4. 设置go代理 5. 下载gin框架 6. 创建项目 7.
Docker 是一个开源的应用容器引擎,让开发者可以打包他们的应用以及依赖包到一个可移植的容器中,然后发布到任何流行的 Linux 机器上,也可以实现虚拟化。容器是完全使用沙箱机制,相互之间不会有任
上篇文章给大家介绍了使用docker compose安装FastDfs文件服务器的实例详解 今天给大家介绍如何使用 docker compose 搭建 fastDFS文件服务器,内容详情如下所示:
目录 1.创建Maven 2.Maven目录和porm.xml配置 3.配置Tomcat服务器 1.创建Maven
laravel 官方提供 homestead 和 valet 作为本地开发环境,homestead 是一个官方预封装的 vagrant box,也就是一个虚拟机,但是跟 docker 比,它占用体积
这个tutorial显示了 Razor Pages 在 Asp.Net Core 2 中的实现。但是,当我运行 CLI 命令时: dotnet aspnet-codegenerator razorp
我创建了一个单独的类库项目来存储数据库上下文和模型类。在同一解决方案中,我创建了一个 ASP.NET MVC 项目并引用了类库项目,并在项目的 Web.config 文件中包含了数据库上下文的连接字符
关于代码托管,公司是基于Gitlab自建的,它功能全而强大,但是也比较重,我个人偏向于开源、小巧、轻便、实用,所以就排除了Github,在Gogs和Gitea中选者。Gogs在Github有38
目录 1、高可用简介 1.1 高可用整体架构 1.2 基于 QJM 的共享存储系统的数据同步机制分析 1.3 NameNode 主
Nginx 是由 Igor Sysoev 为俄罗斯访问量第二的 Rambler.ru 站点开发的,它已经在该站点运行超过两年半了。Igor 将源代码以类BSD许可证的形式发布。 在高并发连接的情况
对于我们的 ASP.NET Core 项目,我们使用包管理器控制台中的 Scaffold-DbContext 搭建现有数据库。 每次我们做脚手架时,上下文类与所有实体一起生成,它包含调用 option
我正在使用 .net 核心 2.0。我已经安装了以下 nuget 包:1: Microsoft.AspNetCore.All2: Microsoft.EntityFrameworkCore.Tools
我正在使用 NetBeans 及其 RAD 开发功能开发 JEE6 JSF 应用程序。我想使用脚手架来节省更新 Controller 和模型 View 的时间。 OneToMany 关联在 View
我是一名优秀的程序员,十分优秀!