- android - RelativeLayout 背景可绘制重叠内容
- android - 如何链接 cpufeatures lib 以获取 native android 库?
- java - OnItemClickListener 不起作用,但 OnLongItemClickListener 在自定义 ListView 中起作用
- java - Android 文件转字符串
我对这个应用程序的目标是创建监视数据库的逻辑,并在将文档添加到数据库时触发操作(例如发送电子邮件)。但是,由于第一次填充数据库时可能不会启动此应用程序,我如何手动创建一个指向添加到集合中的第一个文档的 ResumeToken,以便在第一次运行时,我可以从头开始并遍历更改,直到我到达终点。我认识到我需要存储来自 lastChangeStreamDocument 的 ResumeToken 以供将来重新启动,但我对“首次运行”场景感兴趣。我虽然 enumerator.Reset();
是正确的选项,但它引发了一个异常,表明它不受支持。
我遵循了 https://github.com/mongodb/mongo-csharp-driver/blob/master/tests/MongoDB.Driver.Examples/ChangeStreamExamples.cs 中提供的测试并使用以下代码成功配置了 Change Stream
mongoClient = mongoClient ?? new MongoClient(ConnectionString); //Create client object if it is null
IMongoDatabase sandboxDB = mongoClient.GetDatabase("SandboxDB");
var collection = sandboxDB.GetCollection<BsonDocument>("CollectionToMonitor");
try
{
var cursor = collection.Watch();
var enumerator = cursor.ToEnumerable().GetEnumerator();
enumerator.MoveNext(); //Blocks until a record is UPDATED in the database
var lastChangeStreamDocument = enumerator.Current;
enumerator.Dispose();
//lastChangeStreamDocument.FullDocument.Should().Be(document);
}
catch( Exception ex)
{
Logger.WriteException(ex);
}
但是,使用此代码,enumerator.MoveNext() 行会阻塞,直到文档被更新,因此我只能在设置更改流后获取对更新文档的引用。
我想搜索 local.oplog 数据库并获取插入集合中的第一个文档的 UUID 并且成功了,但是,我没有看到将此引用转换为 ResumeToken 对象的方法可以喂 watch 方法。
更新:
ResumeToken 似乎存储为 Base64,其中包含时间戳、o._id ObjectID 以及来自 oplog 条目的 ui UUID。我需要更多地遍历代码,但从源代码 ( https://github.com/mongodb/mongo/blob/c906f6357d22f66d58e3334868025069c62bd97b/src/mongo/db/pipeline/resume_token_test.cpp ) 看来,ResumeTokens 有不同的格式。有了这些信息,我希望可以构建自己的 ResumeToken 来匹配数据库期望的格式。
更新#2:
经过更多研究,我偶然发现了在 mongo 中解析 key_string
的代码 github.com/mongodb/mongo/src/mongo/db/storage/key_string.cpp .此文件包含 CType 的定义。我将 Base64 解码为字节数组,然后使用 CType 枚举定义,我能够更多地了解如何构建我自己的 ResumeToken。
考虑以下示例:更新文档后,我在 ChangeStream 上捕获了 ResumeToken。
glp9zsgAAAABRmRfaWQAZFp9zH40PyabFRwB/ABaEAQESw1YexhL967nKLXsT5Z+BA==
解码为字节数组:
82 5a 7d ce c8 00 00 00 01 46 64 5f 69 64 00 64 5a 7d cc 7e 34 3f 26 9b 15 1c 01 fc 00 5a 10 04 04 4b 0d 58 7b 18 4b f7 ae e7 28 b5 ec 4f 96 7e 04
我已经解码为:
//Timestamp (of oplog entry??)
82 //CType::TimeStamp
5a 7d ce c8 00 00 00 01 //It appears to be expecting a 64b number
//I'm not sure why the last byte 0x01 unless it has something to do with little/bit endian
//Matching oplog doc has { ts: TimeStamp(1518194376, 1) }
// that integer converts to 0x5A7DCEC8
//Unknown Object
46 //CType::Object
64 5f 69 64 //Either expecting a 32b value or null terminated
00 //Null terminator or divider
//Document ID
64 //CType::OID
5a 7d cc 7e 34 3f 26 9b 15 1c 01 fc //o._id value from oplog entry
00 //OID expecting null terminated
//UUID
5a //CType::BinData
10 //Length (16b)
04 //BinDataType of newUUID (from bsontypes.h)
04 4b 0d 58 7b 18 4b f7 ae e7 28 b5 ec 4f 96 7e //UUID value from oplog entry
04 //Unknown byte. Perhaps end of ResumeToken, or end of UUID mark?
我现在遇到的问题是,如果我在一个集合中有很多 oplog 条目,并且我使用 oplog 中第一个条目的 ts、ui 和 o._id 值来构建我自己的 ResumeToken(硬编码未知 0x4664 5f69 6400
block 和结尾的 0x04
字节,然后服务器在设置 collection.Watch
时将其接受为有效的 ResumeToken。但是, enumerator.moveNext() 调用返回的文档总是返回第 3 个 oplog 条目而不是第 2 个!
在不知道该 12 字节 block 的用途,也不知道为什么我指向第 3 个条目而不是第 2 个条目的情况下,我很紧张在生产中依赖它。
更新 #3:
那些有问题的字节 block :
46 64 5f 69 64 00
0x46 = CType::Object
0x64 = d
0x5F = _
0x69 = i
0x64 = d
0x00 = NULL
以下字节 block 描述了受影响文档的 ObjectId,或者它的“_id”键。那么“d”字符的意义是什么?
最佳答案
在解决这个问题时,我一直在使用其他信息更新问题,现在我已经设法将其拼凑起来,因此它可以正常工作。
下面是我创建的代码:
希望这段代码对其他尝试做同样事情的人有所帮助。
/// <summary>
/// Locates the first document for the given namespace in the local.oplog collection
/// </summary>
/// <param name="docNamespace">Namespace to search for</param>
/// <returns>First Document found in the local.oplog collection for the specified namespace</returns>
internal static BsonDocument GetFirstDocumentFromOpLog(string docNamespace)
{
mongoClient = mongoClient ?? new MongoClient(ConnectionString); //Create client object if it is null
IMongoDatabase localDB = mongoClient.GetDatabase("local");
var collection = localDB.GetCollection<BsonDocument>("oplog.rs");
//Find the documents from the specified namespace (DatabaseName.CollectionName), that have an operation type of "insert" (The first entry to a collection must always be an insert)
var filter = MongoDB.Bson.Serialization.BsonSerializer.Deserialize<BsonDocument>("{ $and: [ { 'ns': '" + docNamespace + "'}, { 'op': 'i'}] }");
BsonDocument retDoc = null;
try //to get the first document from the oplog entries
{
retDoc = collection.Find<BsonDocument>(filter).First();
}
catch(Exception ex) { /*Logger.WriteException(ex);*/ }
return retDoc;
}
/// <summary>
/// Takes a document from the OpLog and generates a ResumeToken
/// </summary>
/// <param name="firstDoc">BsonDocument from the local.oplog collection to base the ResumeToken on</param>
/// <returns>A ResumeToken that can be provided to a collection watch (ChangeStream) that points to the firstDoc provided</returns>
private static BsonDocument GetResumeTokenFromOpLogDoc(BsonDocument firstDoc)
{
List<byte> hexVal = new List<byte>(34);
//Insert Timestamp of document
hexVal.Add(0x82); //TimeStamp Tag
byte[] docTimeStampByteArr = BitConverter.GetBytes(firstDoc["ts"].AsBsonTimestamp.Timestamp); //Timestamp is an integer, so we need to reverse it
if (BitConverter.IsLittleEndian) { Array.Reverse(docTimeStampByteArr); }
hexVal.AddRange(docTimeStampByteArr);
//Expecting UInt64, so make sure we added 8 bytes (likely only added 4)
hexVal.AddRange(new byte[] { 0x00, 0x00, 0x00, 0x01 }); //Not sure why the last bytes is a 0x01, but it was present in observed ResumeTokens
//Unknown Object observed in a ResumeToken
//0x46 = CType::Object, followed by the string "d_id" NULL
//This may be something that identifies that the following value is for the "_id" field of the ObjectID given next
hexVal.AddRange(new byte[] { 0x46, 0x64, 0x5F, 0x69, 0x64, 0x00 }); //Unknown Object, expected to be 32 bits, with a 0x00 terminator
//Insert OID (from 0._id.ObjectID)
hexVal.Add(0x64); //OID Tag
byte[] docByteArr = firstDoc["o"]["_id"].AsObjectId.ToByteArray();
hexVal.AddRange(docByteArr);
hexVal.Add(0x00); //End of OID
//Insert UUID (from ui) as BinData
hexVal.AddRange(new byte[] { 0x5a, 0x10, 0x04 }); //0x5A = BinData, 0x10 is Length (16 bytes), 0x04 is BinDataType (newUUID)
hexVal.AddRange(firstDoc["ui"].AsByteArray);
hexVal.Add(0x04); //Unknown marker (maybe end of resumeToken since 0x04 == ASCII 'EOT')
//Package the binary data into a BsonDocument with the key "_data" and the value as a Base64 encoded string
BsonDocument retDoc = new BsonDocument("_data", new BsonBinaryData(hexVal.ToArray()));
return retDoc;
}
/// <summary>
/// Example Code for setting up and resuming to the second doc
/// </summary>
internal static void MonitorChangeStream()
{
mongoClient = mongoClient ?? new MongoClient(ConnectionString); //Create client object if it is null
IMongoDatabase sandboxDB = mongoClient.GetDatabase("SandboxDB");
var collection = sandboxDB.GetCollection<BsonDocument>("CollectionToMonitor");
var options = new ChangeStreamOptions();
options.FullDocument = ChangeStreamFullDocumentOption.UpdateLookup;
try
{
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<BsonDocument>>().Match("{ operationType: { $in: [ 'replace', 'insert', 'update' ] } }"); //Works
//Build ResumeToken from the first document in the oplog collection
BsonDocument resumeTokenRefDoc = GetFirstDocumentFromOpLog(collection.CollectionNamespace.ToString());
if (resumeTokenRefDoc != null)
{
BsonDocument docResumeToken = GetResumeTokenFromOpLogDoc(resumeTokenRefDoc);
options.ResumeAfter = docResumeToken;
}
//Setup the ChangeStream/Watch Cursor
var cursor = collection.Watch(pipeline, options);
var enumerator = cursor.ToEnumerable().GetEnumerator();
enumerator.MoveNext(); //Blocks until a record is UPDATEd, REPLACEd or INSERTed in the database (thanks to the pipeline arg), or returns the second entry (thanks to the ResumeToken that points to the first entry)
ChangeStreamDocument<BsonDocument> lastChangeStreamDocument = enumerator.Current;
//lastChangeStreamDocument is now pointing to the second entry in the oplog, or the just received entry
//A loop can be setup to call enumerator.MoveNext() to step through each entry in the oplog history and to also receive new events
enumerator.Dispose(); //Be sure to dispose of the enumerator when finished.
}
catch( Exception ex)
{
//Logger.WriteException(ex);
}
}
如果大家对代码有什么改进的建议,请提出建议。我还在学习。
关于c# - 如何在第一个文档中恢复 MongoDB ChangeStream,而不仅仅是在我开始收听后进行更改,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48665409/
有什么方法可以恢复删除的元素吗? 这是我删除元素的代码 myFunction() { var width = window.innerWidth; var February = doc
我有一个 TokuDB 表,由于某种原因缺少 ***_status.tokudb 文件。 我还不确定文件是否由于 TokuDB 崩溃而丢失。 问题是: 有没有办法从主要文件和关键文件(我可以从 tok
我正在 Windows 7 (x86) 上运行带有 Workbench 6.3.8 的 32 位 MySQL Server 5.7.22 本地实例(必须选择 32 位版本 - 所以,较旧的版本)。 我
1、备份 <% SQL="backup database 数据库名 to disk='"&Serve
1、ASP中怎么实现SQL数据库备份、恢复! 答:asp在线备份sql server数据库: 1、备份 <% SQL="ba
我在 R 中使用 stats::filter 函数来理解 R 中的 ARIMA 模拟(如在函数 stats::arima.sim 中)和估计。我知道 stats::filter 将线性过滤器应用于向量
我已经浏览了示例应用程序的文档和代码,并发现 files/objectbox/objectbox/data.mdb 是存储所有数据的默认文件。 假设我的理解是正确的,我有几个问题找不到文档: 我想在我
为了恢复非续订订阅类型的 InAppPurchase,我已经实现了服务器来处理此问题。 但在购买过程中,iTunes 有时不会要求用户验证他们的卡详细信息, 在这种情况下,它会在后台发送应用程序并显示
我的问题是寻找cocos2d游戏期间暂停/恢复状态(包括所有需要保存的数据信息)的设计解决方案。 包括但不限于以下情况: 1).用户选择退出,然后弹出一个对话框供用户选择“直接退出”、“暂停”; 2)
在 Mercurial 中,我有一个旧的变更集,除了对单个文件的更改外,它都很好。我将如何恢复对该单个文件的更改? 即使只是能够在上一个变更集中查看文件的状态也会很好,然后我可以剪切和粘贴。 我的 M
我的一项职能遇到了困难。我想做的是计时器在页面加载后立即启动,并且只有一个带有启动/恢复的按钮。我无法在代码中找出需要更改功能的位置。有人可以帮助我吗?谢谢! HTML: , Javascr
我正在阅读Scrap your type classes 。这为类型类提供了替代方案。然而,我被Paul Chiusano的评论所困扰。其中讨论了恢复 do 表示法 语法。 坦白说,我无法理解 ret
当 OrientDB 因某人重新启动机器而非正常关闭时,OrientDB 最终会处于数据恢复失败的状态。对于如何从这种不正常的关闭中正常恢复有什么建议吗?我们正在寻找系统在断电期间能够自行恢复的方法。
我正在构建一个 Electron 应用程序,如果发生崩溃,它必须重新加载渲染进程窗口。 目前我可以从主进程重新启动应用程序 app.relaunch(); app.quit(); 但我无法检测到窗口崩
我有 3 个 Activity ,比如说 MainActivity、 Activity 2 和 Activity 3。 在 MainActivity 中,我有一个按钮(开始/停止),当我单击此按钮时,
关闭。这个问题是off-topic .它目前不接受答案。 想改进这个问题吗? Update the question所以它是on-topic用于堆栈溢出。 关闭 11 年前。 Improve thi
Twilio 是否支持暂停和恢复内容播放。换句话说,我有相当长的文件将播放给调用者,并且我正在尝试找到一种方法来实现暂停和恢复功能。在播放某些内容的过程中,我希望用户能够按数字暂停,然后再次按数字从音
我已经提交了 A、B、C、D 和 E。我意识到在提交 B 中发生了一些非常糟糕的事情,所以我想回到 A,这次正确地进行之前搞砸了 B 的更改,然后重新应用 C 、 D 和 E 自动。 您可能想知道为什
我的一个文件被“标记为文本”,图标也发生了变化。实际上这是一个 PHP 文件。我尝试过使用 Help -> Find Action -> Mark As 尝试将其恢复为 PHP 突出显示,但它不起作用
我有一些 SSE 程序,可以将循环中的内存归零,当指针未对齐时,它会引发 SIGSEGV进入我的处理程序。我可以在此类处理程序中获取更多信息吗例行公事,现在我不知道它是在哪里完成的,我也可以吗以某种可
我是一名优秀的程序员,十分优秀!