e ) -6ren">
gpt4 book ai didi

scala - 按 RDD 值从 Cassandra 表中过滤

转载 作者:行者123 更新时间:2023-12-01 09:55:45 27 4
gpt4 key购买 nike

我想根据我在 RDD 中的值从 Cassandra 查询一些数据。我的方法如下:

val userIds = sc.textFile("/tmp/user_ids").keyBy( e => e ) 
val t = sc.cassandraTable("keyspace", "users").select("userid", "user_name")
val userNames = userIds.flatMap { userId =>
t.where("userid = ?", userId).take(1)
}
userNames.take(1)

虽然 Cassandra 查询在 Spark shell 中工作,但当我在 flatMap 中使用它时它会抛出异常:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 2, localhost): java.lang.NullPointerException: 
org.apache.spark.rdd.RDD.<init>(RDD.scala:125)
com.datastax.spark.connector.rdd.CassandraRDD.<init>(CassandraRDD.scala:49)
com.datastax.spark.connector.rdd.CassandraRDD.copy(CassandraRDD.scala:83)
com.datastax.spark.connector.rdd.CassandraRDD.where(CassandraRDD.scala:94)

我的理解是我不能在另一个 RDD 中生成一个 RDD(Cassandra 结果)。

我在网上找到的例子在一个RDD中读取整个Cassandra表并加入RDDs(像这样:https://cassandrastuff.wordpress.com/2014/07/07/cassandra-and-spark-table-joins/)。但如果 Cassandra 表很大,它就无法扩展。

但是我该如何解决这个问题呢?

最佳答案

Spark 1.2 或更高版本

Spark 1.2 引入了joinWithCassandraTable

val userids = sc.textFile("file:///Users/russellspitzer/users.list")
userids
.map(Tuple1(_))
.joinWithCassandraTable("keyspace","table")

此代码最终将完成与下面的解决方案相同的工作做。 joinWithCassandraTable 方法将使用与saveToCassandra 用于将类转换为 Cassandra 可以处理的内容理解。这就是为什么我们需要一个元组而不仅仅是一个简单的字符串来执行连接。

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/2_loading.md#using-joinwithcassandratable


我认为您在这里真正想做的是对两个数据源进行内部联接。这实际上应该比平面图方法更快,并且有一些内部智能哈希。

scala> val userids = sc.textFile("file:///Users/russellspitzer/users.list")
scala> userids.take(5)
res19: Array[String] = Array(3, 2)

scala> sc.cassandraTable("test","users").collect
res20: Array[com.datastax.spark.connector.CassandraRow] = Array(CassandraRow{userid: 3, username: Jacek}, CassandraRow{userid: 1, username: Russ}, CassandraRow{userid: 2, username: Helena})

scala> userids.map(line => (line.toInt,true)).join(sc.cassandraTable("test","users").map(row => (row.getInt("userid"),row.getString("username")))).collect
res18: Array[(Int, (Boolean, String))] = Array((2,(true,Helena)), (3,(true,Jacek)))

如果您实际上只想对您的 C* 数据库执行一系列主键查询,您最好只使用正常的驱动程序路径而不是使用 spark 来执行它们。

与直接驱动程序调用集成的 Spark 解决方案

import com.datastax.spark.connector.cql.CassandraConnector
import collection.JavaConversions._

val cc = CassandraConnector(sc.getConf)
val select = s"SELECT * FROM cctest.users where userid=?"
val ids = sc.parallelize(1 to 10)
ids.flatMap(id =>
cc.withSessionDo(session =>
session.execute(select, id.toInt: java.lang.Integer).iterator.toList.map(row =>
(row.getInt("userid"), row.getString("username"))))).collect

关于scala - 按 RDD 值从 Cassandra 表中过滤,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28121934/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com