
apache-spark - rdd.toDF() fails but spark.createDataFrame(rdd) works


I have an RDD of the form RDD[(string, List(Tuple))], like this:

[(u'C1589::HG02922', [(83779208, 2), (677873089, 0), ...]
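
For reference, an RDD with the same shape can be mocked up like this (a minimal sketch; sc is an existing SparkContext and the values are placeholders, not the real data):

pairs = [
    (u'C1589::HG02922', [(83779208, 2), (677873089, 0)]),
    (u'HG00367', [(83779208, 0), (677873089, 1)]),
]
toy_rdd = sc.parallelize(pairs)  # RDD of (str, list of (int, int) tuples)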

When I try to convert it to a DataFrame with the code below, spark.createDataFrame(rdd) works fine, but rdd.toDF() fails.

vector_df1 = spark.createDataFrame(vector_rdd) # Works fine.
vector_df1.show()
+--------------+--------------------+
| _1| _2|
+--------------+--------------------+
|C1589::HG02922|[[83779208,2], [6...|
| HG00367|[[83779208,0], [6...|
| C477::HG00731|[[83779208,0], [6...|
| HG00626|[[83779208,0], [6...|
| HG00622|[[83779208,0], [6...|
...
vector_df2 = vector_rdd.toDF() # Tosses the error.

The error thrown is:

Traceback (most recent call last):
File "/tmp/7ff0f62d-d849-4884-960f-bb89b5f3dd80/ml_on_vds.py", line 47, in <module>
vector_df2 = vector_rdd.toDF().show()
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line 57, in toDF
File "/usr/lib/spark/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py", line 1124, in __call__
File "/usr/lib/spark/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py", line 1094, in _build_args
File "/usr/lib/spark/python/lib/py4j-0.10.3-src.zip/py4j/protocol.py", line 289, in get_command_part
AttributeError: 'PipelinedRDD' object has no attribute '_get_object_id'
ERROR: (gcloud.dataproc.jobs.submit.pyspark) Job [7ff0f62d-d849-4884-960f-bb89b5f3dd80] entered state [ERROR] while waiting for [DONE].

Has anyone run into a similar issue? .toDF() is supposed to be just a thin wrapper around createDataFrame(), so I don't understand why it fails. I have verified at runtime that I am running Spark 2.0.2.

# Imports    
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, hash
from pyspark.sql.types import *
from pyspark.ml.clustering import KMeans
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import StringIndexer
from hail import *

# SparkSession
spark = (SparkSession.builder.appName("PopulationGenomics")
.config("spark.sql.files.openCostInBytes", "1099511627776")
.config("spark.sql.files.maxPartitionBytes", "1099511627776")
.config("spark.hadoop.io.compression.codecs", "org.apache.hadoop.io.compress.DefaultCodec,is.hail.io.compress.BGzipCodec,org.apache.hadoop.io.compress.GzipCodec")
.getOrCreate())

As requested, more of the code that produces the error:

vector_rdd = (indexed_df.rdd.map(lambda r: (r[0], (r[3], r[2])))
.groupByKey()
.mapValues(lambda l: Vectors.sparse((max_index + 1), list(l))))
vector_df = spark.createDataFrame(vector_rdd, ['s', 'features']) # Works
vector_df1 = vector_rdd.toDF()
vector_df1.show() # Fails

indexed_df is a DataFrame with the schema:

StructType(List(StructField(s,StringType,true),StructField(variant_hash,IntegerType,false),StructField(call,IntegerType,true),StructField(index,DoubleType,true)))
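
Spelled out with the StructType constructor, that schema is equivalent to (a sketch, not the code from the job itself):

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

indexed_schema = StructType([
    StructField('s', StringType(), True),
    StructField('variant_hash', IntegerType(), False),
    StructField('call', IntegerType(), True),
    StructField('index', DoubleType(), True),
])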

The data itself looks like this:

+--------------+------------+----+-----+
| s|variant_hash|call|index|
+--------------+------------+----+-----+
|C1046::HG02024| -60010252| 0|225.0|
|C1046::HG02025| -60010252| 1|225.0|
|C1046::HG02026| -60010252| 0|225.0|
|C1047::HG00731| -60010252| 0|225.0|
|C1047::HG00732| -60010252| 1|225.0|
|C1047::HG00733| -60010252| 0|225.0|
|C1048::HG02024| -60010252| 0|225.0|
|C1048::HG02025| -60010252| 1|225.0|
|C1048::HG02026| -60010252| 0|225.0|
|C1049::HG00731| -60010252| 0|225.0|
|C1049::HG00732| -60010252| 1|225.0|
|C1049::HG00733| -60010252| 0|225.0|
|C1050::HG03006| -60010252| 0|225.0|
|C1051::HG03642| -60010252| 0|225.0|
|C1589::HG02922| -60010252| 2|225.0|
|C1589::HG03006| -60010252| 0|225.0|
|C1589::HG03052| -60010252| 2|225.0|
|C1589::HG03642| -60010252| 0|225.0|
|C1589::NA12878| -60010252| 1|225.0|
|C1589::NA19017| -60010252| 1|225.0|
+--------------+------------+----+-----+

Best Answer

The toDF method is monkey-patched onto RDDs inside the SparkSession constructor (the SQLContext constructor in 1.x), so you have to create a SparkSession (or SQLContext) before you can use it:

spark = SparkSession(sc)   # constructing the session attaches .toDF to RDDs
hasattr(rdd, "toDF")
## True
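
For context, the whole before-and-after check looks like this (a minimal sketch assuming a bare SparkContext and no session created yet):

from pyspark import SparkContext
from pyspark.sql import SparkSession

sc = SparkContext()
rdd = sc.parallelize([('a', 1)])

hasattr(rdd, 'toDF')        # False -- no SparkSession yet, so RDDs are not patched

spark = SparkSession(sc)    # creating the session monkey-patches .toDF onto RDD
hasattr(rdd, 'toDF')        # True

rdd.toDF().show()
# +---+---+
# | _1| _2|
# +---+---+
# |  a|  1|
# +---+---+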

Also, if you are using Scala, you need to import spark.implicits._, where spark is the SparkSession object you created.

Hope this helps!

Regarding "apache-spark - rdd.toDF() fails but spark.createDataFrame(rdd) works", a similar question was found on Stack Overflow: https://stackoverflow.com/questions/43810603/
