- android - RelativeLayout 背景可绘制重叠内容
- android - 如何链接 cpufeatures lib 以获取 native android 库?
- java - OnItemClickListener 不起作用,但 OnLongItemClickListener 在自定义 ListView 中起作用
- java - Android 文件转字符串
我正在尝试在 Amazon EMR 集群中提交一个简单的 Spark 作业。我的集群有 5 个 M4.2xlarge 实例(1 个主实例、4 个从实例),每个实例有 16 个 vCPU 和 32 GB 内存。
这是我的代码:
def main(args : Array[String]): Unit = {
val sparkConfig = new SparkConf()
.set("hive.exec.dynamic.partition", "true")
.set("hive.exec.dynamic.partition.mode", "nonstrict")
.set("hive.s3.max-client-retries", "50")
.set("hive.s3.max-error-retries", "50")
.set("hive.s3.max-connections", "100")
.set("hive.s3.connect-timeout", "5m")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.kryo.registrationRequired", "true")
.set("spark.kryo.classesToRegister", "org.apache.spark.graphx.impl.VertexAttributeBlock")
.set("spark.broadcast.compress", "true")
val spark = SparkSession.builder()
.appName("Spark Hive Example")
.enableHiveSupport()
.config(sparkConfig)
.getOrCreate()
// Set Kryo for serializing
GraphXUtils.registerKryoClasses(sparkConfig)
val res = spark.sql("SELECT col1, col2, col3 FROM table1 limit 10000")
val edgesRDD = res.rdd.map(row => Edge(row.getString(0).hashCode, row.getString(1).hashCode, row(2).asInstanceOf[String]))
val res_two = spark.sql("SELECT col1 FROM table2 where col1 is not NULL and col1 != '' limit 100000")
val vertexRDD: RDD[(VertexId, String)] = res_two.rdd.map(row => (row.getString(0).hashCode, row(0).asInstanceOf[String]))
val graph = Graph(vertexRDD, edgesRDD)
val connectedComponents = graph.connectedComponents().vertices
table1 和 table2 都是 Hive 上 S3 支持的外部表。当我运行这个程序时,我的工作失败并出现以下错误:
Job aborted due to stage failure: Task 827 in stage 0.0 failed 4 times, most recent failure: Lost task 827.3 in stage 0.0 (TID 921, xxx.internal, executor 3): com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.SdkClientException: Unable to execute HTTP request: Timeout waiting for connection from pool
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleRetryableException(AmazonHttpClient.java:1069)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1035)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:742)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:716)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4169)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4116)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1237)
at com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectMetadataCall.perform(GetObjectMetadataCall.java:24)
at com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectMetadataCall.perform(GetObjectMetadataCall.java:10)
at com.amazon.ws.emr.hadoop.fs.s3.lite.executor.GlobalS3Executor.execute(GlobalS3Executor.java:82)
at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.invoke(AmazonS3LiteClient.java:176)
at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.getObjectMetadata(AmazonS3LiteClient.java:94)
at com.amazon.ws.emr.hadoop.fs.s3.lite.AbstractAmazonS3Lite.getObjectMetadata(AbstractAmazonS3Lite.java:39)
at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:211)
at sun.reflect.GeneratedMethodAccessor26.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy35.retrieveMetadata(Unknown Source)
at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.getFileStatus(S3NativeFileSystem.java:768)
at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.open(S3NativeFileSystem.java:1194)
at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:773)
at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.open(EmrFileSystem.java:166)
at org.apache.hadoop.hive.ql.io.orc.ReaderImpl.extractMetaInfoFromFooter(ReaderImpl.java:355)
at org.apache.hadoop.hive.ql.io.orc.ReaderImpl.<init>(ReaderImpl.java:316)
at org.apache.hadoop.hive.ql.io.orc.OrcFile.createReader(OrcFile.java:237)
at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.getReader(OrcInputFormat.java:1204)
at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.getRecordReader(OrcInputFormat.java:1113)
at org.apache.spark.rdd.HadoopRDD$$anon$1.liftedTree1$1(HadoopRDD.scala:246)
at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:245)
at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:203)
at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:94)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.conn.ConnectionPoolTimeoutException: Timeout waiting for connection from pool
at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.conn.PoolingHttpClientConnectionManager.leaseConnection(PoolingHttpClientConnectionManager.java:286)
at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.conn.PoolingHttpClientConnectionManager$1.get(PoolingHttpClientConnectionManager.java:263)
at sun.reflect.GeneratedMethodAccessor19.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.conn.ClientConnectionRequestFactory$Handler.invoke(ClientConnectionRequestFactory.java:70)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.conn.$Proxy37.get(Unknown Source)
at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:190)
at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:184)
at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:184)
at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82)
at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:55)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.apache.client.impl.SdkHttpClient.execute(SdkHttpClient.java:72)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1190)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1030)
... 59 more
不确定它是来自 hadoop 还是从 hive 读取时,但我看到了类似的问题 here ,所以我在 spark-submit 命令中添加了以下参数:
--conf "spark.driver.extraJavaOptions=-Djavax.net.ssl.sessionCacheSize=1000 -Djavax.net.ssl.sessionCacheTimeout=60" --conf "spark.executor.extraJavaOptions=-Djavax.net.ssl.sessionCacheSize=1000 -Djavax.net.ssl.sessionCacheTimeout=60"
还是不行。有谁知道这是怎么回事吗?
最佳答案
TLDR:您需要设置的属性是 emrfs-site.xml 配置文件中的 fs.s3.maxConnections。它默认为 50。我们得到的错误/堆栈跟踪与您完全相同,所以我将其设置为 5000,这解决了问题并且没有不良影响。
据我所知,根本原因是 InputFormat 实现没有正确使用 try...finally 来确保在抛出异常时关闭连接。值得注意的是,旧版本的 Hive,包括编译 Spark 的 v1.2.1,都存在这个错误。 Hive 2.x 大量重构了 OrcInputFormat,但我还没有验证错误是否已修复,我也不知道是否/何时/如何针对 Hive 2.x 编译 Spark。
解决方法增加了连接池的大小,如另一个答案中所建议的,但属性及其位置都与“经典”S3 文件系统 (s3/s3a/s3n) 中的完全不同。当然,这在任何地方都没有记录,并且需要反编译 emrfs jar 来梳理......
关于hadoop - 在 EMR 上运行 Spark 作业时 AWS 连接超时,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45971572/
当我确定源数据在 S3 中并且处理的结果将存储在 S3 中时,是否可以使用主节点和一组任务(从属)节点(没有核心节点)构建 AWS EMR。 基本上,问题是“当 EMR 将在 S3 中处理数据时,需要
我正在使用 aws .net sdk 向 EMR 运行 s3distcp 作业,以使用 --groupBy arg 连接文件夹中的所有文件。但是无论我尝试过什么“groupBy”arg,它总是失败,或
我刚刚建立了一个内置 Spark、JupyterHub 等的 EMR 集群。我可以通过 http://master_hostname:9443/hub/login 访问 Jupyter Noteboo
我正在 S3 上运行一个超过 500 个文档的示例 hadoop 作业,在本地运行时需要 <15 分钟才能完成。然而,当我尝试在 EMR 上运行相同的作业时,需要两个多小时,但仍然没有完成缩减步骤,因
是否可以将 Presto 解释器添加到 AWS EMR 4.3 上的 Zeppelin,如果可以,有人可以发布说明吗?我在 EMR 上运行 Presto-Sandbox 和 Zeppelin-Sand
AWS Stepfunctions 最近添加了 EMR 集成,这很酷,但我找不到将变量从步骤函数传递到 addstep 参数的方法。例如,我想将“$.dayid”变量传递给“Parameters”>“
例如,我有两个Hive作业,其中一个作业的输出用作第二个作业的参数/变量。我可以在终端上成功运行以下命令,以在EMR集群的主节点上获得结果。 [hadoop@ip-10-6-131-223 ~]$ h
我有一个非常初学者的问题。我刚刚阅读了一些有关 Amazon EMR 的文档。在我注册之前,我只是想询问一下如何在其中使用 R。 我有一个 R 模块,它调用其他几个模块,然后,在它完成运行之前,将几个
我在从运行 Spark 的 AWS EMR 集群连接到另一个运行 presto 的 AWS EMR 集群时遇到问题。 用 python 编写的代码是: jdbcDF = spark.read \
我正在努力解决这个问题,但无法弄清楚为什么 我有一个要部署在 AWS 私有(private)子网中的 EMR 集群。 我检查了文档 here . 根据以上内容,我明白了以下几点: 一个。对于我的 EM
我有一个 EMR 集群 response = emr_client.run_job_flow( Name="Test dashboards", ReleaseLabel='emr-6.
我在使用 hadoop 时使用了 MultipleInputs 。因为我有多个映射器分配给不同的输入。我想知道 EMR 是否也支持它。 在hadoop中我是这样操作的。这些是我的不同文件的映射器。在这
我是 PySpark 和 EMR 的新手。 我试图通过 Jupyter notebook 访问在 EMR 集群上运行的 Spark,但遇到了错误。 我正在使用以下代码生成 SparkSession:
我正在尝试将我的 Glue 目录连接到 EMR 中的 Presto 和 Hive。在 presto-cli 中运行查询时,我收到 NullPointerException 而相同的查询在 hive-c
我正在使用 MRJob 在 Amazon 的 EMR 上运行一个迭代的 hadoop 程序。 当我不使用“--pool-emr-job-flows”选项时,一切正常(但速度很慢)。当我使用这个选项时,
我有一个 DynamoDB 表,我需要连接到 EMR Spark SQL 以在该表上运行查询。我得到了带有发布标签 emr-4.6.0 和 Spark 1.6.1 的 EMR Spark Cluste
我的团队在 AWS 中工作,我们有 python 脚本,可以将文件从 S3 存储桶移动到 EC2 实例。我想用我们正在使用的脚本作为序言,它在直接从 ec2 实例运行时有效,并且仅在作为 EMR 步骤
我有 Airflow 作业,它们在 EMR 集群上运行良好。我需要的是,假设我有 4 个 Airflow 作业需要 EMR 集群,假设 20 分钟才能完成任务。为什么我们不能在 DAG 运行时创建一个
我正在 AWS 中创建一个数据管道来运行 Pig 任务。但是我的 Pig 任务需要 EMR 中的附加文件。在创建集群之后和运行 pig tasked 之前,我如何告诉 Data Pipeline 将文
如何在 EMR 上设置 Spark Thrift 服务器?我正在尝试使用 Spark Thrift 服务器与 EMR 建立 JDBC/ODBC 连接。例如 直线> !connect jdbc:hive
我是一名优秀的程序员,十分优秀!