gpt4 book ai didi

java - Spark中如何在不同的worker上运行任务?

转载 作者:太空宇宙 更新时间:2023-11-04 11:10:45 25 4
gpt4 key购买 nike

我有以下 Spark 代码:

package my.spark;

import java.util.ArrayList;
import java.util.List;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

public class ExecutionTest {
public static void main(String[] args) {
SparkSession spark = SparkSession
.builder()
.appName("ExecutionTest")
.getOrCreate();

JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

int slices = 2;
int n = slices;
List<String> list = new ArrayList<>(n);
for (int i = 0; i < n; i++) {
list.add("" + i);
}

JavaRDD<String> dataSet = jsc.parallelize(list, slices);

dataSet.foreach(str -> {
System.out.println("value: " + str);
Thread.sleep(10000);
});

System.out.println("done");

spark.stop();
}

}

我已经使用以下命令运行主节点和两个工作节点(本地主机上的所有内容;Windows):

bin\spark-class org.apache.spark.deploy.master.Master

和(两次):

bin\spark-class org.apache.spark.deploy.worker.Worker spark://<local-ip>:7077

一切都正确开始。

使用命令提交作业后:

bin\spark-submit --class my.spark.ExecutionTest --master spark://<local-ip>:7077 file:///<pathToFatJar>/FatJar.jar

命令已启动,但 value: 0value: 1 输出由其中一名工作人员写入(如与工作人员关联的页面上的Logs > stdout 上所示)。第二个工作人员在 Logs > stdout 中没有任何内容。据我了解,这意味着每次迭代都是由同一个工作人员完成的。

如何在两个不同的正在运行的工作线程上运行这些任务?

最佳答案

这是可能的,但我不确定它是否每次都可以正常工作。然而,在测试时,每次都按预期工作。

我已经使用 Windows 10 x64 主机和 4 个虚拟机 (VM) 测试了我的代码:具有 Debian 9(延伸)内核 4.9.0 x64 的 VirtualBox、仅主机网络、Java 1.8.0_144、适用于 Hadoop 2.7 的 Apache Spark 2.2.0 (spark-2.2.0-bin-hadoop2.7.ta​​r.gz)。

我一直在虚拟机上使用主服务器和 3 个从服务器,在 Windows 上使用另外一个从服务器:

  • debian-master - 1 个 CPU,1 GB RAM
  • debian-slave1 - 1 个 CPU,1 GB RAM
  • debian-slave2 - 1 个 CPU,1 GB RAM
  • debian-slave3 - 2 个 CPU,1 GB RAM
  • windows-slave - 4 CPU,8 GB RAM

我正在将作业从 Windows 计算机提交到位于虚拟机上的主服务器。

开头和之前一样:

    SparkSession spark = SparkSession
.builder()
.config("spark.cores.max", coresCount) // not necessary
.appName("ExecutionTest")
.getOrCreate();

[重要] coresCount 对于分区至关重要 - 我必须使用已用核心的数量来对数据进行分区,而不是工作线程/执行程序的数量。

接下来,我必须创建 JavaSparkContext 和 RDD。重用 RDD 允许多次执行同一组工作线程。

    JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

JavaRDD<Integer> rddList
= jsc.parallelize(
IntStream.range(0, coresCount * 2)
.boxed().collect(Collectors.toList()))
.repartition(coresCount);

我创建了包含 coresCount * 2 元素的 rddList 。等于 coresCount 的元素数量不允许在所有关联的工作线程上运行(在我的例子中)。也许,coresCount + 1 就足够了,但我还没有测试它,因为 coresCount * 2 也不够。

接下来要做的是运行命令:

    List<String> hostsList
= rddList.map(value -> {
Thread.sleep(3_000);
return InetAddress.getLocalHost().getHostAddress();
})
.distinct()
.collect();

System.out.println("-----> hostsList = " + hostsList);

Thread.sleep(3_000) 对于正确分配任务是必要的。 3秒对我来说就足够了。可能该值可能更小,有时可能需要更高的值(我猜该值取决于工作人员从主机获取任务执行的速度)。

上述代码将在与工作线程关联的每个核心上运行,因此每个工作线程不止一个。为了在每个工作线程上运行一个命令,我使用了以下代码:

/* as static field of class */
private static final AtomicBoolean ONE_ON_WORKER = new AtomicBoolean(false);

...

long nodeCount
= rddList.map(value -> {
Thread.sleep(3_000);
if (ONE_ON_WORKER.getAndSet(true) == false) {
System.out.println("Executed on "
+ InetAddress.getLocalHost().getHostName());
return 1;
} else {
return 0;
}
})
.filter(val -> val != 0)
.count();

System.out.println("-----> finished using #nodes = " + nodeCount);

当然,最后,停止:

    spark.stop();

关于java - Spark中如何在不同的worker上运行任务?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46031549/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com