gpt4 book ai didi

python - 如何将来自 RDD.mapPartitions() 的 Pandas Dataframe 转换为 Spark DataFrame?

转载 作者:太空宇宙 更新时间:2023-11-04 00:22:49 25 4
gpt4 key购买 nike

我有一个返回 Pandas DataFrame 的 Python 函数。我使用 pyspark 的 RDD.mapPartitions() 在 Spark 2.2.0 中调用此函数.但我无法将 mapPartitions() 返回的 RDD 转换为 Spark DataFrame。 Pandas 生成此错误:

ValueError: The truth value of a DataFrame is ambiguous. Use a.empty, a.bool(), a.item(), a.any() or a.all().

说明问题的简单代码:

import pandas as pd

def func(data):
pdf = pd.DataFrame(list(data), columns=("A", "B", "C"))
pdf += 10 # Add 10 to every value. The real function is a lot more complex!
return [pdf]

pdf = pd.DataFrame([(1.87, 0.6, 7.1), (-0.3, 0.1, 8.2), (2.8, 0.3, 6.1), (-0.2, 0.5, 5.9)], columns=("A", "B", "C"))

sdf = spark.createDataFrame(pdf)
sdf.show()
rddIn = sdf.rdd

for i in rddIn.collect():
print(i)

result = rddIn.mapPartitions(func)

for i in result.collect():
print(i)

resDf = spark.createDataFrame(result) # --> ValueError!
resDf.show()

输出是:

+----+---+---+
| A| B| C|
+----+---+---+
|1.87|0.6|7.1|
|-0.3|0.1|8.2|
| 2.8|0.3|6.1|
|-0.2|0.5|5.9|
+----+---+---+
Row(A=1.87, B=0.6, C=7.1)
Row(A=-0.3, B=0.1, C=8.2)
Row(A=2.8, B=0.3, C=6.1)
Row(A=-0.2, B=0.5, C=5.9)
A B C
0 11.87 10.6 17.1
A B C
0 9.7 10.1 18.2
A B C
0 12.8 10.3 16.1
A B C
0 9.8 10.5 15.9

但是倒数第二行产生了上面提到的ValueError。我真的希望 resDf.show() 看起来与 sdf.show() 完全一样,只是表中的每个值都添加了 10。理想情况下,result RDD 应具有与 rddIn 相同的结构,RDD 进入 mapPartitions()

最佳答案

您必须将数据转换为标准 Python 类型并展平:

resDf = spark.createDataFrame(
result.flatMap(lambda df: (r.tolist() for r in df.to_records()))
)

resDF.show()
# +---+------------------+----+----+
# | _1| _2| _3| _4|
# +---+------------------+----+----+
# | 0|11.870000000000001|10.6|17.1|
# | 0| 9.7|10.1|18.2|
# | 0| 12.8|10.3|16.1|
# | 0| 9.8|10.5|15.9|
# +---+------------------+----+----+

如果你使用 Spark 2.3,这也应该有效

from pyspark.sql.functions import pandas_udf, spark_partition_id
from pyspark.sql.functions import PandasUDFType

@pandas_udf(sdf.schema, functionType=PandasUDFType.GROUPED_MAP)
def func(pdf):
pdf += 10
return pdf

sdf.groupBy(spark_partition_id().alias("_pid")).apply(func)

关于python - 如何将来自 RDD.mapPartitions() 的 Pandas Dataframe 转换为 Spark DataFrame?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48541064/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com