gpt4 book ai didi

python-3.x - 如何在 Spark 数据帧中存储 Python 字节串

转载 作者:行者123 更新时间:2023-12-03 14:36:43 25 4
gpt4 key购买 nike

我在看一个谜。我在 RDD 中有一堆长文档可用作 Python 字节串( b"I'm a byte string" )。现在我将此 RDD 转换为 DataFrame加入另一个 DataFrame .我这样做:

Data_RDD = Paths_RDD.map(open_paths).flatMap(split_files)

Data_schema = StructType([
StructField("URI", StringType(), True),
StructField("Content", StringType(), True),
])

Data_DF = sqlContext.createDataFrame(Data_RDD, schema=Data_schema)

print(Data_DF.show(5))

+--------------------+-----------+
| URI| Content|
+--------------------+-----------+
|http://01storytel...|[B@10628e42|
|http://05yxgs.com...|[B@36699775|
|http://1.lhcmaima...|[B@4e569e3b|
|http://100100.ove...|[B@18ae5bab|
|http://1015theriv...|[B@5f044435|
+--------------------+-----------+
only showing top 5 rows

这些短 "[B@10628e42"字符串对我来说似乎没什么用,可能是某种指针。字节串在 RDD 中仍然是“完整的”,因为我仍然可以访问它们。所以在从 RDD 到 DataFrame 的转换中出现问题。现在我尝试将字节串存储在其他类型的字段中,即 ByteType()BinaryType() .两者都不起作用,因为这些错误消息不接受字节串​​:
TypeError: ByteType can not accept object b'some string' in type <class 'bytes'>
TypeError: BinaryType can not accept object b'some string' in type <class 'bytes'>

但它变得更加奇怪。当我设置一个小规模实验时:
ByteStrings = [b'one',b'two',b'three']
rdd_ByteStrings = sc.parallelize(ByteStrings)
print(rdd_ByteStrings.take(3))

DF2_schema = StructType([
StructField("ByteString", StringType(), True),
])
DF_ByteStrings = sqlContext.createDataFrame(rdd_ByteStrings,schema=DF2_schema)

print(DF_ByteStrings.show())

在 StringType 列中也不允许使用小字节串。当我尝试运行它时,我收到此错误消息:
StructType can not accept object b'one' in type <class 'bytes'>

当我尝试让 spark 推断类型时,它也会失败并显示以下消息:
TypeError: Can not infer schema for type: <class 'bytes'>

所以任何想法如何在 DataFrame 中存储字节串没有到 .decode()他们。这是我加入两个之后才能做的事情 DataFrames在一起,因为另一个保存解码信息。

我使用 Python 3.5 和 Spark 2.0.1

提前致谢!

最佳答案

这不是一个谜。一步一步:

  • Spark 使用 Pyrolite 在 Python 和 Java 类型之间进行转换。
  • bytes 的 Java 类型是 byte[]相当于 Array[Byte]在斯卡拉。
  • 您将列定义为 StringType因此 Array[Byte]将转换为 String在存储在 DataFrame 之前.
  • Arrays在 Scala 中是丑陋的 Java 工件,除此之外还有其他问题没有用 toString方法:

    Array(192, 168, 1, 1).map(_.toByte)

    Array[Byte] = Array(-64, -88, 1, 1)

    Array(192, 168, 1, 1).map(_.toByte).toString

    String = [B@6c9fe061

    这就是您获取列内容的方式。

  • Spark SQL 中没有直接映射到 Python 的类型 bytes .我个人会加入 join RDDs 但如果你真的想使用 DataFrames您可以使用中级 BinaryType代表。

    from collections import namedtuple

    Record = namedtuple("Record", ["url", "content"])

    rdd = sc.parallelize([Record("none://", b"foo"), Record("none://", b"bar")])
    df = rdd.map(lambda rec: Record(rec.url, bytearray(rec.content))).toDF()

    df.printSchema()

    root
    |-- url: string (nullable = true)
    |-- content: binary (nullable = true)

    它不会为您提供可本地使用的 (JVM) 或有意义的字符串表示形式:

    +-------+----------+
    | url| content|
    +-------+----------+
    |none://|[66 6F 6F]|
    |none://|[62 61 72]|
    +-------+----------+

    但无损:

    df.rdd.map(lambda row: bytes(row.content)).first()

    b'foo'

    并且可以在 Python 中访问 udf :
    from pyspark.sql.functions import udf
    from pyspark.sql import Column
    from typing import Union

    def decode(col: Union[str, Column], enc: str="utf-8") -> Column:
    def decode_(bs: Union[bytearray, None]) -> Union[str, None]:
    if bs is not None:
    return bytes(bs).decode(enc)
    except UnicodeDecodeError:
    pass
    return udf(decode_)(col)

    df.withColumn("decoded", decode("content")).show()

    +-------+----------+-------+
    | url| content|decoded|
    +-------+----------+-------+
    |none://|[66 6F 6F]| foo|
    |none://|[62 61 72]| bar|
    +-------+----------+-------+

    关于python-3.x - 如何在 Spark 数据帧中存储 Python 字节串,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41363296/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com