gpt4 book ai didi

apache-spark - 将流式数据集附加到 Spark 中的批处理数据集

转载 作者:行者123 更新时间:2023-12-02 12:10:20 26 4
gpt4 key购买 nike

我们在 Spark 中有一个用例,我们希望将历史数据从数据库加载到 Spark,并不断向 Spark 添加新的流数据,然后我们可以对整个最新数据集进行分析。

据我所知,Spark SQL和Spark Streaming都无法将历史数据与流数据结合起来。然后我发现Spark 2.0中的Structured Streaming似乎是为了这个问题而构建的。但经过一番实验,我仍然无法弄清楚。这是我的代码:

SparkSession spark = SparkSession
.builder()
.config(conf)
.getOrCreate();

JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

// Load historical data from MongoDB
JavaMongoRDD<Document> mongordd = MongoSpark.load(jsc);


// Create typed dataset with customized schema
JavaRDD<JavaRecordForSingleTick> rdd = mongordd.flatMap(new FlatMapFunction<Document, JavaRecordForSingleTick>() {...});
Dataset<Row> df = spark.sqlContext().createDataFrame(rdd, JavaRecordForSingleTick.class);
Dataset<JavaRecordForSingleTick> df1 = df.as(ExpressionEncoder.javaBean(JavaRecordForSingleTick.class));


// ds listens to a streaming data source
Dataset<Row> ds = spark.readStream()
.format("socket")
.option("host", "127.0.0.1")
.option("port", 11111)
.load();

// Create the typed dataset with customized schema
Dataset<JavaRecordForSingleTick> ds1 = ds
.as(Encoders.STRING())
.flatMap(new FlatMapFunction<String, JavaRecordForSingleTick>() {
@Override
public Iterator<JavaRecordForSingleTick> call(String str) throws Exception {
...
}
}, ExpressionEncoder.javaBean(JavaRecordForSingleTick.class));


// ds1 and df1 have the same schema. ds1 gets data from the streaming data source, df1 is the dataset with historical data

ds1 = ds1.union(df1);
StreamingQuery query = ds1.writeStream().format("console").start();
query.awaitTermination();

我收到错误“org.apache.spark.sql.AnalysisException:不支持流式处理和批处理 DataFrames/数据集之间的联合;”当我联合()两个数据集时。

有人可以帮我吗?我是不是走错方向了?

最佳答案

我无法代表 MongoDB Spark 连接器支持此类功能,并且 Google 上似乎没有太多相关信息。然而,Spark 数据库生态系统中还有其他数据库可以做到这一点。我在 another answer 中介绍了 Spark 数据库生态系统中的大部分内容。 。尽管我知道SnappyData,但我无法准确地说哪个数据库可以轻松地支持您正在寻找的功能类型。和 MemSQL都在那个名单里。但是,您可能需要两者的关系形式的数据。

关于apache-spark - 将流式数据集附加到 Spark 中的批处理数据集,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39823394/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com