gpt4 book ai didi

python - 如何将数组(即列表)列转换为 Vector

转载 作者:IT老高 更新时间:2023-10-28 20:30:04 25 4
gpt4 key购买 nike

问题的简短版本!

考虑以下代码片段(假设 spark 已经设置为一些 SparkSession ):

from pyspark.sql import Row
source_data = [
Row(city="Chicago", temperatures=[-1.0, -2.0, -3.0]),
Row(city="New York", temperatures=[-7.0, -7.0, -5.0]),
]
df = spark.createDataFrame(source_data)

请注意,温度字段是一个浮点数列表。我想将这些浮点数列表转换为 MLlib 类型 Vector ,我希望使用基本的 DataFrame 来表达这种转换API 而不是通过 RDD(这是低效的,因为它将所有数据从 JVM 发送到 Python,处理是在 Python 中完成的,我们没有获得 Spark 的 Catalyst 优化器 yada yada 的好处)。我该怎么做呢?具体来说:
  • 有没有办法让直接 Actor 工作?有关详细信息(以及尝试解决方法失败),请参见下文?或者,是否还有任何其他操作具有我所追求的效果?
  • 我在下面建议的两种替代解决方案中哪个更有效(UDF 与爆炸/重新组装列表中的项目)?或者还有其他几乎但不完全正确的替代方案比它们中的任何一个都更好吗?

  • 直接 Actor 不起作用

    这就是我所期望的“正确”解决方案。我想将列的类型从一种类型转换为另一种类型,所以我应该使用强制转换。作为上下文,让我提醒您将其转换为另一种类型的正常方法:
    from pyspark.sql import types
    df_with_strings = df.select(
    df["city"],
    df["temperatures"].cast(types.ArrayType(types.StringType()))),
    )

    现在例如 df_with_strings.collect()[0]["temperatures"][1]'-7.0' .但是,如果我转换为 ml Vector,那么事情就不会那么顺利:
    from pyspark.ml.linalg import VectorUDT
    df_with_vectors = df.select(df["city"], df["temperatures"].cast(VectorUDT()))

    这给出了一个错误:
    pyspark.sql.utils.AnalysisException: "cannot resolve 'CAST(`temperatures` AS STRUCT<`type`: TINYINT, `size`: INT, `indices`: ARRAY<INT>, `values`: ARRAY<DOUBLE>>)' due to data type mismatch: cannot cast ArrayType(DoubleType,true) to org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7;;
    'Project [city#0, unresolvedalias(cast(temperatures#1 as vector), None)]
    +- LogicalRDD [city#0, temperatures#1]
    "

    哎呀!任何想法如何解决这一问题?

    可能的选择

    备选方案 1:使用 VectorAssembler
    有一个 Transformer这似乎是这项工作的理想选择: VectorAssembler .它需要一列或多列并将它们连接成一个向量。不幸的是它只需要 VectorFloat列,而不是 Array列,所以以下不起作用:
    from pyspark.ml.feature import VectorAssembler
    assembler = VectorAssembler(inputCols=["temperatures"], outputCol="temperature_vector")
    df_fail = assembler.transform(df)

    它给出了这个错误:
    pyspark.sql.utils.IllegalArgumentException: 'Data type ArrayType(DoubleType,true) is not supported.'

    我能想到的最好的解决方法是将列表分解成多列,然后使用 VectorAssembler再次收集它们:
    from pyspark.ml.feature import VectorAssembler
    TEMPERATURE_COUNT = 3
    assembler_exploded = VectorAssembler(
    inputCols=["temperatures[{}]".format(i) for i in range(TEMPERATURE_COUNT)],
    outputCol="temperature_vector"
    )
    df_exploded = df.select(
    df["city"],
    *[df["temperatures"][i] for i in range(TEMPERATURE_COUNT)]
    )
    converted_df = assembler_exploded.transform(df_exploded)
    final_df = converted_df.select("city", "temperature_vector")

    这似乎是理想的,除了 TEMPERATURE_COUNT超过 100,有时超过 1000。(另一个问题是,如果你事先不知道数组的大小,代码会更复杂,虽然我的数据不是这种情况。)Spark 真的吗?生成一个包含那么多列的中间数据集,或者它是否只是认为这是单个项目暂时通过的中间步骤(或者当它看到这些列的唯一用途是组装成时,它确实完全优化了这个离开步骤向量)?

    备选方案 2:使用 UDF

    一个相当简单的替代方法是使用 UDF 进行转换。这让我可以在一行代码中非常直接地表达我想要做的事情,并且不需要制作包含大量列的数据集。但是所有这些数据都必须在 Python 和 JVM 之间交换,并且每个单独的数字都必须由 Python 处理(众所周知,Python 迭代单个数据项的速度很慢)。这是它的外观:
    from pyspark.ml.linalg import Vectors, VectorUDT
    from pyspark.sql.functions import udf
    list_to_vector_udf = udf(lambda l: Vectors.dense(l), VectorUDT())
    df_with_vectors = df.select(
    df["city"],
    list_to_vector_udf(df["temperatures"]).alias("temperatures")
    )

    可忽略的言论

    这个漫无边际的问题的其余部分是我在试图找到答案时想到的一些额外的东西。大多数阅读本文的人可能会跳过它们。

    不是解决方案:使用 Vector开始

    在这个简单的示例中,可以使用向量类型开始创建数据,但当然,我的数据实际上并不是我正在并行化的 Python 列表,而是从数据源中读取的。但为了记录,这里是这样的:
    from pyspark.ml.linalg import Vectors
    from pyspark.sql import Row
    source_data = [
    Row(city="Chicago", temperatures=Vectors.dense([-1.0, -2.0, -3.0])),
    Row(city="New York", temperatures=Vectors.dense([-7.0, -7.0, -5.0])),
    ]
    df = spark.createDataFrame(source_data)

    低效解决方案:使用 map()
    一种可能性是使用 RDD map()将列表转换为 Vector 的方法.这类似于 UDF 的想法,除了它更糟糕,因为序列化等的成本是针对每一行中的所有字段产生的,而不仅仅是被操作的字段。作为记录,以下是该解决方案的样子:
    df_with_vectors = df.rdd.map(lambda row: Row(
    city=row["city"],
    temperatures=Vectors.dense(row["temperatures"])
    )).toDF()

    尝试 Actor 的变通方法失败

    无奈之下,我注意到 Vector在内部由具有四个字段的结构表示,但使用来自该类型结构的传统强制转换也不起作用。这是一个插图(我使用 udf 构建了结构,但 udf 不是重要的部分):
    from pyspark.ml.linalg import Vectors, VectorUDT
    from pyspark.sql.functions import udf
    list_to_almost_vector_udf = udf(lambda l: (1, None, None, l), VectorUDT.sqlType())
    df_almost_vector = df.select(
    df["city"],
    list_to_almost_vector_udf(df["temperatures"]).alias("temperatures")
    )
    df_with_vectors = df_almost_vector.select(
    df_almost_vector["city"],
    df_almost_vector["temperatures"].cast(VectorUDT())
    )

    这给出了错误:
    pyspark.sql.utils.AnalysisException: "cannot resolve 'CAST(`temperatures` AS STRUCT<`type`: TINYINT, `size`: INT, `indices`: ARRAY<INT>, `values`: ARRAY<DOUBLE>>)' due to data type mismatch: cannot cast StructType(StructField(type,ByteType,false), StructField(size,IntegerType,true), StructField(indices,ArrayType(IntegerType,false),true), StructField(values,ArrayType(DoubleType,false),true)) to org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7;;
    'Project [city#0, unresolvedalias(cast(temperatures#5 as vector), None)]
    +- Project [city#0, <lambda>(temperatures#1) AS temperatures#5]
    +- LogicalRDD [city#0, temperatures#1]
    "

    最佳答案

    就我个人而言,我会使用 Python UDF 而不会打扰其他任何事情:

  • Vectors不是 native SQL 类型,因此会以一种或另一种方式存在性能开销。特别是这个过程需要两个步骤,其中数据是第一个 converted from external type to row ,然后 from row to internal representation using generic RowEncoder .
  • 任何下游 ML Pipeline将比简单的转换昂贵得多。此外,它需要一个与上述过程相反的过程

  • 但是,如果您真的想要其他选择,您可以:
  • 带有 Python 包装器的 Scala UDF:

    安装 sbt按照项目网站上的说明进行操作。

    创建具有以下结构的 Scala 包:

    .
    ├── build.sbt
    └── udfs.scala

    编辑 build.sbt (调整以反射(reflect) Scala 和 Spark 版本):

    scalaVersion := "2.11.8"

    libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-sql" % "2.4.4",
    "org.apache.spark" %% "spark-mllib" % "2.4.4"
    )

    编辑 udfs.scala :

    package com.example.spark.udfs

    import org.apache.spark.sql.functions.udf
    import org.apache.spark.ml.linalg.DenseVector

    object udfs {
    val as_vector = udf((xs: Seq[Double]) => new DenseVector(xs.toArray))
    }

    包裹:

    sbt package

    并包括(或等效的,取决于 Scala 版本):

    $PROJECT_ROOT/target/scala-2.11/udfs_2.11-0.1-SNAPSHOT.jar

    作为 --driver-class-path 的参数启动 shell/提交应用程序时。

    在 PySpark 中定义一个包装器:
    from pyspark.sql.column import _to_java_column, _to_seq, Column
    from pyspark import SparkContext

    def as_vector(col):
    sc = SparkContext.getOrCreate()
    f = sc._jvm.com.example.spark.udfs.udfs.as_vector()
    return Column(f.apply(_to_seq(sc, [col], _to_java_column)))

    测试:
    with_vec = df.withColumn("vector", as_vector("temperatures"))
    with_vec.show()

    +--------+------------------+----------------+
    | city| temperatures| vector|
    +--------+------------------+----------------+
    | Chicago|[-1.0, -2.0, -3.0]|[-1.0,-2.0,-3.0]|
    |New York|[-7.0, -7.0, -5.0]|[-7.0,-7.0,-5.0]|
    +--------+------------------+----------------+

    with_vec.printSchema()

    root
    |-- city: string (nullable = true)
    |-- temperatures: array (nullable = true)
    | |-- element: double (containsNull = true)
    |-- vector: vector (nullable = true)
  • 将数据转储为反射(reflect) DenseVector 的 JSON 格式模式并将其读回:
    from pyspark.sql.functions import to_json, from_json, col, struct, lit
    from pyspark.sql.types import StructType, StructField
    from pyspark.ml.linalg import VectorUDT

    json_vec = to_json(struct(struct(
    lit(1).alias("type"), # type 1 is dense, type 0 is sparse
    col("temperatures").alias("values")
    ).alias("v")))

    schema = StructType([StructField("v", VectorUDT())])

    with_parsed_vector = df.withColumn(
    "parsed_vector", from_json(json_vec, schema).getItem("v")
    )

    with_parsed_vector.show()

    +--------+------------------+----------------+
    | city| temperatures| parsed_vector|
    +--------+------------------+----------------+
    | Chicago|[-1.0, -2.0, -3.0]|[-1.0,-2.0,-3.0]|
    |New York|[-7.0, -7.0, -5.0]|[-7.0,-7.0,-5.0]|
    +--------+------------------+----------------+

    with_parsed_vector.printSchema()

    root
    |-- city: string (nullable = true)
    |-- temperatures: array (nullable = true)
    | |-- element: double (containsNull = true)
    |-- parsed_vector: vector (nullable = true)
  • 关于python - 如何将数组(即列表)列转换为 Vector,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42138482/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com