gpt4 book ai didi

mongodb - 将 mongodb oplog.rs 加载到 spark dataframe

转载 作者:可可西里 更新时间:2023-11-01 10:49:51 29 4
gpt4 key购买 nike

我正在尝试将 MongoDB 中的 oplog.rs 加载到 spark DataFrame 中,它加载了元数据并通过 printSchema 函数对其进行了验证,但是当我尝试执行诸如 show 或 count 之类的操作时它给了我这个错误 scala.MatchError: ((BsonMinKey,null),0) (of class scala.Tuple2)。我也尝试将其注册为 temptable,但仍然出现相同的错误。

val customReadConfig = ReadConfig(Map(
"uri" ->
"mongodb://username:password@host_name:port/local.oplog.rs?authSource=xxxxx"
))

val dataframe = sqlContext.read.format("com.mongodb.spark.sql").
options(customReadConfig.asOptions).load

最佳答案

为了后代:

Mongo >= 3.2 版本的默认分区器是 MongoSamplePartitioner,它使用(像所有其他分区器一样)partitionKey 并且在创建分区时使用BsonMinKeyBsonMaxKey 来定义每个分区的边界。您遇到的匹配错误可能发生在此处:

  def createPartitions(partitionKey: String, splitKeys: Seq[BsonValue], 
locations: Seq[String] = Nil, addMinMax: Boolean = true):
Array[MongoPartition] = {
val minKeyMaxKeys = (new BsonMinKey(), new BsonMaxKey())
val minToMaxSplitKeys: Seq[BsonValue] = if (addMinMax) minKeyMaxKeys._1 +: splitKeys :+ minKeyMaxKeys._2 else splitKeys
val minToMaxKeysToPartition = if (minToMaxSplitKeys.length == 1) minToMaxSplitKeys else minToMaxSplitKeys.tail
val partitionPairs: Seq[(BsonValue, BsonValue)] = minToMaxSplitKeys zip minToMaxKeysToPartition
partitionPairs.zipWithIndex.map({
case ((min: BsonValue, max: BsonValue), i: Int) => MongoPartition(i, createBoundaryQuery(partitionKey, min, max), locations)
}).toArray
}

该错误告诉您的是您的 max 被设置为 null,正如您在代码中看到的那样,只处理了一种情况。如果您没有设置要使用的partitionKey,分区程序将默认使用_id,您可以阅读它here

默认情况下,oplog.rs 集合没有_id 键,oplog 记录的唯一id 是惊人的h ,它是一个数字。因此,为了让分区程序做正确的事情,您需要在 SparkConfReadConfig 中设置 spark.mongodb.input.partitionerOptions.partitionKeyh

 new SparkConf()
//all of your other settings
.set("spark.mongodb.input.partitionerOptions.partitionKey", "h")

关于mongodb - 将 mongodb oplog.rs 加载到 spark dataframe,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42584984/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com