gpt4 book ai didi

python - 带有二进制列的 Spark/PySpark collect_set

转载 作者:太空宇宙 更新时间:2023-11-04 04:15:46 25 4
gpt4 key购买 nike

一些测试数据,有两列:第一列二进制(在此示例中使用字母数字字节),第二列是整数:

from pyspark.sql.types import *
from pyspark.sql import functions as F

df = spark.createDataFrame([
(bytearray(b'0001'), 1),
(bytearray(b'0001'), 1),
(bytearray(b'0001'), 2),
(bytearray(b'0002'), 2)
],
schema=StructType([
StructField("bin", BinaryType()),
StructField("number", IntegerType())
]))

使用 collect_set 按整数列分组然后删除重复项不起作用,因为字节数组不支持散列。因此:

(
df
.groupBy('number')
.agg(F.collect_set("bin").alias('bin_array'))
.show()
)

+------+------------+
|number| bin_array|
+------+------------+
| 1|[0001, 0001]|
| 2|[0001, 0002]|
+------+------------+

一个 hacky 选项是将二进制数组嵌入到一个结构中,然后再将它们全部解包,但我怀疑这将导致大量分配并且非常昂贵(虽然还没有实际分析它):

def unstruct_array(input):
return [x.bin for x in input]

unstruct_array_udf = F.udf(unstruct_array, ArrayType(BinaryType()))

(
df
.withColumn("bin", F.struct("bin"))
.groupBy('number')
.agg(F.collect_set("bin").alias('bin_array'))
.withColumn('bin_array', unstruct_array_udf('bin_array'))
.show()
)

+------+------------+
|number| bin_array|
+------+------------+
| 1| [0001]|
| 2|[0001, 0002]|
+------+------------+

如果我围绕二进制类型和 Spark 尝试了很多 Google 搜索词,就会有各种答案说如果需要散列,就应该包装数组。建议包括自定义包装器或通过调用 Scala 的 toSeq 来创建 Scala WrappedArray。例如:

ReduceByKey with a byte array as the key

How to use byte array as key in RDD?

因此,选项包括:

  1. 映射底层 RDD 使二进制字段成为 WrappedArray。不确定如何在 Python 中执行此操作?
  2. 为数组创建一个 Python 包装器,然后以某种方式在 Python 中散列底层 Java 数组?虽然不确定这比使用结构有什么优势?
  3. 我可以包装在一个结构中,然后永远不会展开,这在处理方面会更有效一些,但可能会使 parquet 文件更大,并且在所有下游任务中解析起来更昂贵

最佳答案

这里有一个 hack,它可能比包装和解包装更有效。您可以简单地预先调用 distinct 方法。

df.show()
+-------------+------+
| bin|number|
+-------------+------+
|[30 30 30 31]| 1|
|[30 30 30 31]| 1|
|[30 30 30 31]| 2|
|[30 30 30 32]| 2|
+-------------+------+

df.distinct().show()
+-------------+------+
| bin|number|
+-------------+------+
|[30 30 30 31]| 1|
|[30 30 30 31]| 2|
|[30 30 30 32]| 2|
+-------------+------+

请注意,我可能没有使用与您相同版本的 Spark(我的是​​ 2.2.1),因为二进制数组的显示似乎有所不同。

然后,对于collect_set,它简单地归结为:

df.distinct().groupBy("number").agg(F.collect_set("bin"))

关于python - 带有二进制列的 Spark/PySpark collect_set,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55463673/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com