gpt4 book ai didi

apache-spark - 从 JDBC 源迁移数据时如何优化分区?

转载 作者:行者123 更新时间:2023-12-04 03:56:06 26 4
gpt4 key购买 nike

我正在尝试将数据从 PostgreSQL 表中的表移动到 HDFS 上的 Hive 表。为此,我想出了以下代码:

  val conf  = new SparkConf().setAppName("Spark-JDBC").set("spark.executor.heartbeatInterval","120s").set("spark.network.timeout","12000s").set("spark.sql.inMemoryColumnarStorage.compressed", "true").set("spark.sql.orc.filterPushdown","true").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").set("spark.kryoserializer.buffer.max","512m").set("spark.serializer", classOf[org.apache.spark.serializer.KryoSerializer].getName).set("spark.streaming.stopGracefullyOnShutdown","true").set("spark.yarn.driver.memoryOverhead","7168").set("spark.yarn.executor.memoryOverhead","7168").set("spark.sql.shuffle.partitions", "61").set("spark.default.parallelism", "60").set("spark.memory.storageFraction","0.5").set("spark.memory.fraction","0.6").set("spark.memory.offHeap.enabled","true").set("spark.memory.offHeap.size","16g").set("spark.dynamicAllocation.enabled", "false").set("spark.dynamicAllocation.enabled","true").set("spark.shuffle.service.enabled","true")
val spark = SparkSession.builder().config(conf).master("yarn").enableHiveSupport().config("hive.exec.dynamic.partition", "true").config("hive.exec.dynamic.partition.mode", "nonstrict").getOrCreate()
def prepareFinalDF(splitColumns:List[String], textList: ListBuffer[String], allColumns:String, dataMapper:Map[String, String], partition_columns:Array[String], spark:SparkSession): DataFrame = {
val colList = allColumns.split(",").toList
val (partCols, npartCols) = colList.partition(p => partition_columns.contains(p.takeWhile(x => x != ' ')))
val queryCols = npartCols.mkString(",") + ", 0 as " + flagCol + "," + partCols.reverse.mkString(",")
val execQuery = s"select ${allColumns}, 0 as ${flagCol} from schema.tablename where period_year='2017' and period_num='12'"
val yearDF = spark.read.format("jdbc").option("url", connectionUrl).option("dbtable", s"(${execQuery}) as year2017")
.option("user", devUserName).option("password", devPassword)
.option("partitionColumn","cast_id")
.option("lowerBound", 1).option("upperBound", 100000)
.option("numPartitions",70).load()
val totalCols:List[String] = splitColumns ++ textList
val cdt = new ChangeDataTypes(totalCols, dataMapper)
hiveDataTypes = cdt.gpDetails()
val fc = prepareHiveTableSchema(hiveDataTypes, partition_columns)
val allColsOrdered = yearDF.columns.diff(partition_columns) ++ partition_columns
val allCols = allColsOrdered.map(colname => org.apache.spark.sql.functions.col(colname))
val resultDF = yearDF.select(allCols:_*)
val stringColumns = resultDF.schema.fields.filter(x => x.dataType == StringType).map(s => s.name)
val finalDF = stringColumns.foldLeft(resultDF) {
(tempDF, colName) => tempDF.withColumn(colName, regexp_replace(regexp_replace(col(colName), "[\r\n]+", " "), "[\t]+"," "))
}
finalDF
}
val dataDF = prepareFinalDF(splitColumns, textList, allColumns, dataMapper, partition_columns, spark)
val dataDFPart = dataDF.repartition(30)
dataDFPart.createOrReplaceTempView("preparedDF")
spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")
spark.sql("set hive.exec.dynamic.partition=true")
spark.sql(s"INSERT OVERWRITE TABLE schema.hivetable PARTITION(${prtn_String_columns}) select * from preparedDF")

数据插入到基于 prtn_String_columns: source_system_name, period_year, period_num动态分区的hive表中

使用的 Spark 提交:
SPARK_MAJOR_VERSION=2 spark-submit --conf spark.ui.port=4090 --driver-class-path /home/fdlhdpetl/jars/postgresql-42.1.4.jar  --jars /home/fdlhdpetl/jars/postgresql-42.1.4.jar --num-executors 80 --executor-cores 5 --executor-memory 50G --driver-memory 20G --driver-cores 3 --class com.partition.source.YearPartition splinter_2.11-0.1.jar --master=yarn --deploy-mode=cluster --keytab /home/fdlhdpetl/fdlhdpetl.keytab --principal fdlhdpetl@FDLDEV.COM --files /usr/hdp/current/spark2-client/conf/hive-site.xml,testconnection.properties --name Splinter --conf spark.executor.extraClassPath=/home/fdlhdpetl/jars/postgresql-42.1.4.jar

执行程序日志中生成以下错误消息:
Container exited with a non-zero exit code 143.
Killed by external signal
18/10/03 15:37:24 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[SIGTERM handler,9,system]
java.lang.OutOfMemoryError: Java heap space
at java.util.zip.InflaterInputStream.<init>(InflaterInputStream.java:88)
at java.util.zip.ZipFile$ZipFileInflaterInputStream.<init>(ZipFile.java:393)
at java.util.zip.ZipFile.getInputStream(ZipFile.java:374)
at java.util.jar.JarFile.getManifestFromReference(JarFile.java:199)
at java.util.jar.JarFile.getManifest(JarFile.java:180)
at sun.misc.URLClassPath$JarLoader$2.getManifest(URLClassPath.java:944)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:450)
at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.apache.spark.util.SignalUtils$ActionHandler.handle(SignalUtils.scala:99)
at sun.misc.Signal$1.run(Signal.java:212)
at java.lang.Thread.run(Thread.java:745)

我在日志中看到正在使用给定数量的分区正确执行读取,如下所示:
Scan JDBCRelation((select column_names from schema.tablename where period_year='2017' and period_num='12') as year2017) [numPartitions=50]

以下是分阶段执行者的状态:
enter image description here

enter image description here

enter image description here

enter image description here

数据未正确分区。一个分区变小,而另一个分区变大。这里存在偏斜问题。
将数据插入 Hive 表时,作业在以下行失败: spark.sql(s"INSERT OVERWRITE TABLE schema.hivetable PARTITION(${prtn_String_columns}) select * from preparedDF")但我知道这是由于数据倾斜问题而发生的。

我试图增加执行程序的数量,增加执行程序内存,驱动程序内存,尝试只保存为 csv 文件而不是将数据帧保存到 Hive 表中,但没有任何影响执行从给出异常:
java.lang.OutOfMemoryError: GC overhead limit exceeded

代码中有什么我需要更正的吗?谁能让我知道我该如何解决这个问题?

最佳答案

  • 根据输入数据量和集群资源确定您需要多少个分区。根据经验,除非绝对必要,否则最好将分区输入保持在 1GB 以下。并且严格小于块大小限制。

    您已经 previously stated您迁移在不同帖子(5 - 70)中使用的 1TB 数据值可能会降低以确保流程顺利。

    尝试使用不需要进一步的值 repartitioning .
  • 了解您的数据。

    分析数据集中可用的列,以确定是否有任何具有高基数和均匀分布的列要分布在所需数量的分区中。这些是导入过程的良好候选者。此外,您应该确定一个确切的值范围。

    具有不同中心性和偏度度量的聚合以及直方图和基本键计数是很好的探索工具。对于这部分,最好直接在数据库中分析数据,而不是将其提取到 Spark。

    根据 RDBMS,您可能可以使用 width_bucket (PostgreSQL, Oracle) 或等效函数以了解加载后数据如何在 Spark 中分布 partitionColumn , lowerBound , upperBound , numPartitons .

    s"""(SELECT width_bucket($partitionColum, $lowerBound, $upperBound, $numPartitons) AS bucket, COUNT(*)
    FROM t
    GROUP BY bucket) as tmp)"""
  • 如果没有满足上述条件的列,请考虑:
  • 创建一个自定义的并通过它公开。一个观点。多个独立列上的哈希通常是很好的候选者。请查阅您的数据库手册以确定此处可以使用的函数(Oracle 中的 DBMS_CRYPTO,PostgreSQL 中的 pgcrypto)*。
  • 使用一组独立的列,它们一起提供足够高的基数。

    或者,如果您要写入分区的 Hive 表,则应考虑包括 Hive 分区列。它可能会限制稍后生成的文件数量。
  • 准备分区参数
  • 如果在前面的步骤中选择或创建的列是数字( or date / timestamp in Spark >= 2.4 ),则直接将其提供为 partitionColumn并使用之前确定的范围值来填充 lowerBoundupperBound .

    如果边界值不反射(reflect)数据的属性( min(col) 用于 lowerBoundmax(col) 用于 upperBound ),它可能会导致显着的数据倾斜,因此请小心线程。在最坏的情况下,当边界不覆盖数据范围时,所有记录都将由一台机器获取,这比根本不分区好。
  • 如果在前面的步骤中选择的列是分类的或者是一组列,则生成完全覆盖数据的互斥谓词列表,其形式可以在 SQL 中使用。 where 子句。

    例如,如果您有一列 A值 { a1 , a2 , a3 } 和列B值 { b1 , b2 , b3 }:

    val predicates = for {
    a <- Seq("a1", "a2", "a3")
    b <- Seq("b1", "b2", "b3")
    } yield s"A = $a AND B = $b"

    仔细检查条件不重叠并且所有组合都被覆盖。如果不满足这些条件,您最终会分别得到重复或丢失的记录。

    将数据传递为 predicates论据 jdbc打电话。请注意,分区数将与谓词数完全相等。
  • 将数据库置于只读模式(任何正在进行的写入都可能导致数据不一致。如果可能,您应该在开始整个过程​​之前锁定数据库,但如果不可能,则在您的组织中)。
  • 如果分区数与所需的输出负载数据匹配而没有 repartition并直接转储到接收器,如果没有,您可以尝试按照与步骤 1 相同的规则重新分区。
  • 如果您仍然遇到任何问题,请确保您已正确配置 Spark 内存和 GC 选项。
  • 如果以上都不起作用:
  • 考虑使用 COPY TO 之类的工具将数据转储到网络/分发存储并直接从那里阅读。

    请注意,标准数据库实用程序通常需要符合 POSIX 的文件系统,因此 HDFS 通常不会这样做。

    这种方式的好处是不用担心列属性,也不需要将数据置于只读模式,保证一致性。
  • 使用专用的批量传输工具,如 Apache Sqoop,然后重新调整数据。


  • * 不要使用伪列 - Pseudocolumn in Spark JDBC .

    关于apache-spark - 从 JDBC 源迁移数据时如何优化分区?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52603131/

    26 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com