- android - RelativeLayout 背景可绘制重叠内容
- android - 如何链接 cpufeatures lib 以获取 native android 库?
- java - OnItemClickListener 不起作用,但 OnLongItemClickListener 在自定义 ListView 中起作用
- java - Android 文件转字符串
我正在尝试将 MongoDB 中的 oplog.rs 加载到 spark DataFrame 中,它加载了元数据并通过 printSchema
函数对其进行了验证,但是当我尝试执行诸如 show 或 count 之类的操作时它给了我这个错误 scala.MatchError: ((BsonMinKey,null),0) (of class scala.Tuple2)
。我也尝试将其注册为 temptable,但仍然出现相同的错误。
val customReadConfig = ReadConfig(Map(
"uri" ->
"mongodb://username:password@host_name:port/local.oplog.rs?authSource=xxxxx"
))
val dataframe = sqlContext.read.format("com.mongodb.spark.sql").
options(customReadConfig.asOptions).load
最佳答案
为了后代:
Mongo >= 3.2 版本的默认分区器是 MongoSamplePartitioner
,它使用(像所有其他分区器一样)partitionKey
并且在创建分区时使用BsonMinKey
和 BsonMaxKey
来定义每个分区的边界。您遇到的匹配错误可能发生在此处:
def createPartitions(partitionKey: String, splitKeys: Seq[BsonValue],
locations: Seq[String] = Nil, addMinMax: Boolean = true):
Array[MongoPartition] = {
val minKeyMaxKeys = (new BsonMinKey(), new BsonMaxKey())
val minToMaxSplitKeys: Seq[BsonValue] = if (addMinMax) minKeyMaxKeys._1 +: splitKeys :+ minKeyMaxKeys._2 else splitKeys
val minToMaxKeysToPartition = if (minToMaxSplitKeys.length == 1) minToMaxSplitKeys else minToMaxSplitKeys.tail
val partitionPairs: Seq[(BsonValue, BsonValue)] = minToMaxSplitKeys zip minToMaxKeysToPartition
partitionPairs.zipWithIndex.map({
case ((min: BsonValue, max: BsonValue), i: Int) => MongoPartition(i, createBoundaryQuery(partitionKey, min, max), locations)
}).toArray
}
该错误告诉您的是您的 max
被设置为 null,正如您在代码中看到的那样,只处理了一种情况。如果您没有设置要使用的partitionKey
,分区程序将默认使用_id
,您可以阅读它here
默认情况下,oplog.rs
集合没有_id
键,oplog 记录的唯一id 是惊人的h
,它是一个数字。因此,为了让分区程序做正确的事情,您需要在 SparkConf
或 ReadConfig
中设置 spark.mongodb.input.partitionerOptions.partitionKey
到 h
。
new SparkConf()
//all of your other settings
.set("spark.mongodb.input.partitionerOptions.partitionKey", "h")
关于mongodb - 将 mongodb oplog.rs 加载到 spark dataframe,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42584984/
我有一个包含 3 个成员的副本集。每个人都在 Linux 上运行 mongod 2.4.8。我正在使用 MMS 代理来监视我的副本集/主机的运行状况,并且一切正常。 我正在使用 mongodump 来
我以前看过这个问题,但答案很模糊。我一直在对 oplog 进行一些研究,并试图准确了解它是如何工作的。特别是,我想对 oplog 文档中的字段以及它们存储的数据有一个很好的了解。 这些是我通过测试发现
我需要能够恢复单个数据库,甚至是备份中的单个集合。由于 mongodump --oplog 仅适用于完整实例(副本集),我进行了以下过程以仅过滤我想从 oplog.bson 由 --oplog 选项生
我正在尝试改进我的 MongoDB 服务器的 oplog,因为现在它覆盖的时间比我想要的要少(我现在不打算增加 oplog 文件的大小)。我发现oplog集合中有很多noops记录-{“op”:“n”
所以我刚刚在我的项目中添加了 redis-oplog https://github.com/cult-of-coders/redis-oplog meteor add cultofcoders:red
有一个 OpLog 游标,是否有可能在更新操作中获取除默认 _id 之外的另一个索引? 背景: 我有一个分片集群,使用复合索引作为分片键。此复合键的一部分用于确定哪一组分片用于存储数据(也称为 Tag
我正在使用 Elastich Search 和 MongoDB 开发一个应用程序。 Elasticsearch 使用 MongoDB oplog 通过称为 river 的组件对内容进行索引。 是否可以
我想使用 oplog 从单个数据库创建增量备份策略 由于 mongodump 的 --oplog 选项创建了完整版本的转储,我想知道是否有任何方法可以: 从 单个数据库 使用“x”秒前的 oplog
我正在寻找在副本集(非分片)上执行 Mongodb 备份的正确方法。 通过阅读 Mongodb 文档,我了解到“mongodump --oplog”应该足够了,即使在副本(从属)服务器上也是如此。 F
MongoDB oplog“op”字段中的不同字母是什么意思?我可以猜到一些字母的含义,但我不确定。 "n"= ? "i"= insert ,这样对吗? "u"= 更新,这对吗? "c"= count
这个问题在这里已经有了答案: How can Python Observe Changes to Mongodb's Oplog (4 个回答) 关闭6年前。 我正在尝试在 mongo 的 oplog
是否可以修改MongoDB oplog并重放它? 一个错误导致将更新应用于比预期更多的文档,从而覆盖了一些数据。数据已从备份中恢复并重新集成,因此实际上没有丢失任何内容,但我想知道是否有办法修改 op
我有 2 个节点集群(8 个 vCPU,52 GB)用于 mongodb(3.2.0)。我正在调试几个需要几秒钟的查询,我运行了 db.currentOp()查看正在运行哪些查询以及它们花费了多少时间
我知道 oplog 文件会将多个更新拆分为单个更新,但是批量插入呢?这些也被分成单独的插入吗? 如果我有一个写入密集型集合,大约每 30 秒插入一批约 20K 文档,我是否/应该考虑将 oplog 大
我们正在使用 mongo java 驱动程序 3.2.2 和 mongo oplog 集合来识别我们的 mongo 集合中的更改(Mongo 服务器版本为 3.2)。我们遇到了以下 2 个问题,并且对
是否可以使用 mongodump 并将其 mongorestore 到不同的主机,使用不同的数据库名称,并启用 oplog? From: mongodb://user:password@source-
我想查看 oplog,所以我按照 here 中的建议启动了 MongoDB。 mongod --master --port 8888 --dbpath ... 但是当我运行的时候 >db.oplog.
我的 mongodb 今天被黑了,所有数据都被删除了,黑客需要一些钱才能取回,我不会付钱给他,因为我知道他不会把我的数据库还给我。 但我打开了 oplog,我看到它包含超过 300,000 个文档,保
我最近开始使用 MongodDb,我正在尝试探索副本集和崩溃恢复。 我已经读过它,就像日志文件正在写一个头重做日志文件。oplog 文件是每个写入事件都将被写入的文件。 这两者有什么区别...?我们是
关闭。这个问题是opinion-based .它目前不接受答案。 想要改进这个问题吗? 更新问题,以便 editing this post 提供事实和引用来回答它. 关闭 8 年前。 Improve
我是一名优秀的程序员,十分优秀!