gpt4 book ai didi

scala - Apache spark消息理解

转载 作者:行者123 更新时间:2023-12-04 17:53:53 24 4
gpt4 key购买 nike

请求帮助以理解此消息..

INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 2 is **2202921** bytes

这里的 2202921 是什么意思?

我的工作做了一个洗牌操作,在从前一阶段读取洗牌文件时,它首先给出消息,然后在一段时间后它失败并出现以下错误:

14/11/12 11:09:46 WARN scheduler.TaskSetManager: Lost task 224.0 in stage 4.0 (TID 13938, ip-xx-xxx-xxx-xx.ec2.internal): FetchFailed(BlockManagerId(11, ip-xx-xxx-xxx-xx.ec2.internal, 48073, 0), shuffleId=2, mapId=7468, reduceId=224)
14/11/12 11:09:46 INFO scheduler.DAGScheduler: Marking Stage 4 (coalesce at <console>:49) as failed due to a fetch failure from Stage 3 (map at <console>:42)
14/11/12 11:09:46 INFO scheduler.DAGScheduler: Stage 4 (coalesce at <console>:49) failed in 213.446 s
14/11/12 11:09:46 INFO scheduler.DAGScheduler: Resubmitting Stage 3 (map at <console>:42) and Stage 4 (coalesce at <console>:49) due to fetch failure
14/11/12 11:09:46 INFO scheduler.DAGScheduler: Executor lost: 11 (epoch 2)
14/11/12 11:09:46 INFO storage.BlockManagerMasterActor: Trying to remove executor 11 from BlockManagerMaster.
14/11/12 11:09:46 INFO storage.BlockManagerMaster: Removed 11 successfully in removeExecutor
14/11/12 11:09:46 INFO scheduler.Stage: Stage 3 is now unavailable on executor 11 (11893/12836, false)
14/11/12 11:09:46 INFO scheduler.DAGScheduler: Resubmitting failed stages
14/11/12 11:09:46 INFO scheduler.DAGScheduler: Submitting Stage 3 (MappedRDD[13] at map at <console>:42), which has no missing parents
14/11/12 11:09:46 INFO storage.MemoryStore: ensureFreeSpace(25472) called with curMem=474762, maxMem=11113699737
14/11/12 11:09:46 INFO storage.MemoryStore: Block broadcast_6 stored as values in memory (estimated size 24.9 KB, free 10.3 GB)
14/11/12 11:09:46 INFO storage.MemoryStore: ensureFreeSpace(5160) called with curMem=500234, maxMem=11113699737
14/11/12 11:09:46 INFO storage.MemoryStore: Block broadcast_6_piece0 stored as bytes in memory (estimated size 5.0 KB, free 10.3 GB)
14/11/12 11:09:46 INFO storage.BlockManagerInfo: Added broadcast_6_piece0 in memory on ip-xx.ec2.internal:35571 (size: 5.0 KB, free: 10.4 GB)
14/11/12 11:09:46 INFO storage.BlockManagerMaster: Updated info of block broadcast_6_piece0
14/11/12 11:09:46 INFO scheduler.DAGScheduler: Submitting 943 missing tasks from Stage 3 (MappedRDD[13] at map at <console>:42)
14/11/12 11:09:46 INFO cluster.YarnClientClusterScheduler: Adding task set 3.1 with 943 tasks

我的代码是这样的,

(rdd1 ++ rdd2).map { t => ((t.id), t) }.groupByKey(1280).map {
case ((id), sequence) =>
val newrecord = sequence.maxBy {
case Fact(id, key, type, day, group, c_key, s_key, plan_id,size,
is_mom, customer_shipment_id, customer_shipment_item_id, asin, company_key, product_line_key, dw_last_updated, measures) => dw_last_updated.toLong
}
((PARTITION_KEY + "=" + newrecord.day.toString + "/part"), (newrecord))
}.coalesce(2048,true).saveAsTextFile("s3://myfolder/PT/test20nodes/")```

我得出 1280,因为我有 20 个节点,每个节点有 32 个核心。我把它导出为 2*32*20。

最佳答案

对于一个 Shuffle 阶段,它会创建一些 ShuffleMapTask 将中间结果输出到磁盘。位置信息将存储在 MapStatuses 中并发送给 MapOutputTrackerMaster(驱动程序)。

然后当下一阶段开始运行时,它需要这些位置状态。所以执行者会要求 MapOutputTrackerMaster 来获取它们。 MapOutputTrackerMaster 会将这些状态序列化为字节,并将它们发送给执行程序。这是这些状态的大小(以字节为单位)。

这些状态将通过 Akka 发送。 Akka 对最大消息大小有限制。您可以通过 spark.akka.frameSize 设置它:

Maximum message size to allow in "control plane" communication (for serialized tasks and task results), in MB. Increase this if your tasks need to send back large results to the driver (e.g. using collect() on a large dataset).

如果大小大于 spark.akka.frameSize,Akka 将拒绝传递消息,您的作业将失败。因此,它可以帮助您将 spark.akka.frameSize 调整到最佳值。

关于scala - Apache spark消息理解,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26904619/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com