- Java 双重比较
- java - 比较器与 Apache BeanComparator
- Objective-C 完成 block 导致额外的方法调用?
- database - RESTful URI 是否应该公开数据库主键?
我是 spark 的新手,我正在尝试创建简单的 JavaDStream 以使用 spark-testing-base API 测试我的工作。到目前为止我所做的是:
JavaStreamingContext streamingContext = new
JavaStreamingContext(jsc(),Durations.seconds(10));
List<String> list = new LinkedList<String>();
list.add("first");
list.add("second");
list.add("third");
JavaRDD<String> myVeryOwnRDD = jsc().parallelize(list);
Queue<JavaRDD<String>> queue = new LinkedList<JavaRDD<String>>();
queue.add( myVeryOwnRDD );
JavaDStream<String> javaDStream = streamingContext.queueStream( queue );
javaDStream.foreachRDD( x-> {
x.collect().stream().forEach(n-> System.out.println("item of list: "+n));
});
我希望它能打印我的列表……但没有。我得到了它:
12:19:05.454 [main] DEBUG org.apache.spark.util.ClosureCleaner - +++ Cleaning closure <function1> (org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$3) +++
12:19:05.468 [main] DEBUG org.apache.spark.util.ClosureCleaner - + declared fields: 3
12:19:05.469 [main] DEBUG org.apache.spark.util.ClosureCleaner - public static final long org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$3.serialVersionUID
12:19:05.469 [main] DEBUG org.apache.spark.util.ClosureCleaner - private final org.apache.spark.streaming.api.java.JavaDStreamLike org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$3.$outer
12:19:05.469 [main] DEBUG org.apache.spark.util.ClosureCleaner - private final org.apache.spark.api.java.function.VoidFunction org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$3.foreachFunc$3
12:19:05.469 [main] DEBUG org.apache.spark.util.ClosureCleaner - + declared methods: 2
12:19:05.470 [main] DEBUG org.apache.spark.util.ClosureCleaner - public final java.lang.Object org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$3.apply(java.lang.Object)
12:19:05.470 [main] DEBUG org.apache.spark.util.ClosureCleaner - public final void org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$3.apply(org.apache.spark.rdd.RDD)
12:19:05.470 [main] DEBUG org.apache.spark.util.ClosureCleaner - + inner classes: 0
12:19:05.471 [main] DEBUG org.apache.spark.util.ClosureCleaner - + outer classes: 1
12:19:05.472 [main] DEBUG org.apache.spark.util.ClosureCleaner - org.apache.spark.streaming.api.java.JavaDStreamLike
12:19:05.472 [main] DEBUG org.apache.spark.util.ClosureCleaner - + outer objects: 1
12:19:05.473 [main] DEBUG org.apache.spark.util.ClosureCleaner - org.apache.spark.streaming.api.java.JavaDStream@7209ffb5
12:19:05.474 [main] DEBUG org.apache.spark.util.ClosureCleaner - + populating accessed fields because this is the starting closure
12:19:05.478 [main] DEBUG org.apache.spark.util.ClosureCleaner - + fields accessed by starting closure: 1
12:19:05.479 [main] DEBUG org.apache.spark.util.ClosureCleaner - (interface org.apache.spark.streaming.api.java.JavaDStreamLike,Set())
12:19:05.479 [main] DEBUG org.apache.spark.util.ClosureCleaner - + outermost object is not a closure, so do not clone it: (interface org.apache.spark.streaming.api.java.JavaDStreamLike,org.apache.spark.streaming.api.java.JavaDStream@7209ffb5)
12:19:05.480 [main] DEBUG org.apache.spark.util.ClosureCleaner - +++ closure <function1> (org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$3) is now cleaned +++
12:19:05.481 [main] DEBUG org.apache.spark.util.ClosureCleaner - +++ Cleaning closure <function2> (org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3) +++
12:19:05.482 [main] DEBUG org.apache.spark.util.ClosureCleaner - + declared fields: 2
12:19:05.482 [main] DEBUG org.apache.spark.util.ClosureCleaner - public static final long org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.serialVersionUID
12:19:05.482 [main] DEBUG org.apache.spark.util.ClosureCleaner - private final scala.Function1 org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.cleanedF$1
12:19:05.482 [main] DEBUG org.apache.spark.util.ClosureCleaner - + declared methods: 2
12:19:05.482 [main] DEBUG org.apache.spark.util.ClosureCleaner - public final java.lang.Object org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(java.lang.Object,java.lang.Object)
12:19:05.482 [main] DEBUG org.apache.spark.util.ClosureCleaner - public final void org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(org.apache.spark.rdd.RDD,org.apache.spark.streaming.Time)
12:19:05.482 [main] DEBUG org.apache.spark.util.ClosureCleaner - + inner classes: 0
12:19:05.482 [main] DEBUG org.apache.spark.util.ClosureCleaner - + outer classes: 0
12:19:05.482 [main] DEBUG org.apache.spark.util.ClosureCleaner - + outer objects: 0
12:19:05.482 [main] DEBUG org.apache.spark.util.ClosureCleaner - + populating accessed fields because this is the starting closure
12:19:05.483 [main] DEBUG org.apache.spark.util.ClosureCleaner - + fields accessed by starting closure: 0
12:19:05.483 [main] DEBUG org.apache.spark.util.ClosureCleaner - + there are no enclosing objects!
12:19:05.483 [main] DEBUG org.apache.spark.util.ClosureCleaner - +++ closure <function2> (org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3) is now cleaned +++
我错过了什么吗?PS:给定的输出就在我的打印列表应该在的地方,我正在为我的工作使用 Spring 单元测试:
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = config.class)
public class myTester extends SharedJavaSparkContext implements Serializable{
最佳答案
我想您需要启动流上下文。
streamingContext.start()
关于JavaDStream 在 lambda 中打印 RDD 到控制台,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37989007/
我是 Pyspark 新手,我使用的是 Spark 2.0.2。 我有一个名为 Test_RDD 的 RDD,其结构如下: U-Key || V1 || V2 || V3 || ----
我正在寻找一种方法将一个 RDD 拆分为两个或多个 RDD,并将获得的结果保存为两个单独的 RDD。例如: rdd_test = sc.parallelize(range(50), 1) 我的代码:
我有一个结构如下的RDD: ((user_id,item_id,rating)) 让我们将此 RDD 称为训练 然后还有另一个具有相同结构的rdd: ((user_id,item_id,rating)
已经有人问过类似的问题。最相似的是这个: Spark: How to split an RDD[T]` into Seq[RDD[T]] and preserve the ordering 但是,我不
我正在使用 spark 来处理数据。但是我不知道如何将新数据保存到Hive 我从 Hive 加载 rdd,然后运行 map 函数来清理数据。 result = myRdd.map(lambda x
我有一个名为 index 的 rdd:RDD[(String, String)],我想用 index 来处理我的文件。 这是代码: val get = file.map({x => val tmp
我有两个 RDD: **rdd1** id1 val1 id2 val2 **rdd2** id1 v1 id2 v2 id1 v3 id8 v7 id1 v4 id3 v5 id6 v6 我想过滤
我有一个 RDD,需要从另一个 RDD 访问数据。但是,我总是收到任务不可序列化错误。我已经扩展了 Serialized 类,但它没有起作用。代码是: val oldError = rddOfRati
我有一个 RDD,需要从另一个 RDD 访问数据。但是,我总是收到任务不可序列化错误。我已经扩展了 Serialized 类,但它没有起作用。代码是: val oldError = rddOfRati
我有一个 RDD 对: (105,918) (105,757) (502,516) (105,137) (516,816) (350,502) 我想将它分成两个 RDD,这样第一个只有具有非重复值的对
我正在尝试使用 spark 执行 K 最近邻搜索。 我有一个 RDD[Seq[Double]] 并且我打算返回一个 RDD[(Seq[Double],Seq[Seq[Double]])] 带有实际行和
我是Spark和Scala语言的新手,并且希望将所有RDD合并到一个List中,如下所示(List to RDD): val data = for (item {
我找不到只参与 rdd 的方法. take看起来很有希望,但它返回 list而不是 rdd .我当然可以将其转换为 rdd ,但这似乎既浪费又丑陋。 my_rdd = sc.textFile("my
我正在寻找一种将 RDD 拆分为两个或更多 RDD 的方法。我见过的最接近的是 Scala Spark: Split collection into several RDD?这仍然是一个单一的 RDD
我有一个RDD[String],wordRDD。我还有一个从字符串/单词创建 RDD[String] 的函数。我想为 wordRDD 中的每个字符串创建一个新的 RDD。以下是我的尝试: 1) 失败,
我刚刚开始使用 Spark 和 Scala 我有一个包含多个文件的目录我使用 成功加载它们 sc.wholeTextFiles(directory) 现在我想升一级。我实际上有一个目录,其中包含包含文
我想从另一个 RDD 中减去一个 RDD。我查看了文档,发现 subtract可以这样做。实际上,当我测试时 subtract , 最终的 RDD 保持不变,值不会被删除! 有没有其他功能可以做到这一
我在 HDFS 中有如下三个文件中的数据 EmployeeManagers.txt (EmpID,ManagerID) 1,5 2,4 3,4 4,6 5,6 EmployeeNames.txt (E
我正在开发一个应用程序,我需要对 RDD 中具有相同键的每对行执行计算,这是 RDD 结构: List>> dat2 = new ArrayList<>(); dat2.add(new Tuple2>
我在 spark 集群中有两个文件,foo.csv 和 bar.csv,它们都有 4 列和完全相同的字段:时间、用户、url、类别。 我想通过 bar.csv 的某些列过滤掉 foo.csv。最后,我
我是一名优秀的程序员,十分优秀!