- iOS/Objective-C 元类和类别
- objective-c - -1001 错误,当 NSURLSession 通过 httpproxy 和/etc/hosts
- java - 使用网络类获取 url 地址
- ios - 推送通知中不播放声音
所以我之前有一些关于如何在 java maven 项目中使用 spark 查询 cassandra 的问题:Querying Data in Cassandra via Spark in a Java Maven Project
好吧,我的问题得到了回答并且有效,但是我遇到了一个问题(可能是一个问题)。我现在正在尝试使用 datastax java API。这是我的代码:
package com.angel.testspark.test2;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import java.io.Serializable;
import static com.datastax.spark.connector.CassandraJavaUtil.*;
public class App
{
// firstly, we define a bean class
public static class Person implements Serializable {
private Integer id;
private String fname;
private String lname;
private String role;
// Remember to declare no-args constructor
public Person() { }
public Integer getId() { return id; }
public void setId(Integer id) { this.id = id; }
public String getfname() { return fname; }
public void setfname(String fname) { this.fname = fname; }
public String getlname() { return lname; }
public void setlname(String lname) { this.lname = lname; }
public String getrole() { return role; }
public void setrole(String role) { this.role = role; }
// other methods, constructors, etc.
}
private transient SparkConf conf;
private App(SparkConf conf) {
this.conf = conf;
}
private void run() {
JavaSparkContext sc = new JavaSparkContext(conf);
createSchema(sc);
sc.stop();
}
private void createSchema(JavaSparkContext sc) {
JavaRDD<String> rdd = javaFunctions(sc).cassandraTable("tester", "empbyrole", Person.class)
.where("role=?", "IT Engineer").map(new Function<Person, String>() {
@Override
public String call(Person person) throws Exception {
return person.toString();
}
});
System.out.println("Data as Person beans: \n" + StringUtils.join("\n", rdd.toArray()));
}
public static void main( String[] args )
{
if (args.length != 2) {
System.err.println("Syntax: com.datastax.spark.demo.JavaDemo <Spark Master URL> <Cassandra contact point>");
System.exit(1);
}
SparkConf conf = new SparkConf();
conf.setAppName("Java API demo");
conf.setMaster(args[0]);
conf.set("spark.cassandra.connection.host", args[1]);
App app = new App(conf);
app.run();
}
}
这是我的错误:
Exception in thread "main" org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: com.angel.testspark.test2.App
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1020)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1018)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1018)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:781)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:724)
at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:554)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:190)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
现在我确切地知道我的错误在哪里。它是 System.out.println("Data as Person beans:\n"+ StringUtils.join("\n", rdd.toArray()));
因为我需要将 rdd 转换为大批。但是,API 文档说我应该能够做到这一点……这是从文档中复制和粘贴的代码。为什么我不能将 RDD 序列化为数组?
我已经使用我在上面链接中包含的帖子中的插入内容将虚拟数据插入到我的 cassandra 中。
此外,我之前解决的一个错误是将所有 getter 和 setter 更改为小写。当我在其中使用大写字母时,会产生错误。为什么我不能在此处的 getter 和 setter 中使用大写字母?
谢谢,天使
最佳答案
将 public class App
更改为 public class App implements Serializable
应该可以修复错误。因为 java 内部类将保留对外部类的引用,所以您的 Function
对象将具有对 App
的引用。由于 Spark 需要序列化您的 Function
对象,因此它要求 App
也是可序列化的。
关于java - RDD 不可序列化 Cassandra/Spark 连接器 java API,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25982263/
我正在尝试将抓取的 xml 输出写入 json。由于项目不可序列化,抓取失败。 从这个问题来看,它建议您需要构建一个管道,未提供的答案超出了问题 SO scrapy serializer 的范围。 所
有没有一种方法可以通过重载函数来区分参数是在编译时可评估还是仅在运行时可评估? 假设我有以下功能: std::string lookup(int x) { return table::va
我正在使用 MVVM 模式编写一个应用程序。我通过将 View 的 DataContext 属性设置为 ViewModel 的实例来向 View 提供数据。一般来说,我只是从那里使用 Binding
对于一个项目,我正在使用带有简单 python module 的传感器收集多个红外命令。 . 我收到如下字节字符串: commando1= b'7g4770CQfwCTVT9bQDAzVEBMagGR
我有一个计算方法,可以在用户使用 Cartridge 作为我的商店框架结账时计算税费。 税 = 税 * 小数(str(settings.SHOP_DEFAULT_TAX_RATE)) 计算工作正常。然
我正在用 pygame 制作一个绘图程序,我想在其中为用户提供一个选项来保存程序的确切状态,然后在稍后重新加载它。在这一点上,我保存了我的全局字典的副本,然后遍历, pickle 每个对象。 pyga
在 C++11 之前,我可以使用它来使类不可复制: private: MyClass(const MyClass&); MyClass& operator=(const MyClass&); 使用 C
大家好 :) 我在我的 VC++ 项目中使用 1.5.4-all (2014-10-22)(适用于 x86 平台的 Microsoft Visual C++ 编译器 18.00.21005.1)。 我
我有一个 python 文件:analysis.py: def svm_analyze_AHE(file_name): # obtain abp file testdata = pd.
这个问题已经有答案了: How to serialize SqlAlchemy result to JSON? (37 个回答) 已关闭 4 年前。 我正在编写小查询来从 mysql 获取数据数据库,
我是 Python 初学者,我在 JSON 方面遇到了一些问题。在我正在使用的教程中有两个函数: def read_json(filename): data = [] if os.pa
我目前正在开发一个针对 iPad 的基于 HTML5 Canvas/JavaScript 的小型绘图应用程序。它在 Safari 中运行。到目前为止,除了一件事之外,一切都进展顺利。 如果我旋转设备,
以下代码无法使用 Visual Studio 2013 编译: #include struct X { X() = default; X(const X&) = delete;
嗨,我制作了一个文本分类分类器,我在其中使用了它,它返回一个数组,我想返回 jsonresponse,但最后一行代码给我错误 'array(['cycling'], dtype =object) 不可
我使用 Flask 和 Flask-Login 进行用户身份验证。 Flask-Sqlalchemy 将这些模型存储在 sqlite 数据库中: ROLE_USER = 0 ROLE_ADMIN =
如果您尝试发送不可 JSON 序列化的对象(列表、字典、整数等以外的任何对象),您会收到以下错误消息: "errorMessage": "Object of type set is not JSON
我在尝试 move std::vector 时遇到崩溃其中 T显然是不可 move 的(没有定义 move 构造函数/赋值运算符,它包含内部指针) 但为什么 vector 的 move 函数要调用 T
我尝试在用户成功登录后将 token 返回给他们,但不断收到以下错误: 类型错误:“字节”类型的对象不可 JSON 序列化 我该如何解决这个问题?这是我到目前为止的代码: if user:
我是一名优秀的程序员,十分优秀!