- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我有一个从 HBase 转换而来的 RDD:
val hbaseRDD: RDD[(String, Array[String])] 其中 tuple._1 是行键。数组是HBase中的值。
4929101-ACTIVE, ["4929101","2015-05-20 10:02:44","dummy1","dummy2"]
4929102-ACTIVE, ["4929102","2015-05-20 10:02:44","dummy1","dummy2"]
4929103-ACTIVE, ["4929103","2015-05-20 10:02:44","dummy1","dummy2"]
我还有一个 SchemaRDD (id,date1,col1,col2,col3) 转换为
val refDataRDD: RDD[(String, Array[String])] 我将对其进行迭代并检查它是否存在于 hbaseRDD 中:
4929103, ["2015-05-21 10:03:44","EV01","col2","col3"]
4929104, ["2015-05-21 10:03:44","EV02","col2","col3"]
问题是,
如何检查 hbaseRDD 中是否存在键 (tuple._1)/("4929103") 并获取相应的值 (tuple._2)? - 我不能在 rdd.filter 中使用 PairRDD 的查找函数,它会抛出“scala.MatchError: null”,但它在外部有效
val filteredRDD = rdd.filter(sqlRow => {
val hbaseLookup = hbaseRDD.lookup(sqlRow(0).toString + "-ACTIVE")
// if found, check if date1 of hbaseRDD < sqlRow(1)
// else if not found, retain row
true
})
不过我不确定这是否是问题所在,因为当我将查找行切换到以下位置时我也遇到了 NPE:
val sqlRowHbase = hbaseRDD.filter(row => {
注意:我在这些行之前执行 hbaseRDD.count。并且 hbaseRDD.lookup 在 rdd.filter 之外工作正常
所以基本上,我试图在 hbaseRDD 中按键“查找”并获取行/值。加入它们有点复杂,因为两个 RDD 中的某些值可能为空。这取决于很多场景,哪些行会保留哪些数据。
最佳答案
假设您需要查找的 a_id 集合包含在 RDD 中,我认为您可以使用 leftOuterJoin 而不是迭代和查找每个值。
我在上面看到了您关于 date1 的潜在可变位置的评论。不过,我没有在下面解决它,我认为这应该在查找本身之前通过每行的某种特定映射来处理。
如果我得到正确的伪代码,你有一个 (id, date)
的 RDD 并且想通过在 hbase 中查找数据来更新它,如果在 hbase 中找到一行则更新日期对于这个 id,如果它的日期早于 refData 中的日期。对吗?
如果是这样,假设你有一些像这样的引用数据:
val refData = sc.parallelize(Array(
("4929103","2015-05-21 10:03:44"),
("4929104","2015-05-21 10:03:44")
))
还有一些来自 Hbase 的行数据:
val hbaseRDD = sc.parallelize(Array(
("4929101-ACTIVE", Array("4929101","2015-05-20 10:02:44")),
("4929102-ACTIVE", Array("4929102","2015-05-20 10:02:44")),
("4929103-ACTIVE", Array("4929103","2015-05-20 10:02:44"))
))
然后您可以使用简单的 leftOuterJoin 将 refData 中的每个 id 查找到 hbase 中,对于找到的每一行:如有必要,更新日期:
refData
// looks up in Hbase all rows whose date1 a_id value matches the id in searchedIds
.leftOuterJoin(hbaseRDD.map{ case (rowkey, Array(a_id, date1)) => (a_id, date1)})
// update the date in refData if date from hBase is earlier
.map { case (rowKey, (refDate, maybeRowDate)) => ( rowKey, chooseDate (refDate, maybeRowDate)) }
.collect
def chooseDate(refDate: String, rowDate: Option[String]) = rowDate match {
// if row not found in Hbase: keep ref date
case None => refDate
case Some(rDate) =>
if (true) /* replace this by first parsing the date, then check if rowDate < refDate */
rowDate
else
refDate
}
关于scala - Spark RDD 按键查找,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30421484/
我是 Pyspark 新手,我使用的是 Spark 2.0.2。 我有一个名为 Test_RDD 的 RDD,其结构如下: U-Key || V1 || V2 || V3 || ----
我正在寻找一种方法将一个 RDD 拆分为两个或多个 RDD,并将获得的结果保存为两个单独的 RDD。例如: rdd_test = sc.parallelize(range(50), 1) 我的代码:
我有一个结构如下的RDD: ((user_id,item_id,rating)) 让我们将此 RDD 称为训练 然后还有另一个具有相同结构的rdd: ((user_id,item_id,rating)
已经有人问过类似的问题。最相似的是这个: Spark: How to split an RDD[T]` into Seq[RDD[T]] and preserve the ordering 但是,我不
我正在使用 spark 来处理数据。但是我不知道如何将新数据保存到Hive 我从 Hive 加载 rdd,然后运行 map 函数来清理数据。 result = myRdd.map(lambda x
我有一个名为 index 的 rdd:RDD[(String, String)],我想用 index 来处理我的文件。 这是代码: val get = file.map({x => val tmp
我有两个 RDD: **rdd1** id1 val1 id2 val2 **rdd2** id1 v1 id2 v2 id1 v3 id8 v7 id1 v4 id3 v5 id6 v6 我想过滤
我有一个 RDD,需要从另一个 RDD 访问数据。但是,我总是收到任务不可序列化错误。我已经扩展了 Serialized 类,但它没有起作用。代码是: val oldError = rddOfRati
我有一个 RDD,需要从另一个 RDD 访问数据。但是,我总是收到任务不可序列化错误。我已经扩展了 Serialized 类,但它没有起作用。代码是: val oldError = rddOfRati
我有一个 RDD 对: (105,918) (105,757) (502,516) (105,137) (516,816) (350,502) 我想将它分成两个 RDD,这样第一个只有具有非重复值的对
我正在尝试使用 spark 执行 K 最近邻搜索。 我有一个 RDD[Seq[Double]] 并且我打算返回一个 RDD[(Seq[Double],Seq[Seq[Double]])] 带有实际行和
我是Spark和Scala语言的新手,并且希望将所有RDD合并到一个List中,如下所示(List to RDD): val data = for (item {
我找不到只参与 rdd 的方法. take看起来很有希望,但它返回 list而不是 rdd .我当然可以将其转换为 rdd ,但这似乎既浪费又丑陋。 my_rdd = sc.textFile("my
我正在寻找一种将 RDD 拆分为两个或更多 RDD 的方法。我见过的最接近的是 Scala Spark: Split collection into several RDD?这仍然是一个单一的 RDD
我有一个RDD[String],wordRDD。我还有一个从字符串/单词创建 RDD[String] 的函数。我想为 wordRDD 中的每个字符串创建一个新的 RDD。以下是我的尝试: 1) 失败,
我刚刚开始使用 Spark 和 Scala 我有一个包含多个文件的目录我使用 成功加载它们 sc.wholeTextFiles(directory) 现在我想升一级。我实际上有一个目录,其中包含包含文
我想从另一个 RDD 中减去一个 RDD。我查看了文档,发现 subtract可以这样做。实际上,当我测试时 subtract , 最终的 RDD 保持不变,值不会被删除! 有没有其他功能可以做到这一
我在 HDFS 中有如下三个文件中的数据 EmployeeManagers.txt (EmpID,ManagerID) 1,5 2,4 3,4 4,6 5,6 EmployeeNames.txt (E
我正在开发一个应用程序,我需要对 RDD 中具有相同键的每对行执行计算,这是 RDD 结构: List>> dat2 = new ArrayList<>(); dat2.add(new Tuple2>
我在 spark 集群中有两个文件,foo.csv 和 bar.csv,它们都有 4 列和完全相同的字段:时间、用户、url、类别。 我想通过 bar.csv 的某些列过滤掉 foo.csv。最后,我
我是一名优秀的程序员,十分优秀!