- android - RelativeLayout 背景可绘制重叠内容
- android - 如何链接 cpufeatures lib 以获取 native android 库?
- java - OnItemClickListener 不起作用,但 OnLongItemClickListener 在自定义 ListView 中起作用
- java - Android 文件转字符串
我正在尝试使用 Apache Spark SQL 将 S3 中的 json 日志数据 etl 到也在 S3 上的 Parquet 文件中。我的代码基本上是:
import org.apache.spark._
val sqlContext = sql.SQLContext(sc)
val data = sqlContext.jsonFile("s3n://...", 10e-6)
data.saveAsParquetFile("s3n://...")
此代码在我有多达 2000 个分区时有效,而在 5000 或更多分区时失败,无论数据量如何。通常可以将分区合并到一个可接受的数量,但这是一个非常大的数据集,在 2000 个分区时我遇到了这个 question 中描述的问题
14/10/10 00:34:32 INFO scheduler.DAGScheduler: Stage 1 (runJob at ParquetTableOperations.scala:318) finished in 759.274 s
14/10/10 00:34:32 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
14/10/10 00:34:32 INFO spark.SparkContext: Job finished: runJob at ParquetTableOperations.scala:318, took 759.469302077 s
14/10/10 00:34:34 WARN hadoop.ParquetOutputCommitter: could not write summary file for ...
java.io.IOException: Could not read footer: java.lang.NullPointerException
at parquet.hadoop.ParquetFileReader.readAllFootersInParallel(ParquetFileReader.java:190)
at parquet.hadoop.ParquetFileReader.readAllFootersInParallel(ParquetFileReader.java:203)
at parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:49)
at org.apache.spark.sql.parquet.InsertIntoParquetTable.saveAsHadoopFile(ParquetTableOperations.scala:319)
at org.apache.spark.sql.parquet.InsertIntoParquetTable.execute(ParquetTableOperations.scala:246)
at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:409)
at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:409)
at org.apache.spark.sql.SchemaRDDLike$class.saveAsParquetFile(SchemaRDDLike.scala:77)
at org.apache.spark.sql.SchemaRDD.saveAsParquetFile(SchemaRDD.scala:103)
at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:39)
at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:44)
at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:46)
at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:48)
at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:50)
at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:52)
at $line37.$read$$iwC$$iwC$$iwC$$iwC.<init>(<console>:54)
at $line37.$read$$iwC$$iwC$$iwC.<init>(<console>:56)
at $line37.$read$$iwC$$iwC.<init>(<console>:58)
at $line37.$read$$iwC.<init>(<console>:60)
at $line37.$read.<init>(<console>:62)
at $line37.$read$.<init>(<console>:66)
at $line37.$read$.<clinit>(<console>)
at $line37.$eval$.<init>(<console>:7)
at $line37.$eval$.<clinit>(<console>)
at $line37.$eval.$print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789)
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062)
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:814)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:859)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:771)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:616)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:624)
at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:629)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:954)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:902)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:997)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.NullPointerException
at org.apache.hadoop.fs.s3native.NativeS3FileSystem$NativeS3FsInputStream.close(NativeS3FileSystem.java:106)
at java.io.BufferedInputStream.close(BufferedInputStream.java:472)
at java.io.FilterInputStream.close(FilterInputStream.java:181)
at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:298)
at parquet.hadoop.ParquetFileReader$2.call(ParquetFileReader.java:180)
at parquet.hadoop.ParquetFileReader$2.call(ParquetFileReader.java:176)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
我在 ec2 中的 R3.xlarge 上的 spark-1.1.0 上运行它。我正在使用 spark-shell 控制台来运行上面的代码。之后我能够对 data
SchemaRDD 对象执行重要的查询,因此它似乎不是资源问题。也可以读取和查询生成的 Parquet 文件,只是由于缺少摘要文件,因此需要非常长的时间。
最佳答案
尝试将此属性设置为 false :
sparkContext.hadoopConfiguration().set("parquet.enable.summary-metadata", "false");
关于hadoop - Spark SQL无法完成大量分片的Parquet数据写入,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26291165/
是否可以对 parquet 格式执行分布式并发写入? 是否可以在写入拼花文件时读取它们? 如果有并发读/写的方法,我有兴趣了解。 提前感谢您的帮助。 最佳答案 我最终得到了 Parquet 开发人员的
如何从命令行检查 Parquet 文件的内容? 我现在看到的唯一选择是 $ hadoop fs -get my-path local-file $ parquet-tools head local-f
我正在使用基于 Java(1.8) 的应用程序使用库创建 Parquet 文件 org.apache.avro.Schema 和 org.apache.parquet.hadoop.ParquetWr
我已经使用 pyspark 创建了多个 parquet 文件,现在我正在尝试将所有 parquet 文件合并为 1 个。我能够合并这些文件,但是在读取生成的文件时,我遇到了错误。以前有人遇到过这个问题
我创建了一个数据框,如下所示: expanded_1 = pd.DataFrame({"Point": [random.choice(points) for x in range(30000000)]
当我在 R 和 Python 中保存 Parquet 文件(使用 pyarrow)时,我得到一个保存在元数据中的箭头模式字符串。 如何读取元数据?它是 Flatbuffer 编码数据吗?架构的定义在哪
例如,pandas 的 read_csv有一个 chunk_size允许 read_csv 的参数在 CSV 文件上返回一个迭代器,以便我们可以分块读取它。 Parquet 格式以块的形式存储数据,但
我正在尝试运行最新版本的 Parquet 工具,但遇到了一些问题。出于某种原因org.apache.hadoop.conf.Configuration不在阴影的 jar 里。 (我对 v1.6.0 也
我正在使用 Parquet 框架来编写 Parquet 文件。 我使用此构造函数创建了 Parquet 作家- public class ParquetBaseWriter extends Parqu
使用 spark 和钻头,我可以查询本地 Parquet 文件。 presto 是否提供相同的功能? 换句话说,是否可以使用 presto 查询本地 Parquet 文件 - 无需通过 HDFS 或
我有一个加密的 parquet 数据文件,它被读取为一个输入流。我想从此输入流中提取单个 Parquet 记录。有什么办法可以做到这一点吗?在 avro 中,使用 DatumReader 是可能的。我
我知道 Apache Arrow Parquet 可以读取符合规范的 Delta 编码文件,但不能将它们写出。我想知道是否有任何常用的开源 C++/Python 库可以写出符合 Parquet 规范的
背景: DuckDB 允许直接查询 parquet 文件。例如con.execute("从'Hierarchy.parquet'中选择 *) Parquet 允许按列值对文件进行分区。当一个 Parq
有没有办法将一个巨大的 parquet 文件分成较小的文件(使用 Python)?保留所有列并划分行?谢谢 最佳答案 你可以用 dask 来做. import dask.dataframe as dd
我的 Parquet 文件为 800K 行 x 8.7K 列。我将其加载到 dask 数据框中: import dask.dataframe as dd dask_train_df = dd.read
我有数百个用 PyArrow 创建的 Parquet 文件。然而,其中一些文件的字段/列的名称(我们称其为 Orange)与原始列(称其为 Sporange)略有不同,因为其中一个使用了查询的变体。否
我正在尝试在配置单元中创建 Parquet 表。我可以创建它,但是当我运行 analyze table mytable compute statistics 时;我得到这个结果: numfiles=8
我知道 hdfs 会将文件拆分成大约 64mb 的 block 。我们有流式传输的数据,我们可以将它们存储到大文件或中等大小的文件中。列式文件存储的最佳大小是多少?如果我可以将文件存储到最小列为 64
我想使用 Apache 的 parquet-mr 项目通过 Java 以编程方式读取/写入 Parquet 文件。我似乎找不到任何有关如何使用此 API 的文档(除了查看源代码并查看它的使用方式)——
我在 Impala 中移动数据,而不是我的设计,我丢失了一些数据。我需要将数据从 Parquet 表复制回它们原来的非 Parquet 表。最初,开发人员使用脚本中的一个简单的一行来完成此操作。由于我
我是一名优秀的程序员,十分优秀!