gpt4 book ai didi

java - Spark - 为什么在打印 RDD 之前需要收集()到驱动程序节点?不能并行吗?

转载 作者:行者123 更新时间:2023-12-02 10:58:05 24 4
gpt4 key购买 nike

我正在阅读有关如何在 Spark 中打印 RDD(我使用的是 Java),似乎大多数人只是 collect() (如果 RDD 足够小)并使用 forall( println),或者类似的东西。不能并行打印吗?为什么我们必须将数据收集到驱动节点上才能打印?

我想也许是因为我们不能并行使用 System.out,但我觉得不是这样的。此外,我不太确定如何在代码方面分布式数据并并行打印。我想到的一种方法是创建一个映射分区,它在映射方面不做任何有用的事情,但它会迭代分区并打印其内容。

最佳答案

当您调用collect()方法时,您会将所有结果返回给驱动程序节点。您将拥有一个 List 而不是 RDD。让我们看一个本地模式的示例。假设你有一个 Integer 的 RDD:

JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));

如果调用foreach方法(Java中的stream().forEach()),驱动节点将打印RDD<中的所有元素 按照您创建它的顺序。

rdd.collect().stream().forEach(x -> System.out.println(x));

输出:1、2、3、4、5、6、7、8、9、10

如果你想在每个worker上打印结果,你必须调用RDDforeach方法。它不会向驱动程序返回任何内容,而只会执行您在每个工作节点上的 foreach 方法中指定的计算。

rdd.foreach(x -> System.out.println(x));

如果您看到控制台(本地模式),您会注意到 System.out.println(x) 已在单独的线程中执行,因为输出不尊重原始顺序:

输出:6、3、2、1、8、9、10、4、5、7

因此,如果以分布式模式执行它,每个执行器都会在其日志文件上打印 System.out.println 操作的结果。

您还提到了 mapPartitions 方法。就您而言,我认为它没有比直接在 RDD 上使用 foreach 更有用。这可能有助于控制 worker 的工作量。

 rdd.repartition(5).mapPartitions(x -> {
while(x.hasNext()){
Integer i = x.next();
System.out.println(i);
}
return x;
}).count(); // Count is just to force the execution of mapPartition (mapPartition is lazy and doesn't get executed until an action is called)

希望对你有帮助!

关于java - Spark - 为什么在打印 RDD 之前需要收集()到驱动程序节点?不能并行吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51563731/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com