gpt4 book ai didi

node.js - 从 Node.js 服务器查询 Spark SQL

转载 作者:搜寻专家 更新时间:2023-10-31 23:52:49 25 4
gpt4 key购买 nike

我目前正在使用 npm 的 cassandra-driver从 Node.js 服务器查询我的 Cassandra 数据库。因为我希望能够编写更复杂的查询,所以我想使用 Spark SQL 而不是 CQL。有什么方法可以创建 RESTful API(或其他东西),以便我可以像目前使用 CQL 一样使用 Spark SQL?

换句话说,我希望能够将 Spark SQL 查询从我的 Node.js 服务器发送到另一台服务器并返回结果。

有什么办法吗?我一直在寻找这个问题的解决方案,但还没有找到任何东西。

编辑:我能够从 Spark shell 使用 Scala 和 Spark SQL 查询我的数据库,所以这个位可以正常工作。我只需要以某种方式连接 Spark 和我的 Node.js 服务器。

最佳答案

我有一个类似的问题,我通过使用 Spark-JobServer 解决了.

Spark-Jobserver (SJS) 的主要方法通常是创建一个扩展其 SparkSQLJob 的特殊作业,例如以下示例:

object ExecuteQuery extends SparkSQLJob {
override def validate(sqlContext: SQLContext, config: Config): SparkJobValidation = {
// Code to validate the parameters received in the request body
}
override def runJob(sqlContext: SQLContext, jobConfig: Config): Any = {
// Assuming your request sent a { "query": "..." } in the body:
val df = sqlContext.sql(config.getString("query"))
createResponseFromDataFrame(df) // You should implement this
}
}

但是,要使这种方法与 Cassandra 配合良好,您必须使用 spark-cassandra-connector然后,要加载数据,您将有两个选择:

1) 在调用这个 ExecuteQuery 之前通过 REST,您必须将要查询的完整数据从 Cassandra 传输到 Spark。为此,您可以做类似的事情(代码改编自 spark-cassandra-connector documentation ):

val df = sqlContext
.read
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "words", "keyspace" -> "test"))
.load()

然后将其注册为表,以便 SparkSQL 能够访问它:

df.registerAsTempTable("myTable") // As a temporary table
df.write.saveAsTable("myTable") // As a persistent Hive Table

只有在那之后你才能使用 ExecuteQuerymyTable 查询.

2) 由于第一种选择在某些用例中可能效率低下,因此还有另一种选择。

spark-cassandra-connector 有一个特殊的 CassandraSQLContext可用于直接从 Spark 查询 C* 表。它可以像这样使用:

val cc = new CassandraSQLContext(sc)
val df = cc.sql("SELECT * FROM keyspace.table ...")

但是,要在 Spark-JobServer 中使用不同类型的上下文,您需要扩展 SparkContextFactory并在上下文创建时使用它(这可以通过对 /contexts 的 POST 请求来完成)。可以在 SJS Gitub 上看到特殊上下文工厂的示例。 .您还必须创建一个 SparkCassandraJob , 扩展 SparkJob (但这部分很easy)。

最后,ExecuteQuery工作必须适应使用新类。它会是这样的:

object ExecuteQuery extends SparkCassandraJob {
override def validate(cc: CassandraSQLContext, config: Config): SparkJobValidation = {
// Code to validate the parameters received in the request body
}
override def runJob(cc: CassandraSQLContext, jobConfig: Config): Any = {
// Assuming your request sent a { "query": "..." } in the body:
val df = cc.sql(config.getString("query"))
createResponseFromDataFrame(df) // You should implement this
}
}

在那之后,ExecuteQuery可以使用 POST 请求通过 REST 执行作业。


结论

这里我使用第一个选项,因为我需要 HiveContext 中可用的高级查询(例如窗口函数),这些在 CassandraSQLContext 中不可用。 .但是,如果您不需要这些类型的操作,我推荐第二种方法,即使它需要一些额外的编码来为 SJS 创建一个新的 ContextFactory。

关于node.js - 从 Node.js 服务器查询 Spark SQL,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37089670/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com