
apache-spark - Spark: ~100 million rows. Size exceeds Integer.MAX_VALUE?


(This is running on Spark 2.0 on a small three-node Amazon EMR cluster.)

I have a PySpark job that loads some large text files into a Spark RDD; calling count() on it succeeds and returns 158,598,155.

The job then parses each line into a pyspark.sql.Row instance, builds a DataFrame, and runs a second count. This second count() on the DataFrame raises an exception from Spark's internal code: Size exceeds Integer.MAX_VALUE. The same code works with smaller amounts of data. Can someone explain why/how this happens?

org.apache.spark.SparkException: Job aborted due to stage failure: Task 22 in stage 1.0 failed 4 times, most recent failure: Lost task 22.3 in stage 1.0 (TID 77, ip-172-31-97-24.us-west-2.compute.internal): java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:869)
at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:103)
at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:91)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1287)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:105)
at org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:439)
at org.apache.spark.storage.BlockManager.get(BlockManager.scala:604)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:661)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

PySpark code:

raw_rdd = spark_context.textFile(full_source_path)

# DEBUG: This call to count() is expensive
# This count succeeds and returns 158,598,155
logger.info("raw_rdd count = %d", raw_rdd.count())
logger.info("completed getting raw_rdd count!!!!!!!")

row_rdd = raw_rdd.map(row_parse_function).filter(bool)
data_frame = spark_sql_context.createDataFrame(row_rdd, MySchemaStructType)

data_frame.cache()
# This will trigger the Spark internal error
logger.info("row count = %d", data_frame.count())

Best answer

The error does not come from data_frame.count() itself; it occurs because parsing the rows with row_parse_function produces some integers that do not fit the integer type declared in MySchemaStructType.

Try widening the integer fields in your schema to pyspark.sql.types.LongType(), or let Spark infer the types by omitting the schema (although that slows down evaluation).
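As a minimal sketch of the suggested fix (the field names and sample values here are hypothetical stand-ins, since the original MySchemaStructType and row_parse_function are not shown in the question):

from pyspark.sql import SparkSession, Row
from pyspark.sql.types import StructType, StructField, StringType, LongType

spark = SparkSession.builder.getOrCreate()

# Hypothetical schema: "user_id" stands in for whichever field overflows
# IntegerType. LongType holds 64-bit values, so ids/counts larger than
# 2,147,483,647 (Integer.MAX_VALUE) still fit.
schema = StructType([
    StructField("name", StringType(), True),
    StructField("user_id", LongType(), True),   # was IntegerType()
])

rows = [Row(name="a", user_id=3000000000)]  # value > Integer.MAX_VALUE
df = spark.createDataFrame(rows, schema)
df.count()  # succeeds once the column is wide enough

# Alternative: omit the schema and let Spark sample the data to infer types,
# at the cost of an extra pass over the data.
df_inferred = spark.createDataFrame(rows)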

Regarding "apache-spark - Spark: ~100 million rows. Size exceeds Integer.MAX_VALUE?", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/38961545/
