gpt4 book ai didi

java - 使用java Spark将数据集保存到cassandra

转载 作者:行者123 更新时间:2023-12-01 16:51:21 24 4
gpt4 key购买 nike

我正在尝试使用 java Spark 将数据集保存到 cassandra db。我可以使用下面的代码成功地将数据读入数据集

Dataset<Row> readdf = sparkSession.read().format("org.apache.spark.sql.cassandra")
.option("keyspace","dbname")
.option("table","tablename")
.load();

但是当我尝试编写数据集时,我收到IOException:无法加载或查找表,在键空间中找到类似的表

Dataset<Row> dfwrite= readdf.write().format("org.apache.spark.sql.cassandra")
.option("keyspace","dbname")
.option("table","tablename")
.save();

我正在sparksession中设置主机和端口问题是我可以在覆盖和追加模式下写入,但无法创建表

我正在使用的版本如下: Spark java 2.0Spark Cassandra 连接器 2.3

尝试了不同的 jar 版本但没有任何效果我还浏览了不同的堆栈溢出和 github 链接

非常感谢任何帮助。

最佳答案

Spark 中的write 操作没有自动为您创建表的模式 - 这有多种原因。其中之一是你需要为你的表定义一个主键,否则,如果你设置不正确的主键,你可能会覆盖数据。正因为如此,Spark Cassandra Connector provides a separate method to create a table based on your dataframe structure ,但您需要提供分区和集群键列的列表。在 Java 中,它将如下所示(完整代码为 here ):

DataFrameFunctions dfFunctions = new DataFrameFunctions(dataset);
Option<Seq<String>> partitionSeqlist = new Some<>(JavaConversions.asScalaBuffer(
Arrays.asList("part")).seq());
Option<Seq<String>> clusteringSeqlist = new Some<>(JavaConversions.asScalaBuffer(
Arrays.asList("clust", "col2")).seq());
CassandraConnector connector = new CassandraConnector(
CassandraConnectorConf.apply(spark.sparkContext().getConf()));
dfFunctions.createCassandraTable("test", "widerows6",
partitionSeqlist, clusteringSeqlist, connector);

然后就可以照常写入数据了:

dataset.write()
.format("org.apache.spark.sql.cassandra")
.options(ImmutableMap.of("table", "widerows6", "keyspace", "test"))
.save();

关于java - 使用java Spark将数据集保存到cassandra,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61681364/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com