gpt4 book ai didi

Java Spark groupByKey 与 key1 并在 groupedRDD 上使用 key2 执行aggregateByKey

转载 作者:行者123 更新时间:2023-12-02 03:11:26 27 4
gpt4 key购买 nike

我正在尝试做一个简单的 java Spark 应用程序,它执行以下操作

输入数据csv格式:key1,key2,data1,data2

基本上我在这里想做的是,

首先,我通过 key1 映射每一行,然后对该 rdd 执行 groupByKey 操作。

JavaRDD<String> viewRdd = sc.textFile("testfile.csv", 1);
JavaPairRDD<String, String> customerIdToRecordRDD = viewRdd
.mapToPair(w -> new Tuple2<String, String>(w.split(",")[0], w));
JavaPairRDD<String, Iterable<String>> groupedByKey1RDD = customerIdToRecordRDD.groupByKey();
System.out.println(customerIdToRecordGropedRDD.count());

现在我的问题是,我需要对 groupedByKey1RDD 中的每个组使用 key2 进行聚合。有什么方法可以将 Iterable 转换为 RDD 吗?或者我在这里遗漏了什么。我对此很陌生,任何帮助都会

示例输入和预期输出:

id_1,time0,10,10

id_2,time1,0,10

id_1,time1,11,10

id_1,time0,1,10

id_2,time1,10,10

输出按第一列分组,然后按第二列聚合(聚合逻辑是简单地将第 3 列和第 4 列相加):

id_1 : time0 : { sum1 : 11, sum2 : 20} ,
time1 : { sum1 : 11, sum2 : 10}

id_2 : time1 : { sum1 : 10, sum2 : 20}

最佳答案

这是使用 Spark 2.0 和 Dataframe 的解决方案。如果您仍想使用 RDD,请告诉我。

public class SparkGroupBySample {
public static void main(String[] args) {
//SparkSession
SparkSession spark = SparkSession
.builder()
.appName("SparkGroupBySample")
.master("local")
.getOrCreate();
//Schema
StructType schema = new StructType(new StructField[] {
new StructField("key1", DataTypes.StringType, true, Metadata.empty()),
new StructField("key2", DataTypes.StringType, true, Metadata.empty()),
new StructField("data1", DataTypes.IntegerType, true, Metadata.empty()),
new StructField("data2", DataTypes.IntegerType, true, Metadata.empty())});
//Read csv
Dataset<Row> dataSet = spark.read().format("csv").schema(schema).option("header", "true").option("delimiter", ",").load("c:\\temp\\sample.csv");
dataSet.show();
//groupBy and aggregate
Dataset<Row> dataSet1 = dataSet.groupBy("key1","key2").sum("data1","data2").toDF("key1","key2","sum1","sum2");
dataSet1.show();
//stop
spark.stop();
}
}

这是输出。

+----+-----+----+----+
|key1| key2|sum1|sum2|
+----+-----+----+----+
|id_1|time1| 11| 10|
|id_2|time1| 10| 20|
|id_1|time0| 11| 20|
+----+-----+----+----+

关于Java Spark groupByKey 与 key1 并在 groupedRDD 上使用 key2 执行aggregateByKey,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40996133/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com