
apache-spark - python - How to convert an RDD of dense vectors into a DataFrame in pyspark?


I have an RDD of DenseVector like this:

>>> frequencyDenseVectors.collect()
[DenseVector([1.0, 0.0, 1.0, 1.0, 0.0, 0.0, 1.0, 0.0, 1.0, 1.0, 1.0, 0.0, 1.0]), DenseVector([1.0, 1.0, 1.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]), DenseVector([1.0, 1.0, 0.0, 1.0, 1.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0]), DenseVector([0.0, 1.0, 1.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0])]

I want to convert it into a DataFrame. I tried this:

>>> spark.createDataFrame(frequencyDenseVectors, ['rawfeatures']).collect()

It gives an error like this:
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/opt/BIG-DATA/spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/session.py", line 520, in createDataFrame
    rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
  File "/opt/BIG-DATA/spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/session.py", line 360, in _createFromRDD
    struct = self._inferSchema(rdd, samplingRatio)
  File "/opt/BIG-DATA/spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/session.py", line 340, in _inferSchema
    schema = _infer_schema(first)
  File "/opt/BIG-DATA/spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/types.py", line 991, in _infer_schema
    fields = [StructField(k, _infer_type(v), True) for k, v in items]
  File "/opt/BIG-DATA/spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/types.py", line 968, in _infer_type
    raise TypeError("not supported type: %s" % type(obj))
TypeError: not supported type: <type 'numpy.ndarray'>

Old solution:
frequencyVectors.map(lambda vector: DenseVector(vector.toArray()))

Edit 1 - reproducible code
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext, Row
from pyspark.sql.functions import split

from pyspark.ml.feature import CountVectorizer
from pyspark.mllib.clustering import LDA, LDAModel
from pyspark.mllib.linalg import Vectors
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.mllib.linalg import SparseVector, DenseVector

sqlContext = SQLContext(sparkContext=spark.sparkContext, sparkSession=spark)
sc.setLogLevel('ERROR')

sentenceData = spark.createDataFrame([
(0, "Hi I heard about Spark"),
(0, "I wish Java could use case classes"),
(1, "Logistic regression models are neat")
], ["label", "sentence"])
sentenceData = sentenceData.withColumn("sentence", split("sentence", "\s+"))
sentenceData.show()

vectorizer = CountVectorizer(inputCol="sentence", outputCol="rawfeatures").fit(sentenceData)
countVectors = vectorizer.transform(sentenceData).select("label", "rawfeatures")

idf = IDF(inputCol="rawfeatures", outputCol="features")
idfModel = idf.fit(countVectors)
tfidf = idfModel.transform(countVectors).select("label", "features")
frequencyDenseVectors = tfidf.rdd.map(lambda vector: [vector[0],DenseVector(vector[1].toArray())])
frequencyDenseVectors.map(lambda x: (x, )).toDF(["rawfeatures"])

Best answer

You cannot convert an RDD[Vector] directly. It should be mapped to an RDD of objects which can be interpreted as structs, for example RDD[Tuple[Vector]]:

frequencyDenseVectors.map(lambda x: (x, )).toDF(["rawfeatures"])

Otherwise Spark will try to convert the object's __dict__ and end up using unsupported NumPy arrays as fields.

from pyspark.ml.linalg import DenseVector  
from pyspark.sql.types import _infer_schema

v = DenseVector([1, 2, 3])
_infer_schema(v)

TypeError                                 Traceback (most recent call last)
...
TypeError: not supported type: <class 'numpy.ndarray'>

compared with:

_infer_schema((v, ))

StructType(List(StructField(_1,VectorUDT,true)))
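
For completeness, a minimal self-contained sketch (assuming a pyspark shell where spark is already defined, as in the question) showing the same wrapping end to end:

from pyspark.ml.linalg import DenseVector

# Toy stand-in for frequencyDenseVectors
rdd = spark.sparkContext.parallelize([
    DenseVector([1.0, 0.0, 1.0]),
    DenseVector([0.0, 1.0, 1.0]),
])

# Wrap each vector in a one-element tuple so it is inferred as a single struct field
df = rdd.map(lambda v: (v, )).toDF(["rawfeatures"])
df.printSchema()
# root
#  |-- rawfeatures: vector (nullable = true)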

Notes:
  • In Spark 2.0 you have to use the correct local types:
    • pyspark.ml.linalg when working with the DataFrame-based pyspark.ml API.
    • pyspark.mllib.linalg when working with the RDD-based pyspark.mllib API.
  • These two namespaces are no longer compatible and require explicit conversions (for example How to convert from org.apache.spark.mllib.linalg.VectorUDT to ml.linalg.VectorUDT); a short conversion sketch follows this list.
  • The code provided in the edit is not equivalent to the one in the original question. You should be aware that tuple and list don't have the same semantics. If you map a vector to a pair, use a tuple and convert it directly to a DataFrame:

    tfidf.rdd.map(
        lambda row: (row[0], DenseVector(row[1].toArray()))
    ).toDF()

    Using a tuple (product type) would also work with the nested structure, but I doubt this is what you want:

    (tfidf.rdd
        .map(lambda row: (row[0], DenseVector(row[1].toArray())))
        .map(lambda x: (x, ))
        .toDF())

    A list at any place other than the top-level row is interpreted as an ArrayType.
  • It is much cleaner to perform the conversion with a UDF (Spark Python: Standard scaler error "Do not support ... SparseVector"); see the sketch after this list.
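
    On the namespace note above, a hedged sketch of the explicit conversion (assuming Spark 2.x; asML() on mllib vectors and MLUtils.convertVectorColumnsToML are the helpers I would expect to use, and df is a hypothetical DataFrame with an old-style vector column):

    from pyspark.mllib.linalg import Vectors as MLLibVectors
    from pyspark.mllib.util import MLUtils

    old_vec = MLLibVectors.dense([1.0, 0.0, 1.0])  # pyspark.mllib.linalg.DenseVector
    new_vec = old_vec.asML()                       # pyspark.ml.linalg.DenseVector

    # For a whole column of old-style vectors in a hypothetical DataFrame df:
    # df_ml = MLUtils.convertVectorColumnsToML(df, "rawfeatures")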
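
    And a sketch of the UDF route from the last note, which keeps everything in the DataFrame API (assuming tfidf from the question's edit, with its pyspark.ml "features" column; to_dense is a hypothetical name):

    from pyspark.ml.linalg import DenseVector, VectorUDT
    from pyspark.sql.functions import udf

    # Densify a pyspark.ml vector column without dropping down to the RDD API
    to_dense = udf(lambda v: DenseVector(v.toArray()) if v is not None else None, VectorUDT())

    dense_df = tfidf.withColumn("rawfeatures", to_dense("features"))
    dense_df.printSchema()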
  • Regarding apache-spark - python - How to convert an RDD of dense vectors into a DataFrame in pyspark?, a similar question was found on Stack Overflow: https://stackoverflow.com/questions/41328799/
