gpt4 book ai didi

apache-spark - Spark 作业卡在从 Oracle DB 导入的最后阶段 - 数据不倾斜

转载 作者:行者123 更新时间:2023-12-04 05:10:51 26 4
gpt4 key购买 nike

我正在尝试使用 Apache Spark 2.3.1 从 Oracle DB 中提取数据并将其放入 AWS S3。这项工作一直运行良好,直到最后阶段并卡在那里。我不认为数据有偏差,因为每个阶段都有相同数量的记录。下面是我在 spark 中使用的查询。

url = "jdbc:oracle:thin:@IP:PORT/SID"
user = "user"
password = "password"
driver = "oracle.jdbc.driver.OracleDriver"
table = "table"
fetchSize = 1000
partitionColumn = "num_rows"

date1 = (datetime.today() - td(days=42)).date().strftime('%d-%b-%Y')
date2 = (datetime.today() - td(days=2)).date().strftime('%d-%b-%Y')

query = "(select min(rownum) as min, max(rownum) as max from "+table+" where date>='"+str(date1)+"' and date<='"+str(date2)+"') tmp1"
print(query)

DF = spark.read.format("jdbc").option("url", url) \
.option("dbtable", query) \
.option("user", user) \
.option("password", password) \
.option("driver", driver) \
.load()

lower_bound, upper_bound = DF.first()
lower_bound = int(lower_bound)
upper_bound = int(upper_bound)
numPartitions = int(upper_bound/fetchSize)+1
print(lower_bound,upper_bound)
print(numPartitions)

query = "(select t1.*, ROWNUM as num_rows from (select * from " + table + " where date>='"+str(date1)+"' and date<='"+str(date2)+"') t1) tmp2"
print(query)

DF = spark.read.format("jdbc").option("url", url) \
.option("dbtable", query) \
.option("user", user) \
.option("password", password) \
.option("fetchSize",fetchSize) \
.option("numPartitions", numPartitions) \
.option("partitionColumn", partitionColumn) \
.option("lowerBound", lower_bound) \
.option("upperBound", upper_bound) \
.option("driver", driver) \
.load()

path = "s3://my_path"
DF.write.mode("overwrite").parquet(path)

该代码主要提取最近 42 天的数据并将其放入 S3 存储桶中。以下是写入语句之前的输出。代码运行于'10-Sep-2018'

(select min(rownum) as min, max(rownum) as max from table where date>='30-Jul-2018' and date<='08-Sep-2018') tmp1
(1, 2195427)
2196
(select t1.*, ROWNUM as num_rows from (select * from table where date>='30-Jul-2018' and date<='08-Sep-2018') t1) tmp2

如你所见,

  • 总记录数 = 2195427
  • 每个分区的记录数 = 1000
  • 分区数 = 2196

因此该作业有2196 个阶段,每个阶段拉取 1000 条记录。作业卡在 2191/2196,还有 5 个阶段要走。

硬件规范:

我正在使用 r4.xlarge 机器。我的集群是 1 Master,2 Slaves of r4.xlarge。以下是我的驱动程序和执行程序规范。

spark.driver.cores  8
spark.driver.memory 24g
spark.driver.memoryOverhead 3072M
spark.executor.cores 1
spark.executor.memory 3g
spark.executor.memoryOverhead 512M
spark.yarn.am.cores 1
spark.yarn.am.memory 3g
spark.yarn.am.memoryOverhead 512M

Spark Executor UI

第 1 到 2191 阶段在 1.3 小时内完成,但其余 5 个阶段卡住了三个多小时。

请在此处找到日志: https://github.com/rinazbelhaj/stackoverflow/blob/master/Spark_Log_10_Sept_2018

我无法找出此问题的根本原因。

最佳答案

我猜你有两种可能的情况:

1 - 与 Oracle DB 相关的问题

我正在处理一个非常相似的问题。但任务并没有被卡住,而是被 SQL Server 中断了。中断是由 connection reset 引起的,它是随机发生的。

为了避免这种情况,我在 JDBC 连接字符串上设置了一些参数。错误已停止,但任务永远不会结束。

  • 原创:

jdbc:sqlserver://host:port;database=db_name;

  • 修改:
db_url=jdbc:sqlserver://host:port;
database=db_name;
applicationIntent=readonly;
applicationName=app-name;
columnEncryptionSetting=Disabled;
disableStatementPooling=true;
encrypt=false;
integratedSecurity=false;
lastUpdateCount=true;
lockTimeout=-1;
loginTimeout=15;
multiSubnetFailover=false;
packetSize=8000;
queryTimeout=-1;
responseBuffering=adaptive;
selectMethod=direct;
sendStringParametersAsUnicode=true;
serverNameAsACE=false;
TransparentNetworkIPResolution=true;
trustServerCertificate=false;
trustStoreType=JKS;
sendTimeAsDatetime=true;
xopenStates=false;
authenticationScheme=nativeAuthentication;
authentication=NotSpecified;
socketTimeout=0;
fips=false;
enablePrepareOnFirstPreparedStatementCall=false;
serverPreparedStatementDiscardThreshold=10;
statementPoolingCacheSize=0;
jaasConfigurationName=SQLJDBCDriver;
sslProtocol=TLS;
cancelQueryTimeout=-1;
useBulkCopyForBatchInsert=false;

因此,我决定删除 JDBC 连接字符串上添加的参数,并开始在集群创建时传递 spark 配置。我将最大重试次数从 4(默认)更改为 50

  • spark.task.maxFailures=50

因此,连接问题仍然存在,但至少任务成功结束。

我建议您设置任何连接超时,因为它可能是无限的 - 通常设置为 0-1。检查 Oracle 的驱动程序文档并尝试更改默认行为。

2 - 与 S3 相关的问题

我们还遇到了与 S3 上的写入 操作相关的问题。我找不到确切的错误消息,但它类似于 An error occurred while calling o70.parquet

当我们解决上述问题时,写入速度太慢。

我们团队的一个人建议使用 HDFS 从数据库写入数据,然后将操作从 HDFS 复制到 S3。性能大幅提升。

  • 将 HDFS 设置为目标(可能需要增加主节点的磁盘大小)
destination = 'hdfs:///path-to-hdfs'

DF.write \
.mode("overwrite") \
.parquet(destination)
  • 执行从 HDFS 到 S3 的复制

引用:https://docs.aws.amazon.com/emr/latest/ReleaseGuide/UsingEMR_s3distcp.html

希望对你有帮助!我会报告我的任务的任何改进;)

关于apache-spark - Spark 作业卡在从 Oracle DB 导入的最后阶段 - 数据不倾斜,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52255976/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com