- android - RelativeLayout 背景可绘制重叠内容
- android - 如何链接 cpufeatures lib 以获取 native android 库?
- java - OnItemClickListener 不起作用,但 OnLongItemClickListener 在自定义 ListView 中起作用
- java - Android 文件转字符串
我有以下应用程序,它通过 MongoDB Spark 连接器使用到 MongoDB 的连接。我的代码崩溃是因为执行程序的 SparkContext 为空。基本上我从 MongoDB 读取数据,处理这些数据,这会导致需要发送到 MongoDB 的额外查询。最后一步是保存这些额外查询的数据。我使用的代码:
JavaMongoRDD<Document> rdd = MongoSpark.load(sc);
JavaMongoRDD<Document> aggregatedRdd = rdd.withPipeline(...);
JavaPairRDD<String, Document> pairRdd = aggregatedRdd
.mapToPair((document) -> new Tuple2(document.get("_id"), document));
JavaPairRDD<String, List<Document>> mergedRdd = pairRdd.aggregateByKey(new LinkedList<Document>(),
combineFunction, mergeFunction);
JavaRDD<Tuple2<String, List<Tuple2<Date, Date>>>> dateRdd = mergedRdd.map(...);
//at this point dateRdd contains key/value pairs of:
//Key: a MongoDB document ID (String)
//Value: List of Tuple<Date, Date> which are date ranges (start time and end time).
//For each of that date ranges I want to retrieve the data out of MongoDB
//and, for now, I just want to save that data
dateRdd.foreachPartition(new VoidFunction<Iterator<Tuple2<String, List<Tuple2<Date, Date>>>>>() {
@Override
public void call(Iterator<Tuple2<String, List<Tuple2<Date, Date>>>> partitionIterator) throws Exception {
for (; partitionIterator.hasNext(); ) {
Tuple2<String, List<Tuple2<Date, Date>>> tuple = partitionIterator.next();
String fileName = tuple._1;
List<Tuple2<Date, Date>> dateRanges = tuple._2;
for (Tuple2<Date, Date> dateRange : dateRanges) {
Date startDate = dateRange._1;
Date endDate = dateRange._2;
Document aggregationDoc = Document.parse("{ $match: { ts: {$lt: new Date(" + startDate.getTime()
+ "), $gt: new Date(" + endDate.getTime() + ")}, root_document: \"" + fileName
+ "\", signals: { $elemMatch: { signal: \"SomeValue\" } } } }");
//this call will use the initial MongoSpark rdd with the aggregation pipeline that just got created.
//this will get sent to MongoDB
JavaMongoRDD<Document> filteredSignalRdd = rdd.withPipeline(Arrays.asList(aggregationDoc));
String outputFileName = String.format("output_data_%s_%d-%d", fileName,
startDate.getTime(), endDate.getTime());
filteredSignalRdd.saveAsTextFile(outputFileName);
}
}
}
});
我得到的异常是:
Job aborted due to stage failure: Task 23 in stage 2.0 failed 4 times, most recent failure: Lost task 23.3 in stage 2.0 (TID 501, hadoopb24): java.lang.IllegalArgumentException: requirement failed: RDD transformation requires a non-null SparkContext.
Unfortunately SparkContext in this MongoRDD is null.
This can happen after MongoRDD has been deserialized.
SparkContext is not Serializable, therefore it deserializes to null.
RDD transformations are not allowed inside lambdas used in other RDD transformations.
at scala.Predef$.require(Predef.scala:233)
at com.mongodb.spark.rdd.MongoRDD.checkSparkContext(MongoRDD.scala:170)
at com.mongodb.spark.rdd.MongoRDD.copy(MongoRDD.scala:126)
at com.mongodb.spark.rdd.MongoRDD.withPipeline(MongoRDD.scala:116)
at com.mongodb.spark.rdd.api.java.JavaMongoRDD.withPipeline(JavaMongoRDD.scala:46)
这里的问题是什么,我怎样才能实现这种“嵌套”、异步创建新的 RDD?
如何访问执行程序中的 MongoSpark“上下文”? MongoSpark 库需要访问 SparkContext,这在执行程序中不可用。
我是否需要再次将所有数据带给驱动程序,然后让驱动程序向 MongoSpark“上下文”发出新的调用?我可以看到这可能是如何工作的,但这需要异步完成,即每当分区完成处理数据并具有 <String, Tuple<Date,Date>>
时。准备好,将其推送给驱动程序,让他开始新的查询。如何才能做到这一点?
最佳答案
这是预料之中的,不会改变。 Spark 不支持:
在这种情况下,您可能可以使用标准的 Mongo 客户端。
关于mongodb - Spark - 如何在 map() 中创建新的 RDD? (执行者的 SparkContext 为空),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40550582/
我是 Pyspark 新手,我使用的是 Spark 2.0.2。 我有一个名为 Test_RDD 的 RDD,其结构如下: U-Key || V1 || V2 || V3 || ----
我正在寻找一种方法将一个 RDD 拆分为两个或多个 RDD,并将获得的结果保存为两个单独的 RDD。例如: rdd_test = sc.parallelize(range(50), 1) 我的代码:
我有一个结构如下的RDD: ((user_id,item_id,rating)) 让我们将此 RDD 称为训练 然后还有另一个具有相同结构的rdd: ((user_id,item_id,rating)
已经有人问过类似的问题。最相似的是这个: Spark: How to split an RDD[T]` into Seq[RDD[T]] and preserve the ordering 但是,我不
我正在使用 spark 来处理数据。但是我不知道如何将新数据保存到Hive 我从 Hive 加载 rdd,然后运行 map 函数来清理数据。 result = myRdd.map(lambda x
我有一个名为 index 的 rdd:RDD[(String, String)],我想用 index 来处理我的文件。 这是代码: val get = file.map({x => val tmp
我有两个 RDD: **rdd1** id1 val1 id2 val2 **rdd2** id1 v1 id2 v2 id1 v3 id8 v7 id1 v4 id3 v5 id6 v6 我想过滤
我有一个 RDD,需要从另一个 RDD 访问数据。但是,我总是收到任务不可序列化错误。我已经扩展了 Serialized 类,但它没有起作用。代码是: val oldError = rddOfRati
我有一个 RDD,需要从另一个 RDD 访问数据。但是,我总是收到任务不可序列化错误。我已经扩展了 Serialized 类,但它没有起作用。代码是: val oldError = rddOfRati
我有一个 RDD 对: (105,918) (105,757) (502,516) (105,137) (516,816) (350,502) 我想将它分成两个 RDD,这样第一个只有具有非重复值的对
我正在尝试使用 spark 执行 K 最近邻搜索。 我有一个 RDD[Seq[Double]] 并且我打算返回一个 RDD[(Seq[Double],Seq[Seq[Double]])] 带有实际行和
我是Spark和Scala语言的新手,并且希望将所有RDD合并到一个List中,如下所示(List to RDD): val data = for (item {
我找不到只参与 rdd 的方法. take看起来很有希望,但它返回 list而不是 rdd .我当然可以将其转换为 rdd ,但这似乎既浪费又丑陋。 my_rdd = sc.textFile("my
我正在寻找一种将 RDD 拆分为两个或更多 RDD 的方法。我见过的最接近的是 Scala Spark: Split collection into several RDD?这仍然是一个单一的 RDD
我有一个RDD[String],wordRDD。我还有一个从字符串/单词创建 RDD[String] 的函数。我想为 wordRDD 中的每个字符串创建一个新的 RDD。以下是我的尝试: 1) 失败,
我刚刚开始使用 Spark 和 Scala 我有一个包含多个文件的目录我使用 成功加载它们 sc.wholeTextFiles(directory) 现在我想升一级。我实际上有一个目录,其中包含包含文
我想从另一个 RDD 中减去一个 RDD。我查看了文档,发现 subtract可以这样做。实际上,当我测试时 subtract , 最终的 RDD 保持不变,值不会被删除! 有没有其他功能可以做到这一
我在 HDFS 中有如下三个文件中的数据 EmployeeManagers.txt (EmpID,ManagerID) 1,5 2,4 3,4 4,6 5,6 EmployeeNames.txt (E
我正在开发一个应用程序,我需要对 RDD 中具有相同键的每对行执行计算,这是 RDD 结构: List>> dat2 = new ArrayList<>(); dat2.add(new Tuple2>
我在 spark 集群中有两个文件,foo.csv 和 bar.csv,它们都有 4 列和完全相同的字段:时间、用户、url、类别。 我想通过 bar.csv 的某些列过滤掉 foo.csv。最后,我
我是一名优秀的程序员,十分优秀!