
mongodb - Spark - How do I create a new RDD inside map()? (SparkContext is null on the executors)


I have the following application, which connects to MongoDB through the MongoDB Spark connector. My code crashes because the SparkContext on the executors is null. Basically, I read data from MongoDB and process it, which produces additional queries that need to be sent back to MongoDB. The final step is to save the data returned by those additional queries. The code I use:

    JavaMongoRDD<Document> rdd = MongoSpark.load(sc);
    JavaMongoRDD<Document> aggregatedRdd = rdd.withPipeline(...);
    JavaPairRDD<String, Document> pairRdd = aggregatedRdd
            .mapToPair((document) -> new Tuple2(document.get("_id"), document));
    JavaPairRDD<String, List<Document>> mergedRdd = pairRdd.aggregateByKey(new LinkedList<Document>(),
            combineFunction, mergeFunction);

    JavaRDD<Tuple2<String, List<Tuple2<Date, Date>>>> dateRdd = mergedRdd.map(...);

    //at this point dateRdd contains key/value pairs of:
    //Key: a MongoDB document ID (String)
    //Value: List of Tuple<Date, Date>, which are date ranges (start time and end time).

    //For each of those date ranges I want to retrieve the data out of MongoDB
    //and, for now, I just want to save that data

    dateRdd.foreachPartition(new VoidFunction<Iterator<Tuple2<String, List<Tuple2<Date, Date>>>>>() {
        @Override
        public void call(Iterator<Tuple2<String, List<Tuple2<Date, Date>>>> partitionIterator) throws Exception {
            while (partitionIterator.hasNext()) {
                Tuple2<String, List<Tuple2<Date, Date>>> tuple = partitionIterator.next();
                String fileName = tuple._1;
                List<Tuple2<Date, Date>> dateRanges = tuple._2;

                for (Tuple2<Date, Date> dateRange : dateRanges) {
                    Date startDate = dateRange._1;
                    Date endDate = dateRange._2;

                    Document aggregationDoc = Document.parse("{ $match: { ts: {$lt: new Date(" + startDate.getTime()
                            + "), $gt: new Date(" + endDate.getTime() + ")}, root_document: \"" + fileName
                            + "\", signals: { $elemMatch: { signal: \"SomeValue\" } } } }");

                    //this call will use the initial MongoSpark rdd with the aggregation pipeline that was just created.
                    //this will get sent to MongoDB
                    JavaMongoRDD<Document> filteredSignalRdd = rdd.withPipeline(Arrays.asList(aggregationDoc));

                    String outputFileName = String.format("output_data_%s_%d-%d", fileName,
                            startDate.getTime(), endDate.getTime());
                    filteredSignalRdd.saveAsTextFile(outputFileName);
                }
            }
        }
    });

The exception I get is:

Job aborted due to stage failure: Task 23 in stage 2.0 failed 4 times, most recent failure: Lost task 23.3 in stage 2.0 (TID 501, hadoopb24): java.lang.IllegalArgumentException: requirement failed: RDD transformation requires a non-null SparkContext.
Unfortunately SparkContext in this MongoRDD is null.
This can happen after MongoRDD has been deserialized.
SparkContext is not Serializable, therefore it deserializes to null.
RDD transformations are not allowed inside lambdas used in other RDD transformations.
at scala.Predef$.require(Predef.scala:233)
at com.mongodb.spark.rdd.MongoRDD.checkSparkContext(MongoRDD.scala:170)
at com.mongodb.spark.rdd.MongoRDD.copy(MongoRDD.scala:126)
at com.mongodb.spark.rdd.MongoRDD.withPipeline(MongoRDD.scala:116)
at com.mongodb.spark.rdd.api.java.JavaMongoRDD.withPipeline(JavaMongoRDD.scala:46)

The following diagram illustrates what I expect my application to do: [diagram of the intended data flow]

What is the problem here, and how can I achieve this "nested", asynchronous creation of new RDDs?

How can I access the MongoSpark "context" from within the executors? The MongoSpark library needs access to the SparkContext, which is not available on the executors.

Do I need to bring all the data back to the driver again and have the driver issue the new calls through the MongoSpark "context"? I can see how that might work, but it would need to happen asynchronously, i.e. whenever a partition finishes processing its data and has its <String, Tuple<Date, Date>> pairs ready, push them to the driver and let it start the new queries. How can that be done?
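A rough sketch of that driver-side idea (not tested, just to illustrate what I mean; it reuses the rdd and dateRdd variables from the snippet above, and the synchronous collect() could presumably be replaced by collectAsync() to get the asynchronous behaviour I described):

    //Driver-side sketch: collect the date ranges, then build the new pipelines
    //on the driver, where the SparkContext is still available.
    List<Tuple2<String, List<Tuple2<Date, Date>>>> collected = dateRdd.collect();

    for (Tuple2<String, List<Tuple2<Date, Date>>> tuple : collected) {
        String fileName = tuple._1;

        for (Tuple2<Date, Date> dateRange : tuple._2) {
            Date startDate = dateRange._1;
            Date endDate = dateRange._2;

            Document aggregationDoc = Document.parse("{ $match: { ts: {$lt: new Date(" + startDate.getTime()
                    + "), $gt: new Date(" + endDate.getTime() + ")}, root_document: \"" + fileName
                    + "\", signals: { $elemMatch: { signal: \"SomeValue\" } } } }");

            //withPipeline is now called on the driver, so the SparkContext check passes
            JavaMongoRDD<Document> filteredSignalRdd = rdd.withPipeline(Arrays.asList(aggregationDoc));

            String outputFileName = String.format("output_data_%s_%d-%d", fileName,
                    startDate.getTime(), endDate.getTime());
            filteredSignalRdd.saveAsTextFile(outputFileName);
        }
    }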

Best answer

This is expected behavior and will not change. Spark does not support:

  • Nested RDDs.
  • Nested transformations.
  • Nested actions.
  • Accessing the SparkContext or SparkSession from inside actions or transformations.

In this case you can probably just use a standard Mongo client instead.
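As a rough sketch (not part of the original answer) of what that could look like with the synchronous MongoDB Java driver (mongodb-driver-sync): open one client per partition inside foreachPartition and run the aggregation there directly, so no SparkContext is involved. The connection string, database and collection names below are placeholders.

    import com.mongodb.client.MongoClient;
    import com.mongodb.client.MongoClients;
    import com.mongodb.client.MongoCollection;

    //...

    dateRdd.foreachPartition(partitionIterator -> {
        //MongoClient is not serializable, so it is created here on the executor,
        //one client per partition, and closed when the partition is done.
        try (MongoClient client = MongoClients.create("mongodb://host:27017")) { //placeholder URI
            MongoCollection<Document> collection = client
                    .getDatabase("mydb")             //placeholder database name
                    .getCollection("mycollection");  //placeholder collection name

            while (partitionIterator.hasNext()) {
                Tuple2<String, List<Tuple2<Date, Date>>> tuple = partitionIterator.next();
                String fileName = tuple._1;

                for (Tuple2<Date, Date> dateRange : tuple._2) {
                    Document matchStage = Document.parse("{ $match: { ts: {$lt: new Date(" + dateRange._1.getTime()
                            + "), $gt: new Date(" + dateRange._2.getTime() + ")}, root_document: \"" + fileName
                            + "\", signals: { $elemMatch: { signal: \"SomeValue\" } } } }");

                    //the aggregation runs directly against MongoDB; no SparkContext is needed
                    for (Document result : collection.aggregate(Arrays.asList(matchStage))) {
                        //write the result out here, e.g. append it to a file per fileName/date range
                    }
                }
            }
        }
    });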

Regarding mongodb - Spark - How do I create a new RDD inside map()? (SparkContext is null on the executors), a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/40550582/
