
hadoop - Are S3NativeFileSystem calls killing my Pyspark application on AWS EMR 4.6.0?


My Spark application fails when it has to access a large number of CSV files from S3 (~1,000 files at ~63 MB each) and pipe them into a Spark RDD. The actual process of splitting up the CSVs seems to work, but an extra function call to S3NativeFileSystem appears to cause an error and crash the job.

First, the following is my PySpark application:

from pyspark import SparkContext
sc = SparkContext("local", "Simple App")
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
import time

startTime = float(time.time())

dataPath = 's3://PATHTODIRECTORY/'
sc._jsc.hadoopConfiguration().set("fs.s3.awsAccessKeyId", "MYKEY")
sc._jsc.hadoopConfiguration().set("fs.s3.awsSecretAccessKey", "MYSECRETKEY")

def buildSchemaDF(tableName, columnList):
    currentRDD = sc.textFile(dataPath + tableName).map(lambda line: line.split("|"))
    currentDF = currentRDD.toDF(columnList)
    return currentDF

loadStartTime = float(time.time())
lineitemDF = buildSchemaDF('lineitem*', ['l_orderkey','l_partkey','l_suppkey','l_linenumber','l_quantity','l_extendedprice','l_discount','l_tax','l_returnflag','l_linestatus','l_shipdate','l_commitdate','l_receiptdate','l_shipinstruct','l_shipmode','l_comment'])
lineitemDF.registerTempTable("lineitem")
loadTimeElapsed = float(time.time()) - loadStartTime

queryStartTime = float(time.time())

qstr = """
SELECT
    lineitem.l_returnflag,
    lineitem.l_linestatus,
    sum(l_quantity) as sum_qty,
    sum(l_extendedprice) as sum_base_price,
    sum(l_discount) as sum_disc,
    sum(l_tax) as sum_tax,
    avg(l_quantity) as avg_qty,
    avg(l_extendedprice) as avg_price,
    avg(l_discount) as avg_disc,
    count(l_orderkey) as count_order
FROM
    lineitem
WHERE
    l_shipdate <= '19981001'
GROUP BY
    l_returnflag,
    l_linestatus
ORDER BY
    l_returnflag,
    l_linestatus
"""
tpch1DF = sqlContext.sql(qstr)

queryTimeElapsed = float(time.time()) - queryStartTime
totalTimeElapsed = float(time.time()) - startTime

tpch1DF.show()

queryResults = [qstr, loadTimeElapsed, queryTimeElapsed, totalTimeElapsed]
distData = sc.parallelize(queryResults)
distData.saveAsTextFile(dataPath + 'queryResults.csv')

print 'Load Time: ' + str(loadTimeElapsed)
print 'Query Time: ' + str(queryTimeElapsed)
print 'Total Time: ' + str(totalTimeElapsed)

To take this step by step, I start by spinning up a Spark EMR cluster with the following AWS CLI command (line breaks added for readability):

aws emr create-cluster --name "Big TPCH Spark cluster2" --release-label emr-4.6.0 \
--applications Name=Spark --ec2-attributes KeyName=blazing-test-aws \
--log-uri s3://aws-logs-132950491118-us-west-2/elasticmapreduce/j-1WZ39GFS3IX49/ \
--instance-type m3.2xlarge --instance-count 6 --use-default-roles

After the EMR cluster finishes provisioning, I copy my Pyspark application onto the master node at '/home/hadoop/pysparkApp.py'. With it copied there, I can add a step that calls spark-submit:

aws emr add-steps --cluster-id j-1DQJ8BDL1394N --steps \
Type=spark,Name=SparkTPCHTests,Args=[--deploy-mode,cluster,--conf,spark.yarn.submit.waitAppCompletion=true,--num-executors,5,--executor-cores,5,--executor-memory,20g,/home/hadoop/tpchSpark.py],ActionOnFailure=CONTINUE

Now, if I run this step against only a few of the CSV files described above, the final results are generated, but the script still claims to have failed.

I think it is associated with the extra calls to S3NativeFileSystem, but I am not certain. These are the Yarn log messages that led me to that conclusion. The first call appears to work fine:

16/05/15 23:18:00 INFO HadoopRDD: Input split: s3://data-set-builder/splitLineItem2/lineitemad:0+64901757
16/05/15 23:18:00 INFO latency: StatusCode=[200], ServiceName=[Amazon S3], AWSRequestID=[ED8011CE4E1F6F18], ServiceEndpoint=[https://data-set-builder.s3-us-west-2.amazonaws.com], HttpClientPoolLeasedCount=0, RetryCapacityConsumed=0, RequestCount=1, HttpClientPoolPendingCount=0, HttpClientPoolAvailableCount=2, ClientExecuteTime=[77.956], HttpRequestTime=[77.183], HttpClientReceiveResponseTime=[20.028], RequestSigningTime=[0.229], CredentialsRequestTime=[0.003], ResponseProcessingTime=[0.128], HttpClientSendRequestTime=[0.35],

While the second one does not seem to execute correctly, resulting in "Partial Results" (206 errors):

16/05/15 23:18:00 INFO S3NativeFileSystem: Opening 's3://data-set-builder/splitLineItem2/lineitemad' for reading
16/05/15 23:18:00 INFO latency: StatusCode=[206], ServiceName=[Amazon S3], AWSRequestID=[10BDDE61AE13AFBE], ServiceEndpoint=[https://data-set-builder.s3.amazonaws.com], HttpClientPoolLeasedCount=0, RetryCapacityConsumed=0, RequestCount=1, HttpClientPoolPendingCount=0, HttpClientPoolAvailableCount=2, ClientExecuteTime=[296.86], HttpRequestTime=[295.801], HttpClientReceiveResponseTime=[293.667], RequestSigningTime=[0.204], CredentialsRequestTime=[0.002], ResponseProcessingTime=[0.34], HttpClientSendRequestTime=[0.337],
16/05/15 23:18:02 INFO ApplicationMaster: Waiting for spark context initialization ...

I don't understand why it even makes the second call to S3NativeFileSystem when the first one appears to have responded effectively and even split the file. Is this a product of my EMR configuration? I know S3Native has file-limit problems and that a direct S3 call is optimal, which is what I am trying to do, but this call seems to be there no matter what I do. Please help!

Also, here are some other error messages from my Yarn logs, in case they are relevant.

1)

16/05/15 23:19:22 ERROR ApplicationMaster: SparkContext did not initialize after waiting for 100000 ms. Please check earlier log output for errors. Failing the application.
16/05/15 23:19:22 INFO ApplicationMaster: Final app status: FAILED, exitCode: 13, (reason: Timed out waiting for SparkContext.)

2)

16/05/15 23:19:22 ERROR DiskBlockObjectWriter: Uncaught exception while reverting partial writes to file /mnt/yarn/usercache/hadoop/appcache/application_1463354019776_0001/blockmgr-f847744b-c87a-442c-9135-57cae3d1f6f0/2b/temp_shuffle_3fe2e09e-f8e4-4e5d-ac96-1538bdc3b401
java.io.FileNotFoundException: /mnt/yarn/usercache/hadoop/appcache/application_1463354019776_0001/blockmgr-f847744b-c87a-442c-9135-57cae3d1f6f0/2b/temp_shuffle_3fe2e09e-f8e4-4e5d-ac96-1538bdc3b401 (No such file or directory)
at java.io.FileOutputStream.open(Native Method)
at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
at org.apache.spark.storage.DiskBlockObjectWriter.revertPartialWritesAndClose(DiskBlockObjectWriter.scala:162)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.stop(BypassMergeSortShuffleWriter.java:226)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
16/05/15 23:19:22 ERROR BypassMergeSortShuffleWriter: Error while deleting file /mnt/yarn/usercache/hadoop/appcache/application_1463354019776_0001/blockmgr-f847744b-c87a-442c-9135-57cae3d1f6f0/2b/temp_shuffle_3fe2e09e-f8e4-4e5d-ac96-1538bdc3b401
16/05/15 23:19:22 WARN TaskMemoryManager: leak 32.3 MB memory from org.apache.spark.unsafe.map.BytesToBytesMap@762be8fe
16/05/15 23:19:22 ERROR Executor: Managed memory leak detected; size = 33816576 bytes, TID = 14
16/05/15 23:19:22 ERROR Executor: Exception in task 13.0 in stage 1.0 (TID 14)
java.io.FileNotFoundException: /mnt/yarn/usercache/hadoop/appcache/application_1463354019776_0001/blockmgr-f847744b-c87a-442c-9135-57cae3d1f6f0/3a/temp_shuffle_b9001fca-bba9-400d-9bc4-c23c002e0aa9 (No such file or directory)
at java.io.FileOutputStream.open(Native Method)
at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
at org.apache.spark.storage.DiskBlockObjectWriter.open(DiskBlockObjectWriter.scala:88)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

Best Answer

The order of precedence for Spark configuration is:

SparkContext (code/application) > spark-submit > spark-defaults.conf
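
To illustrate that precedence with a hypothetical property (the 4g/8g/16g values below are made up for the example): if spark-defaults.conf sets spark.executor.memory to 4g and spark-submit passes --conf spark.executor.memory=8g, a value set on the SparkConf in application code wins over both:

    from pyspark import SparkConf, SparkContext

    # hypothetical value; set in code, it overrides both
    # spark-submit's --conf and spark-defaults.conf
    conf = SparkConf().set("spark.executor.memory", "16g")
    sc = SparkContext(conf=conf)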

A couple of things to point out here -

  1. Use YARN cluster as the deploy mode and master in your spark-submit command -

    spark-submit --deploy-mode cluster --master yarn ...

    or

    spark-submit --master yarn-cluster ...

  2. Remove the "local" string from the sc = SparkContext("local", "Simple App") line in your code. Initialize the Spark context with:

    conf = SparkConf().setAppName(appName)
    sc = SparkContext(conf=conf)
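
Applied to the application above, the start of the script would then look something like this (a minimal sketch; "Simple App" is kept from the original code, and the master is left to be supplied by spark-submit / the EMR step rather than hard-coded):

    from pyspark import SparkConf, SparkContext
    from pyspark.sql import SQLContext

    # no hard-coded master here; "--master yarn" comes from spark-submit
    conf = SparkConf().setAppName("Simple App")
    sc = SparkContext(conf=conf)
    sqlContext = SQLContext(sc)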

Reference - http://spark.apache.org/docs/latest/programming-guide.html

Regarding "hadoop - Are S3NativeFileSystem calls killing my Pyspark application on AWS EMR 4.6.0?", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/37259403/
