gpt4 book ai didi

python - 我应该在代码中添加什么以避免使用 pyspark 出现 'exceeds max allowed bytes' 错误?

转载 作者:行者123 更新时间:2023-12-05 07:25:27 25 4
gpt4 key购买 nike

我有一个包含 400 万行和 10 列的数据框。我正在尝试使用 pyspark 从 Cloudera Data Science Workbench 将其写入 hdfs 中的表。尝试执行此操作时遇到错误:

[Stage 0:>                                                          (0 + 1) / 
2]19/02/20 12:31:04 ERROR datasources.FileFormatWriter: Aborting job null.
org.apache.spark.SparkException: Job aborted due to stage failure: Serialized task 0:0 was 318690577 bytes, which exceeds max allowed: spark.rpc.message.maxSize (134217728 bytes). Consider increasing spark.rpc.message.maxSize or using broadcast variables for large values.

我可以将数据帧分解为 3 个数据帧,并分别执行 3 次 Spark 写入,但如果可能的话,我想只执行一次,方法是向 Spark 代码添加一些内容,例如 coalesce

import pandas as pd
df=pd.read_csv('BulkWhois/2019-02-20_Arin_Bulk/Networks_arin_db_2-20-2019_parsed.csv')

'''PYSPARK'''
from pyspark.sql import SQLContext
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark import SparkContext
spark = SparkSession.builder.appName('Arin_Network').getOrCreate()

schema = StructType([StructField('NetHandle', StringType(), False),
StructField('OrgID', StringType(), True),
StructField('Parent', StringType(), True),
StructField('NetName', StringType(), True),
StructField('NetRange', StringType(), True),
StructField('NetType', StringType(), True),
StructField('Comment', StringType(), True),
StructField('RegDate', StringType(), True),
StructField('Updated', StringType(), True),
StructField('Source', StringType(), True)])

dataframe = spark.createDataFrame(df, schema)
dataframe.write. \
mode("append"). \
option("path", "/user/hive/warehouse/bulkwhois_analytics.db/arin_network"). \
saveAsTable("bulkwhois_analytics.arin_network")

最佳答案

User10465355 提到我应该直接使用 Spark。这样做更简单,也是实现此目的的正确方法。

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Networks').getOrCreate()

dataset = spark.read.csv('Networks_arin_db_2-20-2019_parsed.csv', header=True, inferSchema=True)
dataset.show(5)

dataset.write \
.mode("append") \
.option("path", "/user/hive/warehouse/analytics.db/arin_network") \
.saveAsTable("analytics.arin_network")

关于python - 我应该在代码中添加什么以避免使用 pyspark 出现 'exceeds max allowed bytes' 错误?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54793794/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com