
python - Why does AWS refuse my connection when I use wholeTextFiles() with pyspark?


I use

sc.wholeTextFiles(",".join(fs), minPartitions=200)

to download 6k XML files (roughly 50 MB each) from S3 on a single Dataproc node with 96 CPUs. With minPartitions=200 AWS refuses my connections, but with minPartitions=50 everything works fine. Why?
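
For context, here is a minimal sketch of the setup described above; the bucket, the prefix, and the way fs is built are assumptions, since the question only shows the final call:

# Minimal sketch of the setup above. The bucket/prefix and the way fs is
# built are assumptions; only the wholeTextFiles() call comes from the question.
from pyspark import SparkContext

sc = SparkContext(appName="wholeTextFilesExample")

# Hypothetical list of S3 object URIs; in practice this might come from a
# bucket listing done outside Spark.
fs = [
    "s3a://example-bucket/tfms/2019/04/03/10/part-0001.xml.gz",
    "s3a://example-bucket/tfms/2019/04/03/10/part-0002.xml.gz",
    # ... roughly 6k entries in total
]

# Each RDD element is a (path, content) pair; with minPartitions=200 many
# tasks may open S3 connections at the same time.
rdd = sc.wholeTextFiles(",".join(fs), minPartitions=200)
print(rdd.count())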

Some of the Spark logs:

(...)
19/05/22 14:11:17 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/05/22 14:11:17 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/05/22 14:11:26 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/05/22 14:11:26 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/05/22 14:11:28 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/05/22 14:11:28 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/05/22 14:11:28 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/05/22 14:11:28 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/05/22 14:11:28 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/05/22 14:11:28 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/05/22 14:11:28 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/05/22 14:11:28 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/05/22 14:11:28 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/05/22 14:11:28 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/05/22 14:11:28 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/05/22 14:11:28 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/05/22 14:11:28 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/05/22 14:11:28 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/05/22 14:11:28 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/05/22 14:11:28 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/05/22 14:11:28 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/05/22 14:11:28 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/05/22 14:11:30 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/05/22 14:11:30 ERROR org.apache.spark.api.python.PythonRunner: Python worker exited unexpectedly (crashed)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 362, in main
eval_type = read_int(infile)
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 717, in read_int
raise EOFError
EOFError

at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:452)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:588)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:571)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1124)
at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1130)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.InterruptedIOException: getFileStatus on s3a://uni-swim-firehose/tfms/2019/04/03/10/SWIM-TFMS-2-2019-04-03-10-51-52-0fd9f05a-cbc5-4c1c-aef2-aa275ee3c404.gz: com.amazonaws.SdkClientException: Unable to execute HTTP request: Timeout waiting for connection from pool

Best Answer

com.amazonaws.SdkClientException: Unable to execute HTTP request: Timeout waiting for connection from pool

wholeTextFiles opens a separate client connection to S3 for each file it reads, and the number of connections held open at once grows with the number of partitions you request; the connection pool defaults to 50.

So with 50 partitions you see no hiccups: concurrent requests stay within the pool.

If you raise it to 200, up to 200 tasks can try to fetch from S3 at the same time, the pool of 50 connections is exhausted, and you get the exception above.
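
To see which limit is actually in effect on your cluster before changing anything, you can read the Hadoop configuration Spark is using; a minimal sketch, assuming an existing SparkContext sc and the s3a connector (sc._jsc is a PySpark-internal handle, and EMRFS uses fs.s3.maxConnections instead):

# Sketch: inspect the S3A connection-pool limit currently in effect.
# Assumes an existing SparkContext sc; _jsc is PySpark-internal, and the
# property below applies to the s3a connector (EMRFS uses fs.s3.maxConnections).
hadoop_conf = sc._jsc.hadoopConfiguration()
print(hadoop_conf.get("fs.s3a.connection.maximum"))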

Solution:

See the Amazon documentation: How do I resolve the error "Timeout waiting for connection from pool" in Amazon EMR?

On EMR, the relevant setting is fs.s3.maxConnections in the emrfs-site.xml configuration file; it defaults to 50.

Since you are using s3a with Spark, you can try raising the maximum number of connections to 200, as in the examples below.


The Python way:

import logging
import sys

from pyspark.sql import SparkSession


def create_spark_session(aws_access_key, aws_secret_key, app_name):
    try:
        # Hadoop/S3A options set through the SparkSession builder need the
        # "spark.hadoop." prefix to reach the Hadoop configuration.
        # The s3a timeout values below are in milliseconds.
        spark = SparkSession.builder. \
            config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem"). \
            config("spark.hadoop.fs.s3a.access.key", aws_access_key). \
            config("spark.hadoop.fs.s3a.secret.key", aws_secret_key). \
            config("spark.hadoop.fs.s3a.fast.upload", "true"). \
            config("spark.hadoop.fs.s3a.multipart.size", "1G"). \
            config("spark.hadoop.fs.s3a.fast.upload.buffer", "disk"). \
            config("spark.hadoop.fs.s3a.connection.maximum", 200). \
            config("spark.hadoop.fs.s3a.attempts.maximum", 20). \
            config("spark.hadoop.fs.s3a.connection.timeout", 30). \
            config("spark.hadoop.fs.s3a.threads.max", 10). \
            config("spark.hadoop.fs.s3a.buffer.dir", "hdfs:///user/hadoop/temporary/s3a"). \
            appName(app_name). \
            getOrCreate()

        return spark
    except Exception as e:
        logging.error(e)
        sys.exit(-1)
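
A rough usage sketch of the function above; the credentials, paths, and partition count are placeholders, not values from the question:

# Rough usage sketch; credentials, paths, and partition count are placeholders.
spark = create_spark_session("MY_ACCESS_KEY", "MY_SECRET_KEY", "wholeTextFilesJob")
sc = spark.sparkContext

fs = [
    "s3a://example-bucket/tfms/part-0001.xml.gz",
    "s3a://example-bucket/tfms/part-0002.xml.gz",
]

# With fs.s3a.connection.maximum raised to 200, minPartitions=200 should no
# longer exhaust the connection pool.
rdd = sc.wholeTextFiles(",".join(fs), minPartitions=200)
print(rdd.count())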

For Scala users:

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

/**
 * example getSparkSessionForS3
 * @return
 */
def getSparkSessionForS3(): SparkSession = {
  val conf = new SparkConf()
    .setAppName("testS3File")
    .set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .set("spark.hadoop.fs.s3a.endpoint", "yourendpoint")
    .set("spark.hadoop.fs.s3a.connection.maximum", "200")
    .set("spark.hadoop.fs.s3a.fast.upload", "true")
    .set("spark.hadoop.fs.s3a.connection.establish.timeout", "500")
    .set("spark.hadoop.fs.s3a.connection.timeout", "5000")
    .set("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
    .set("spark.hadoop.com.amazonaws.services.s3.enableV4", "true")
    .set("spark.hadoop.com.amazonaws.services.s3.enforceV4", "true")

  val spark = SparkSession
    .builder()
    .config(conf)
    .getOrCreate()
  spark
}

Further reading:

  1. amazon-s3-best-practice-and-tuning-for-hadoopspark-in-the-cloud (slide 38)
  2. https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/#aTimeout_waiting_for_connection_from_pool_when_writing_to_S3A

All of these exceptions are discussed in reference #2.

Regarding python - Why does AWS refuse my connection when I use wholeTextFiles() with pyspark?, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/56259853/
