gpt4 book ai didi

python - Spark : Warning that task size is too large despite no large, 非分布式文件

转载 作者:太空宇宙 更新时间:2023-11-03 16:53:55 26 4
gpt4 key购买 nike

这是我的代码的想法:

我有一个很大的电子邮件数据 RDD,称为 email。大约 7 亿封电子邮件。它看起来像这样:

[['value1','value2','value3','value4'],['recipient1','recipient2','recipient3'],['sender']]

email 中有超过 40,000 个不同的收件人和发件人电子邮件地址。我有一个包含 600 个我感兴趣的电子邮件地址的列表,如下所示:

relevant_emails = ['rel_email1','rel_email2','rel_email3',...,'rel_email600']

我想迭代我的大型 RDD email 以仅保留发件人和收件人都位于 relevant_emails 列表中的那些电子邮件。因此,我广播了 related_emails,以便每个工作节点都有一个副本:broadcast_emails = sc.broadcast(relevant_emails)

这是我想要应用于email中每一行的函数:

def get_relevant_emails(row):
r_bool = False
s_bool = False
recipients = row[1]
sender = row[2]
if sender[0] in broadcast_emails.value:
s_bool = True
for x in range(0, len(recipients)):
if recipients[x] in broadcast_emails.value:
r_bool = True
break
if (r_bool is True and s_bool is True):
return row

我面临的问题是,当我运行 emails.map(lambda row: get_relevant_emails(row)) (然后用强制其执行的内容进行跟踪,例如 saveAsTextFile()),它开始运行,然后发送:

警告:阶段 5 包含一个非常大的任务 (xxxx KB)。建议的最大任务大小为 100 KB

然后它停止运行。仅供引用:我在 Spark shell 中运行此程序,有 20 个执行器、每个执行器 10GB 内存、每个执行器 3 个核心。就 HDFS 上的 block 存储消耗而言,电子邮件 的大小为 76.7 GB,我将其分为 600 个分区(76.7 GB/128 MB)。

最佳答案

警告所指的任务大小可能是由于 get_relevant_emails() 函数中分配的变量数量所致。任务大小可能超过最大建议大小的另一种方法是引用函数范围之外的其他变量。

无论如何,我建议使用 DataFrame API,因为它使此操作更简单并且性能更好。它速度更快,因为它可以完成 Java 中的所有繁重工作,并且避免从 Python 和 Java Vms 来回编码数据。我和我的团队将大部分现有的 Python 逻辑转移到 SparkSQL 和 DataFrames 中,并看到了巨大的性能改进。

以下是它如何适用于您的案例:

from pyspark import SparkContext, SQLContext
from pyspark.sql.functions import broadcast, expr

sc = SparkContext()
sql_ctx = SQLContext(sc)

email = [
[['value1','value2','value3','value4'],['recipient1','recipient2','recipient3'],['sender1']],
[['value1','value2','value3','value4'],['recipient1','recipient2','recipient3'],['sender2']],
[['value1','value2','value3','value4'],['recipient1','recipient4','recipient5'],['sender3']]
]

relevant_addresses = [
["sender2"],
["sender3"],
["recipient3"]
]

email_df = sql_ctx.createDataFrame(email, ["values", "recipients", "sender"])
relevant_df = sql_ctx.createDataFrame(relevant_addresses, ["address"])
broadcasted_relevant = broadcast(relevant_df)

result = email_df.join(
broadcasted_relevant,
on=expr("array_contains(recipients, address) OR array_contains(sender, address)"),
how="leftsemi"
)

result.collect()

这里的左半连接就像一个过滤器,只从 email_df 中选择匹配的行。这与在 SQL 中使用“WHERE IN”子句时发生的连接类型相同。

关于python - Spark : Warning that task size is too large despite no large, 非分布式文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35607579/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com