hadoop - Spark SQL unable to complete writing Parquet data with a large number of shards

Reposted. Author: 可可西里. Updated: 2023-11-01 14:13:01

I am trying to use Apache Spark SQL to ETL JSON log data in S3 into Parquet files, also on S3. My code is basically:

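import org.apache.spark._
// sc is the SparkContext that spark-shell provides
val sqlContext = new sql.SQLContext(sc)
// the second argument is the sampling ratio used for schema inference
val data = sqlContext.jsonFile("s3n://...", 10e-6)
data.saveAsParquetFile("s3n://...")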

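This code works when I have up to 2000 partitions and fails with 5000 or more, regardless of the volume of data. Normally one could just coalesce the partitions down to an acceptable number, but this is a very large dataset, and at 2000 partitions I hit the problem described in this question.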

14/10/10 00:34:32 INFO scheduler.DAGScheduler: Stage 1 (runJob at ParquetTableOperations.scala:318) finished in 759.274 s
14/10/10 00:34:32 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
14/10/10 00:34:32 INFO spark.SparkContext: Job finished: runJob at ParquetTableOperations.scala:318, took 759.469302077 s
14/10/10 00:34:34 WARN hadoop.ParquetOutputCommitter: could not write summary file for ...
java.io.IOException: Could not read footer: java.lang.NullPointerException
at parquet.hadoop.ParquetFileReader.readAllFootersInParallel(ParquetFileReader.java:190)
at parquet.hadoop.ParquetFileReader.readAllFootersInParallel(ParquetFileReader.java:203)
at parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:49)
at org.apache.spark.sql.parquet.InsertIntoParquetTable.saveAsHadoopFile(ParquetTableOperations.scala:319)
at org.apache.spark.sql.parquet.InsertIntoParquetTable.execute(ParquetTableOperations.scala:246)
at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:409)
at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:409)
at org.apache.spark.sql.SchemaRDDLike$class.saveAsParquetFile(SchemaRDDLike.scala:77)
at org.apache.spark.sql.SchemaRDD.saveAsParquetFile(SchemaRDD.scala:103)
at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:39)
at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:44)
at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:46)
at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:48)
at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:50)
at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:52)
at $line37.$read$$iwC$$iwC$$iwC$$iwC.<init>(<console>:54)
at $line37.$read$$iwC$$iwC$$iwC.<init>(<console>:56)
at $line37.$read$$iwC$$iwC.<init>(<console>:58)
at $line37.$read$$iwC.<init>(<console>:60)
at $line37.$read.<init>(<console>:62)
at $line37.$read$.<init>(<console>:66)
at $line37.$read$.<clinit>(<console>)
at $line37.$eval$.<init>(<console>:7)
at $line37.$eval$.<clinit>(<console>)
at $line37.$eval.$print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789)
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062)
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:814)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:859)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:771)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:616)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:624)
at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:629)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:954)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:902)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:997)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.NullPointerException
at org.apache.hadoop.fs.s3native.NativeS3FileSystem$NativeS3FsInputStream.close(NativeS3FileSystem.java:106)
at java.io.BufferedInputStream.close(BufferedInputStream.java:472)
at java.io.FilterInputStream.close(FilterInputStream.java:181)
at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:298)
at parquet.hadoop.ParquetFileReader$2.call(ParquetFileReader.java:180)
at parquet.hadoop.ParquetFileReader$2.call(ParquetFileReader.java:176)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

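I am running this on spark-1.1.0 on an R3.xlarge in EC2, using the spark-shell console to run the code above. Afterwards I can run non-trivial queries on the data SchemaRDD object, so it does not appear to be a resource problem. The resulting Parquet files can also be read and queried; it just takes a very long time because the summary file is missing.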

Best answer

Try setting this property to false:

sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")
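
Put together with the code from the question, the suggestion might look like the sketch below. It is written for the same spark-shell session, where `sc` is the predefined SparkContext. The names `inputPath` and `outputPath` are placeholders introduced here, since the post elides the real S3 locations. It also assumes that the Parquet version on the classpath honors `parquet.enable.summary-metadata`, which is worth verifying for the Spark build in use.

```scala
// Run inside spark-shell, where `sc` is the predefined SparkContext.
import org.apache.spark.sql.SQLContext

// Ask the Parquet output committer not to write the summary file.
// Assumption: the bundled Parquet version checks this property.
sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")

val sqlContext = new SQLContext(sc)

// Placeholder paths (hypothetical names); the original post elides the real locations.
val inputPath = "s3n://..."
val outputPath = "s3n://..."

// Same read and write calls as in the question.
val data = sqlContext.jsonFile(inputPath, 10e-6)
data.saveAsParquetFile(outputPath)
```

The property has to be set before `saveAsParquetFile` is called, because the footers are read when the job is committed. With the summary file disabled, reading the output is likely to remain as slow as the question describes, since no summary file is produced at all.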

Regarding "hadoop - Spark SQL unable to complete writing Parquet data with a large number of shards", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/26291165/
