
apache-spark - Querying Cassandra from Spark executors


I have a streaming application that reads from Kafka, and I would like to know whether there is a way to run range queries from inside a map function.

I group the messages coming from Kafka by time range and key, and then, based on those time ranges and keys, I pull data from Cassandra into that DStream.

Something like this:

lookups
  .map(lookup => ((lookup.key, lookup.startTime, lookup.endTime), lookup))
  .groupByKey()
  .transform(rdd => {
    val cassandraSQLContext = new CassandraSQLContext(rdd.context)
    rdd.map(lookupPair => {
      val tableName = // variable based on lookup
      val startTime = lookupPair._1._2
      val endTime = lookupPair._1._3

      cassandraSQLContext
        .cassandraSql(s"SELECT * FROM ${CASSANDRA_KEYSPACE}.${tableName} WHERE key=${...} AND start_time >= ${startTime} AND start_time < ${endTime};")
        .map(row => row match {
          case /* case 1 */ => new object1(row)
          case /* case 2 */ => new object2(row)
        })
        .collect()
    })
  })

This gives me a NullPointerException:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 59.0 failed 1 times, most recent failure: Lost task 0.0 in stage 59.0 (TID 63, localhost): java.lang.NullPointerException
at org.apache.spark.sql.SQLContext.parseSql(SQLContext.scala:231)
at org.apache.spark.sql.cassandra.CassandraSQLContext.cassandraSql(CassandraSQLContext.scala:70)
at RollupFineGrainIngestionService$$anonfun$11$$anonfun$apply$2.apply(MyFile.scala:130)
at RollupFineGrainIngestionService$$anonfun$11$$anonfun$apply$2.apply(MyFile.scala:123)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:370)
at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:285)
at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)

I have also tried ssc.cassandraTable(CASSANDRA_KEYSPACE, tableName).where("key = ?", ...)..., but Spark crashes when I try to access the StreamingContext from inside a map.

If anyone has any suggestions, I would appreciate it. Thanks!

Best answer

If your queries are based on the partition key, you probably want to use joinWithCassandraTable.
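For illustration, a minimal sketch of that approach, assuming the DataStax spark-cassandra-connector is on the classpath and using made-up keyspace, table, and column names (my_keyspace, my_table, key):

import com.datastax.spark.connector._

// Hypothetical case class whose field name matches the table's partition key column.
case class LookupKey(key: String)

// RDD of the keys to look up; joinWithCassandraTable fetches, on the executors,
// only the Cassandra partitions that match those keys.
val keys = sc.parallelize(Seq(LookupKey("sensor-1"), LookupKey("sensor-2")))

val joined = keys.joinWithCassandraTable("my_keyspace", "my_table")

// joined is an RDD of (LookupKey, CassandraRow) pairs.
joined.foreach { case (k, row) => println(s"${k.key} -> $row") }

Depending on the connector version, the joined RDD may also accept a where(...) clause to push a clustering-column range (such as the start_time bounds) down to Cassandra.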

But if you need more flexibility,

CassandraConnector(sc.getConf).withSessionDo( session => ...)

will give you access to the session pool on the executors and let you run whatever queries you like, without having to manage connections yourself. The code is all serializable and can be placed inside a map.
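A rough sketch of that pattern applied to the lookup from the question; the keyspace, table, and column names (my_keyspace, my_table, key, start_time) are placeholders, and key is assumed to be a text column:

import com.datastax.spark.connector.cql.CassandraConnector
import scala.collection.JavaConverters._

// CassandraConnector is serializable, so the closure can carry it to the
// executors, where withSessionDo hands out sessions from a shared pool.
val connector = CassandraConnector(ssc.sparkContext.getConf)

lookups
  .map(lookup => ((lookup.key, lookup.startTime, lookup.endTime), lookup))
  .groupByKey()
  .transform { rdd =>
    rdd.map { case ((key, startTime, endTime), _) =>
      connector.withSessionDo { session =>
        // Plain CQL executed on the executor; no SQLContext involved.
        val rs = session.execute(
          s"SELECT * FROM my_keyspace.my_table " +
          s"WHERE key = '$key' AND start_time >= $startTime AND start_time < $endTime")
        rs.all().asScala.toList // materialize the rows before the session is handed back
      }
    }
  }

Calling withSessionDo inside the map is cheap here because the connector reuses a cached session per executor rather than opening a new connection for every element.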

This question about querying Cassandra from Spark executors comes from a similar question on Stack Overflow: https://stackoverflow.com/questions/38383297/
