gpt4 book ai didi

apache-spark - Apache Spark中的持久内存数据库

转载 作者:行者123 更新时间:2023-12-02 09:46:48 25 4
gpt4 key购买 nike

我有一个用于Spark流的自定义foreach编写器。对于每一行,我都写入JDBC源。我还想在执行JDBC操作之前进行某种快速查找,并在执行JDBC操作之后更新值,例如下面的示例代码中的“Step-1”和“Step-3” ...

我不想使用REDIS,MongoDB等外部数据库。我想要低足迹的东西,例如RocksDB,Derby等。

我可以为每个应用程序存储一个文件,就像checkpointing一样,我将创建一个internal-db文件夹...

我看不到任何适用于Spark的内存数据库。

def main(args: Array[String]): Unit = {

val brokers = "quickstart:9092"
val topic = "safe_message_landing_app_4"

val sparkSession = SparkSession.builder().master("local[*]").appName("Ganesh-Kafka-JDBC-Streaming").getOrCreate();

val sparkContext = sparkSession.sparkContext;
sparkContext.setLogLevel("ERROR")
val sqlContext = sparkSession.sqlContext;

val kafkaDataframe = sparkSession.readStream.format("kafka")
.options(Map("kafka.bootstrap.servers" -> brokers, "subscribe" -> topic,
"startingOffsets" -> "latest", "group.id" -> " Jai Ganesh", "checkpoint" -> "cp/kafka_reader"))
.load()

kafkaDataframe.printSchema()
kafkaDataframe.createOrReplaceTempView("kafka_view")
val sqlDataframe = sqlContext.sql("select concat ( topic, '-' , partition, '-' , offset) as KEY, string(value) as VALUE from kafka_view")

val customForEachWriter = new ForeachWriter[Row] {
override def open(partitionId: Long, version: Long) = {
println("Open Started ==> partitionId ==> " + partitionId + " ==> version ==> " + version)
true
}

override def process(value: Row) = {
// Step 1 ==> Lookup a key in persistent KEY-VALUE store

// JDBC operations

// Step 3 ==> Update the value in persistent KEY-VALUE store
}

override def close(errorOrNull: Throwable) = {
println(" ************** Closed ****************** ")
}
}

val yy = sqlDataframe
.writeStream
.queryName("foreachquery")
.foreach(customForEachWriter)
.start()

yy.awaitTermination()

sparkSession.close();

}

最佳答案

Manjesh,

您正在寻找的“将Spark和您的内存数据库作为一个无缝集群,共享一个进程空间”以及对MVCC的支持正是SnappyData提供的。使用SnappyData,要快速查找的表与运行Spark流作业的过程相同。看看here

SnappyData拥有核心产品的Apache V2许可证,并且OSS下载中提供了您所指的特定用途。

(公开:我是SnappyData的一名雇员,因此提供针对该问题的产品特定答案是有意义的,因为产品就是该问题的答案)

关于apache-spark - Apache Spark中的持久内存数据库,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46245574/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com