gpt4 book ai didi

java - 无法使用 spark (java) 从 Cassandra 获取数据

转载 作者:行者123 更新时间:2023-11-29 04:35:06 27 4
gpt4 key购买 nike

我是 Cassandra 和 Spark 的新手,正在尝试使用 spark 从数据库中获取数据。为此,我正在使用 Java。问题是没有抛出异常或发生错误,但我仍然无法获取数据。在下面找到我的代码 -

    SparkConf sparkConf = new SparkConf();
sparkConf.setAppName("Spark-Cassandra Integration");
sparkConf.setMaster("local[4]");
sparkConf.set("spark.cassandra.connection.host", "stagingHost22");
sparkConf.set("spark.cassandra.connection.port", "9042");

sparkConf.set("spark.cassandra.connection.timeout_ms", "5000");
sparkConf.set("spark.cassandra.read.timeout_ms", "200000");


JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
String keySpaceName = "testKeySpace";
String tableName = "testTable";

CassandraJavaRDD<CassandraRow> cassandraRDD = CassandraJavaUtil.javaFunctions(javaSparkContext).cassandraTable(keySpaceName, tableName);

final ArrayList dataList = new ArrayList();
JavaRDD<String> userRDD = cassandraRDD.map(new Function<CassandraRow, String>() {

private static final long serialVersionUID = -165799649937652815L;


public String call(CassandraRow row) throws Exception {
System.out.println("Inside RDD call");
dataList.add(row);
return "test";
}
});
System.out.println( "data Size -" + dataList.size());

Cassandra 和 spark maven 依赖项是 -

      <dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-core</artifactId>
<version>3.0.0</version>
</dependency>

<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-mapping</artifactId>
<version>3.0.0</version>
</dependency>

<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-extras</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>com.sparkjava</groupId>
<artifactId>spark-core</artifactId>
<version>2.5.4</version>
</dependency>
<dependency>
<groupId>com.datastax.spark</groupId>
<artifactId>spark-cassandra-connector_2.10</artifactId>
<version>2.0.0-M3</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.4.0</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.10</artifactId>
<version>1.3.0</version>
</dependency>

这确保 stagingHost22 主机具有带有键空间 - testKeySpace 和表名 - testTable 的 cassandra 数据。查找以下查询输出 -

cqlsh:testKeySpace> select count(*) from testTable;

count

34

(1 rows)

有人可以建议我在这里缺少什么吗?

提前致谢。

热烈的问候,

维巴哈

最佳答案

您当前的代码不执行任何 Spark 操作。因此没有数据被加载。

查看 Spark 文档以了解 Spark 中转换和操作之间的区别: http://spark.apache.org/docs/latest/programming-guide.html#rdd-operations

此外,在使用 Cassandra 连接器时,通常不需要将 CassandraRows 添加到 ArrayList。我建议首先实现一个简单的选择(遵循 Spark-Cassandra-Connector 文档)。如果这有效,您可以根据需要扩展此代码。

检查以下示例链接,了解如何使用连接器加载数据:

关于java - 无法使用 spark (java) 从 Cassandra 获取数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41936487/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com