
java - Error running Cassandra from Spark in Java: NoClassDefFoundError at org.apache.spark.sql.catalyst

Reposted. Author: 搜寻专家. Updated: 2023-11-01 02:05:27

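I am using Cassandra 3.0.3 and Spark 1.6.0, and I am trying to get an example running by combining code from the older documentation at http://www.datastax.com/dev/blog/accessing-cassandra-from-spark-in-java with the newer documentation at https://github.com/datastax/spark-cassandra-connector/blob/master/doc/7_java_api.md

Here is my pom.xml file: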

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>muhrafifm</groupId>
    <artifactId>spark-cass-twitterdw</artifactId>
    <version>1.0</version>
    <packaging>jar</packaging>
    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.0</version>
                <configuration>
                    <source>1.7</source>
                    <target>1.7</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.7</maven.compiler.source>
        <maven.compiler.target>1.7</maven.compiler.target>
    </properties>
    <dependencies>
        <dependency>
            <groupId>com.datastax.cassandra</groupId>
            <artifactId>cassandra-driver-core</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>com.googlecode.json-simple</groupId>
            <artifactId>json-simple</artifactId>
            <version>1.1.1</version>
            <type>jar</type>
        </dependency>
        <dependency>
            <groupId>com.datastax.spark</groupId>
            <artifactId>spark-cassandra-connector_2.10</artifactId>
            <version>1.6.0-M1</version>
            <type>jar</type>
        </dependency>
        <dependency>
            <groupId>com.datastax.spark</groupId>
            <artifactId>spark-cassandra-connector-java_2.10</artifactId>
            <version>1.6.0-M1</version>
            <type>jar</type>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>1.6.0</version>
            <type>jar</type>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.10</artifactId>
            <version>1.6.0</version>
            <type>jar</type>
        </dependency>
        <dependency>
            <groupId>org.apache.thrift</groupId>
            <artifactId>libthrift</artifactId>
            <version>0.9.1</version>
        </dependency>
    </dependencies>
</project>

The changes I made are mostly in the calls to javaFunctions; here is one of my methods after updating it to follow the new documentation. I also added import static com.datastax.spark.connector.japi.CassandraJavaUtil.*;

private void generateData(JavaSparkContext sc) {
    CassandraConnector connector = CassandraConnector.apply(sc.getConf());

    // Prepare the schema
    try (Session session = connector.openSession()) {
        session.execute("DROP KEYSPACE IF EXISTS java_api");
        session.execute("CREATE KEYSPACE java_api WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}");
        session.execute("CREATE TABLE java_api.products (id INT PRIMARY KEY, name TEXT, parents LIST<INT>)");
        session.execute("CREATE TABLE java_api.sales (id UUID PRIMARY KEY, product INT, price DECIMAL)");
        session.execute("CREATE TABLE java_api.summaries (product INT PRIMARY KEY, summary DECIMAL)");
    }

    // Prepare the products hierarchy
    List<Product> products = Arrays.asList(
            new Product(0, "All products", Collections.<Integer>emptyList()),
            new Product(1, "Product A", Arrays.asList(0)),
            new Product(4, "Product A1", Arrays.asList(0, 1)),
            new Product(5, "Product A2", Arrays.asList(0, 1)),
            new Product(2, "Product B", Arrays.asList(0)),
            new Product(6, "Product B1", Arrays.asList(0, 2)),
            new Product(7, "Product B2", Arrays.asList(0, 2)),
            new Product(3, "Product C", Arrays.asList(0)),
            new Product(8, "Product C1", Arrays.asList(0, 3)),
            new Product(9, "Product C2", Arrays.asList(0, 3))
    );

    JavaRDD<Product> productsRDD = sc.parallelize(products);
    javaFunctions(productsRDD).writerBuilder("java_api", "products", mapToRow(Product.class)).saveToCassandra();

    JavaRDD<Sale> salesRDD = productsRDD.filter(new Function<Product, Boolean>() {
        @Override
        public Boolean call(Product product) throws Exception {
            return product.getParents().size() == 2;
        }
    }).flatMap(new FlatMapFunction<Product, Sale>() {
        @Override
        public Iterable<Sale> call(Product product) throws Exception {
            Random random = new Random();
            List<Sale> sales = new ArrayList<>(1000);
            for (int i = 0; i < 1000; i++) {
                sales.add(new Sale(UUID.randomUUID(), product.getId(), BigDecimal.valueOf(random.nextDouble())));
            }
            return sales;
        }
    });
    javaFunctions(salesRDD).writerBuilder("java_api", "sales", mapToRow(Sale.class)).saveToCassandra();
}

This is the error I get:

16/03/04 13:29:06 INFO Cluster: New Cassandra host /127.0.0.1:9042 added
16/03/04 13:29:06 INFO CassandraConnector: Connected to Cassandra cluster: Test Cluster
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/sql/catalyst/package$ScalaReflectionLock$
at org.apache.spark.sql.catalyst.ReflectionLock$.<init>(ReflectionLock.scala:5)
at org.apache.spark.sql.catalyst.ReflectionLock$.<clinit>(ReflectionLock.scala)
at com.datastax.spark.connector.mapper.ReflectionColumnMapper.<init>(ReflectionColumnMapper.scala:38)
at com.datastax.spark.connector.mapper.JavaBeanColumnMapper.<init>(JavaBeanColumnMapper.scala:10)
at com.datastax.spark.connector.util.JavaApiHelper$.javaBeanColumnMapper(JavaApiHelper.scala:93)
at com.datastax.spark.connector.util.JavaApiHelper.javaBeanColumnMapper(JavaApiHelper.scala)
at com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow(CassandraJavaUtil.java:1204)
at com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow(CassandraJavaUtil.java:1222)
at muhrafifm.spark.cass.twitterdw.Demo.generateData(Demo.java:69)
at muhrafifm.spark.cass.twitterdw.Demo.run(Demo.java:35)
at muhrafifm.spark.cass.twitterdw.Demo.main(Demo.java:181)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.catalyst.package$ScalaReflectionLock$
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 11 more
16/03/04 13:29:40 INFO CassandraConnector: Disconnected from Cassandra cluster: Test Cluster
16/03/04 13:29:41 INFO SparkContext: Invoking stop() from shutdown hook
16/03/04 13:29:41 INFO SparkUI: Stopped Spark web UI at http://10.144.233.28:4040
16/03/04 13:29:41 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
16/03/04 13:29:42 INFO MemoryStore: MemoryStore cleared
16/03/04 13:29:42 INFO BlockManager: BlockManager stopped
16/03/04 13:29:42 INFO BlockManagerMaster: BlockManagerMaster stopped
16/03/04 13:29:42 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
16/03/04 13:29:42 INFO SparkContext: Successfully stopped SparkContext
16/03/04 13:29:42 INFO ShutdownHookManager: Shutdown hook called
16/03/04 13:29:42 INFO ShutdownHookManager: Deleting directory /tmp/spark-16fd2ae2-b61b-4411-a776-1e578caabba6
------------------------------------------------------------------------
BUILD FAILURE

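Am I doing something wrong? It seems to require a package that I am not even using. Is there a way to fix this, or should I use an earlier version of the spark-cassandra-connector?

Any reply is appreciated, thanks.

Best answer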

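The code is looking for

org/apache/spark/sql/catalyst/package$ScalaReflectionLock$

so you should include the spark-sql library, which brings in the correct dependencies. That class lives in the org.apache.spark.sql.catalyst package, which spark-core and spark-streaming do not provide but spark-sql pulls in transitively.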
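As a sketch of that fix, the dependency could be declared as follows inside the <dependencies> element of the pom.xml from the question. The answer does not give coordinates or a version; org.apache.spark:spark-sql_2.10 at 1.6.0 is an assumption chosen to match the Scala 2.10 / Spark 1.6.0 artifacts already listed there.

```xml
<!-- Assumed fix: add inside <dependencies> in pom.xml. -->
<!-- The version is assumed to match spark-core_2.10 from the question. -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.10</artifactId>
    <version>1.6.0</version>
</dependency>
```

After rebuilding, running mvn dependency:tree should show a spark-catalyst artifact listed under spark-sql_2.10, which is where the missing class is expected to come from.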

The original question, "java - Error running Cassandra from Spark in Java: NoClassDefFoundError at org.apache.spark.sql.catalyst", can be found on Stack Overflow: https://stackoverflow.com/questions/35789816/
