gpt4 book ai didi

java - 如何在Java中并行运行spark程序

转载 作者:行者123 更新时间:2023-12-02 10:44:41 25 4
gpt4 key购买 nike

所以我有一个具有 Spark Maven 依赖项的 Java 应用程序,并且在运行它时,它会在运行它的主机上启动 Spark 服务器。服务器实例有 36 个核心。我指定了 SparkSession 实例,其中并行提及了核心数量和其他配置属性,但是当我使用 htop 查看统计信息时,它似乎并没有使用所有核心,而只是使用了 1 个核心。

   SparkSession spark  = SparkSession
.builder()
.master("local")
.appName("my-spark")
.config("spark.driver.memory","50g")
.config("spark.hadoop.fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem")
.config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
.config("spark.sql.shuffle.partitions", "400")
.config("spark.eventLog.enabled", "true")
.config("spark.eventLog.dir", "/dir1/dir2/logs")
.config("spark.history.fs.logDirectory", "/dir1/dir2/logs")
.config("spark.executor.cores", "36")

我还在 JavaSparkContext 中添加了:

JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
sc.hadoopConfiguration().set("fs.s3a.access.key", AWS_KEY);
sc.hadoopConfiguration().set("fs.s3a.secret.key", AWS_SECRET_KEY);
sc.hadoopConfiguration().set("spark.driver.memory","50g");
sc.hadoopConfiguration().set("spark.eventLog.enabled", "true");
sc.hadoopConfiguration().set("spark.eventLog.dir", "/dir1/dir2/logs");
sc.hadoopConfiguration().set("spark.executor.cores", "36");

我的任务是将数据从 aws s3 读取到 df 并将数据写入另一个存储桶。

Dataset<Row> df = spark.read().format("csv").option("header", "true").load("s3a://bucket/file.csv.gz");
//df = df.repartition(200);

df.withColumn("col_name", df.col("col_name")).sort("col_name", "_id").write().format("iceberg").mode("append").save(location);

最佳答案

.gz 文件是“不可吐出的”:要解压缩它们,您必须从字节 0 开始并向前读取。因此,spark、hive、MapReduce 等将整个文件交给单个工作人员。如果您想要并行处理,请使用不同的压缩格式(例如 snappy)

关于java - 如何在Java中并行运行spark程序,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52674150/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com