
java - S3 starts returning "Premature end of Content-Length" errors with Apache Spark, while working fine on older EC2 instances


We use an S3 bucket as the data store for our datasets, where the data is stored as Parquet files.
We run Apache Spark on AWS r6g instances, with Hydrosphere Mist as a proxy for launching jobs on the Spark instances. We use local cluster mode, so each instance runs a Spark worker inside a Docker container. Four days ago, one of our EC2 instances suddenly started getting "Premature end of Content-Length" errors when reading Parquet files from the bucket, while another instance read them just fine. The next day, however, both running instances started getting the error. Recreating them did not help either; the problem did not go away and occurred on every run of a job that reads data from the S3 bucket.
The error looks like this:

Error: RSocket error 0x201 (APPLICATION_ERROR): org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost, executor driver): org.apache.http.ConnectionClosedException: Premature end of Content-Length delimited message body (expected: 1977131; received: 849
at org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:178)
at org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:198)
at org.apache.http.impl.io.ContentLengthInputStream.close(ContentLengthInputStream.java:101)
at org.apache.http.conn.BasicManagedEntity.streamClosed(BasicManagedEntity.java:166)
at org.apache.http.conn.EofSensorInputStream.checkClose(EofSensorInputStream.java:228)
at org.apache.http.conn.EofSensorInputStream.close(EofSensorInputStream.java:172)
at java.io.FilterInputStream.close(FilterInputStream.java:181)
at java.io.FilterInputStream.close(FilterInputStream.java:181)
at java.io.FilterInputStream.close(FilterInputStream.java:181)
at java.io.FilterInputStream.close(FilterInputStream.java:181)
at com.amazonaws.services.s3.model.S3ObjectInputStream.abort(S3ObjectInputStream.java:90)
at org.apache.hadoop.fs.s3a.S3AInputStream.reopen(S3AInputStream.java:72)
at org.apache.hadoop.fs.s3a.S3AInputStream.seek(S3AInputStream.java:115)
at org.apache.hadoop.fs.FSDataInputStream.seek(FSDataInputStream.java:62)
at org.apache.parquet.hadoop.util.H1SeekableInputStream.seek(H1SeekableInputStream.java:46)
at org.apache.parquet.hadoop.ParquetFileReader$ConsecutiveChunkList.readAll(ParquetFileReader.java:1157)
at org.apache.parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:805)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.checkEndOfRowGroup(VectorizedParquetRecordReader.java:301)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:256)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:159)
at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:181)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.scan_nextBatch_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:107)
at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:105)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1$$anonfun$12.apply(RDD.scala:823)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1$$anonfun$12.apply(RDD.scala:823)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Versions we use:
  • spark-2.4.4
  • hydrosphere mist-1.1.3
  • hadoop-2.7
  • java aws sdk-1.7.4 (hadoop 2.7 uses the same version)
  • jvm-1.8

We read from the S3 bucket using s3a:// with the org.apache.hadoop.fs.s3a.S3AFileSystem implementation (a minimal configuration sketch is included after the question).
I think one possible cause could be the Java AWS SDK version, since 1.7.4 is a very old release, even though it is still supported; but Spark ships with Hadoop 2.7 as its main Hadoop version, so this should not be a problem.
The datasets themselves are small, all currently under 10 MB.
Has anyone run into this problem?
edit1: The content lengths at which the reads fail are usually the same; they are not random. It is always 849, 7744, 664, or 8192, depending on the particular job. Removing compression from the Parquet files also changed the smallest number from 696 to 849.
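For context (this is not code from the question), here is a minimal sketch of how such a Parquet read over s3a is typically wired up in local mode. The bucket path and the environment-variable credentials are placeholders, and the s3a settings shown are standard Hadoop/Spark options rather than the asker's actual configuration:

  import org.apache.spark.sql.SparkSession

  object S3aParquetReadSketch {
    def main(args: Array[String]): Unit = {
      // Local cluster mode, as in the question: the worker runs inside the same JVM/container.
      val spark = SparkSession.builder()
        .appName("s3a-parquet-read")
        .master("local[*]")
        // Route s3a:// URIs to the Hadoop S3A filesystem implementation.
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
        // Placeholder credentials; on EC2 these would normally come from the instance profile.
        .config("spark.hadoop.fs.s3a.access.key", sys.env.getOrElse("AWS_ACCESS_KEY_ID", ""))
        .config("spark.hadoop.fs.s3a.secret.key", sys.env.getOrElse("AWS_SECRET_ACCESS_KEY", ""))
        .getOrCreate()

      // Hypothetical bucket and prefix; the failing reads in the question are plain Parquet scans like this one.
      val df = spark.read.parquet("s3a://example-bucket/datasets/example/")
      df.show(10)

      spark.stop()
    }
  }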

Best Answer

Edit: Not fixed. The problem still persists. It does turn out, however, that running Spark and Mist locally does not produce this issue.
Newer builds of Hadoop 2.7 do not actually use aws-java-sdk 1.7.4. Updating to the latest Spark release, which ships with a newer Hadoop, resolved the problem. We now use Spark 3.0.1 and Hadoop 2.7.4.
Another issue was that we had included both the org.apache.hadoop:hadoop-aws and the aws-java-sdk dependency. The second one can safely be removed; after that, everything went back to normal.
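For illustration only, an sbt sketch of the dependency change described above: declare hadoop-aws and let it pull in its matching aws-java-sdk transitively instead of adding the SDK explicitly. The version numbers follow the ones mentioned in the answer; the build file itself is an assumption, not the asker's.

  // build.sbt (sketch)
  libraryDependencies ++= Seq(
    // Spark itself is provided by the cluster / Mist runtime.
    "org.apache.spark" %% "spark-sql"  % "3.0.1" % "provided",
    // hadoop-aws matching the Hadoop version Spark is built against;
    // no explicit aws-java-sdk dependency -- it comes in transitively.
    "org.apache.hadoop" % "hadoop-aws" % "2.7.4"
  )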

A similar question about "java - S3 starts returning Premature end of Content-Length errors with Apache Spark while working fine on older EC2 instances" can be found on Stack Overflow: https://stackoverflow.com/questions/64645815/
