gpt4 book ai didi

java - 合并多个 JavaRDD

转载 作者:塔克拉玛干 更新时间:2023-11-02 08:04:57 25 4
gpt4 key购买 nike

我试图合并多个 JavaRDD,但我只合并了 2 个,有人可以帮忙吗?我已经为此苦苦挣扎了一段时间,但总的来说,我希望能够获得多个集合并使用 sqlContext 创建一个组并打印出所有结果。

这里是我的代码

  JavaRDD<AppLog> logs =  mapCollection(sc, "mongodb://hadoopUser:Pocup1ne9@localhost:27017/hbdata.ppa_logs").union(
mapCollection(sc, "mongodb://hadoopUser:Pocup1ne9@localhost:27017/hbdata.fav_logs").union(
mapCollection(sc, "mongodb://hadoopUser:Pocup1ne9@localhost:27017/hbdata.pps_logs").union(
mapCollection(sc, "mongodb://hadoopUser:Pocup1ne9@localhost:27017/hbdata.dd_logs").union(
mapCollection(sc, "mongodb://hadoopUser:Pocup1ne9@localhost:27017/hbdata.ppt_logs")
)
)
)
);


public JavaRDD<AppLog> mapCollection(JavaSparkContext sc ,String uri){

Configuration mongodbConfig = new Configuration();
mongodbConfig.set("mongo.job.input.format", "com.mongodb.hadoop.MongoInputFormat");
mongodbConfig.set("mongo.input.uri", uri);

JavaPairRDD<Object, BSONObject> documents = sc.newAPIHadoopRDD(
mongodbConfig, // Configuration
MongoInputFormat.class, // InputFormat: read from a live cluster.
Object.class, // Key class
BSONObject.class // Value class
);

return documents.map(

new Function<Tuple2<Object, BSONObject>, AppLog>() {

public AppLog call(final Tuple2<Object, BSONObject> tuple) {
AppLog log = new AppLog();
BSONObject header =
(BSONObject) tuple._2();

log.setTarget((String) header.get("target"));
log.setAction((String) header.get("action"));

return log;
}
}
);
}

//打印集合 SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

    DataFrame logsSchema = sqlContext.createDataFrame(logs, AppLog.class);
logsSchema.registerTempTable("logs");

DataFrame groupedMessages = sqlContext.sql(
"select * from logs");
// "select target, action, Count(*) from logs group by target, action");

// "SELECT to, body FROM messages WHERE to = \"eric.bass@enron.com\"");



groupedMessages.show();

logsSchema.printSchema();

最佳答案

如果你想合并多个 JavaRDDs ,只需使用 sc.union(rdd1,rdd2,..) 代替 rdd1.union(rdd2).union (rdd3)

同时检查这个 RDD.union vs SparkContex.union

关于java - 合并多个 JavaRDD,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40024690/

25 4 0