gpt4 book ai didi

java - 当我调用 rdd.join(rdd) 时发生了什么

转载 作者:行者123 更新时间:2023-11-30 06:21:49 29 4
gpt4 key购买 nike

我正在开发一个应用程序,我需要对 RDD 中具有相同键的每对行执行计算,这是 RDD 结构:

List<Tuple2<String, Tuple2<Integer, Integer>>> dat2 = new ArrayList<>();
dat2.add(new Tuple2<String, Tuple2<Integer, Integer>>("Alice", new Tuple2<Integer, Integer>(1, 1)));
dat2.add(new Tuple2<String, Tuple2<Integer, Integer>>("Alice", new Tuple2<Integer, Integer>(2, 5)));
dat2.add(new Tuple2<String, Tuple2<Integer, Integer>>("Alice", new Tuple2<Integer, Integer>(3, 78)));
dat2.add(new Tuple2<String, Tuple2<Integer, Integer>>("Bob", new Tuple2<Integer, Integer>(1, 6)));
dat2.add(new Tuple2<String, Tuple2<Integer, Integer>>("Bob", new Tuple2<Integer, Integer>(2, 11)));
JavaRDD<Tuple2<String, Tuple2<Integer, Integer>>> y2 = sc.parallelize(dat2);

现在,每个人的数据都可以这样查看:(时间戳,值)。我希望知道每一行 +-1 时间戳中发生的值的数量。 (我知道这看起来像滑动窗口,但我想要事件级别的粒度)

y2.join(y2);
resultOfJoin.filter(t -> t._2()._1()._1() - t._2()._2()._1() <= 1 && t._2()._1()._1() - t._2()._2()._1() >= -1)

在这种情况下,我找到的最佳解决方案是将 RDD 与其自身连接起来,为每个人创建 k^2 行,其中 k 是与此人关联的行数。

现在,我确实知道这是一场彻底的灾难。我知道这会导致洗牌(而且洗牌很糟糕),但我无法提供更好的东西。

我有 3 个问题:

  1. 由于我在连接后立即进行过滤,是否会影响连接带来的压力(换句话说,是否会有任何优化)?
  2. 网络上传递的行数是多少? (我知道在最坏的情况下,结果 RDD 将有 n^2 行)在网络上发送的行是 #workersn (仅发送一份副本并在工作人员上复制)还是 #workersn ^2(为结果工作器上的每 2 行组合发送行)?
  3. 如果我愿意使用数据集,我可以使用过滤器加入。我了解数据集对计算图有额外的优化。如果我迁移到数据集,我应该期望获得多少改进(如果有)?

最佳答案

Since I filter right after the join, will it effect the stress caused by the join (in other words, will there be any optimizations)?

不,不会有任何优化。

What is the volume of rows passed on the network?

O(N)(具体来说,每条记录将被打乱两次,每个父记录一次)您通过键加入,因此每一项都进入一个,并且只有一个分区。

If I would of worked with Dataset I could join with filter. I understand Datasets have additional optimization for the computation graph. How much improvement, if any, should I expect if I transit to Datasets?

随机播放过程得到了更好的优化,但除此之外,您不能期望任何特定于案例的优化。

wish to know for every row the number of values happening in +-1 timestamp.

尝试窗口函数:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions._

val w = Window.partitionBy("id").ordetBy("timestamp")

rdd.toDF("id", "data")
.select($"id", $"data._1" as "timestamp", $"data._2" as "value"))
.withColumn("lead", lead($"value", 1).over(w))
.withColumn("lag", lag($"value", 1).over(w))

关于java - 当我调用 rdd.join(rdd) 时发生了什么,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47988255/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com