gpt4 book ai didi

apache-spark - 如何在 Spark 中按分区对键/值进行分组?

转载 作者:行者123 更新时间:2023-12-04 04:16:48 28 4
gpt4 key购买 nike

我有一个 Spark Streaming 应用程序,它每秒接收多个 JSON 消息,每个消息都有一个标识其来源的 ID。

使用此 ID 作为 key ,我可以执行 MapPartitionsToPair ,从而创建一个 JavaPairDStream,具有键/值对的 RDD,每个分区一个键值对(因此,例如,如果我收到 5 条 JSON 消息,我会得到一个带有 5 个分区的 RDD,每个分区都将消息的 ID 作为键,以及 JSON 消息本身作为值)。

我现在想要做的是,我想将所有具有相同键的值分组到同一个分区中。因此,例如,如果我有 3 个分区的键为“a”和 2 个分区的键为“b”,我想创建一个带有 2 个分区而不是 5 个分区的新 RDD,每个分区包含一个键具有的所有值,一个用于'a' 和 'b' 一个。

我怎样才能做到这一点?
到目前为止,这是我的代码:

JavaReceiverInputDStream<String> streamData2 = ssc.socketTextStream(args[0], Integer.parseInt(args[1]),
StorageLevels.MEMORY_AND_DISK_SER);

JavaPairDStream<String,String> streamGiveKey= streamData2.mapPartitionsToPair(new PairFlatMapFunction<Iterator<String>, String, String>() {
@Override
public Iterable<Tuple2<String, String>> call(Iterator<String> stringIterator) throws Exception {

ArrayList<Tuple2<String,String>>a= new ArrayList<Tuple2<String, String>>();

while (stringIterator.hasNext()){
String c=stringIterator.next();
if(c==null){
return null;

}

JsonMessage retMap = new Gson().fromJson(c,JsonMessage.class);
String key= retMap.getSid();
Tuple2<String,String> b= new Tuple2<String,String>(key,c);
a.add(b);

System.out.print(b._1+"_"+b._2);
// }
//break;
}


return a;
}
});

//我创建了一个 JavaPairDStream,其中每个分区都包含一个键/值对。

我尝试使用 grouByKey() ,但无论消息数是多少,我的分区号始终为 2。

我该怎么做?
非常感谢。

最佳答案

您可以使用

groupByKey(Integer numPartitions)

并设置 numPartitions等于您拥有的不同键的数量。

但是..你需要知道 你有多少个不同的键前面。你有那个信息吗?可能不是。那么..你需要做一些额外的(/冗余)工作。例如。用
countByKey

作为第一步。这比 groupByKey 快 - 所以至少你没有将总处理时间加倍。

更新 OP 询问为什么他们默认获得 2 个分区。

默认 groupByKey使用 defaultPartitioner()方法
groupByKey(defaultPartitioner(self))
  • 选择 Partitioner来自具有最大基数的父分区。

  • -- 否则会使用 spark.default.parallelism

    关于apache-spark - 如何在 Spark 中按分区对键/值进行分组?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37908890/

    28 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com