gpt4 book ai didi

python - 如何使用 PySpark HashPartitioner 检测大型 json 文件中的重复项

转载 作者:行者123 更新时间:2023-11-28 16:58:45 31 4
gpt4 key购买 nike

我有一个包含超过 20GB json 结构元数据的大型 json 文件。它包含跨某些应用程序的简单用户元数据,我想筛选它以检测重复项。以下是数据的示例:

{"created": "2015-08-04", "created_at": "2010-03-15", "username": "koleslawrulez333"}
{"created": "2016-01-19", "created_at": "2012-05-25", "name": "arthurking231"}
{"created": "2016-07-23", "type": "Username", "created_at": "2011-08-27", "name": "starklord1943"}
{"created": "2015-11-08", "created_at": "2010-01-19", "name": "Assasinator5827"}

json 文件逐行包含看起来与此非常相似的 json 对象。 "name" 时出现重复两个 json 对象的字段相同。所以,这是一个副本:

{"created": "2016-07-23", "type": "Username", "created_at": "2011-08-27", "name": "Assasinator5827"}
{"created": "2015-11-08", "created_at": "2010-01-19", "name": "Assasinator5827"}

只要两个完全相同的json对象。

现在,我想遍历一个太大而无法放入内存的整个 json 文件,并通过使用最佳标准找出所有重复项以及它们的重复项 和然后做一些逻辑 - 逻辑部分很简单,但我有点不确定如何找到重复项。

我的想法:

  1. 我首先考虑使用布隆过滤器。它们并没有那么令人困惑,并且工作得很好而且速度很快,而且我认为它们本质上可以归结为 O(n)。但是,布隆过滤器不会让我知道重复的字符串是什么的重复项,这对我来说是不可能的。

  2. 我考虑过使用外部归并排序。我基本上会将文件分成多个适合内存的较小文件,对每个 block 进行排序并搜索重复项(现在聚集在一起)。但我不太确定这个实现是我想要的。

  3. 我遇到的下一件事是按分区散列,我怀疑这正是我想要的。在处理适合内存的数据时,散列本质上是查找重复项的最佳方式,那么为什么不将它用于不适合内存的数据呢?我对如何按分区散列有点困惑。我不确定这是否是我要找的。

所以,我认为我应该使用选项 3,按分区散列,而且我知道 Spark 有这个功能。我希望是否有人可以让我知道我是否在正确的轨道上,并且可能会给我一些指示,说明我是否正确。从概念上讲,我有几个具体问题:

  1. 假设我创建了 100 个完全适合内存的分区(因此,在我的例子中,每个分区为 100MB)。假设我对第一个 x 进行哈希处理我的 json 文件中的元素合并到一个分区中,我发现没有重复项。假设我有另一个分区,其中包含第二个 100MB 的数据,其中也不包含重复项。如果我一次只能加载 100MB 的数据,我如何检查分区 1 和分区 2 之间没有任何重复?澄清一下,如果分区 1 有一个元素而分区 2 有一个相同的元素,我该如何计算出来?我想我需要将两者都加载到内存中,对吗?如果我做不到……那我该怎么办?可能是我理解错了...

  2. 这引出了我的第二个问题 - 这似乎不是分区的工作方式,当您按分区散列时,具有相似散列或散列范围的元素会进入特定文件。因此,如果两个元素重复,我会知道,因为算法会尝试将其放入散列已存在的文件中。是这样吗?

我知道我还有更多问题,只是想不起来了。有没有人有任何提示?特别是关于 pyspark 以及如何最好地使用它?或者 pyspark 不是我要找的东西?

最佳答案

这个问题比你想象的要简单。您实际上只需要按照@Hitobat 的建议按 name 聚合数据。我会用 pyspark.sql.Window 来解决这个问题,以简化聚合输出。

给定以下数据是一个名为 data.json 的文件(这也可以是一个文件目录,而不是单个文件)

data.json的内容

{"created": "2015-08-04", "created_at": "2010-03-15", "username": "koleslawrulez333"}
{"created": "2016-01-19", "created_at": "2012-05-25", "name": "arthurking231"}
{"created": "2016-07-23", "type": "Username", "created_at": "2011-08-27", "name": "starklord1943"}
{"created": "2015-11-08", "created_at": "2010-01-19", "name": "Assasinator5827"}
{"created": "2016-07-23", "type": "Username", "created_at": "2011-08-27", "name": "Assasinator5827"}

那么 pyspark 代码将如下所示:

from pyspark.sql import Window
from pyspark.sql import functions as F

df = spark.read.json("data.json") # can be a directory of files as well
df.show()

输出

+----------+----------+---------------+--------+----------------+
| created|created_at| name| type| username|
+----------+----------+---------------+--------+----------------+
|2015-08-04|2010-03-15| null| null|koleslawrulez333|
|2016-01-19|2012-05-25| arthurking231| null| null|
|2016-07-23|2011-08-27| starklord1943|Username| null|
|2015-11-08|2010-01-19|Assasinator5827| null| null|
|2016-07-23|2011-08-27|Assasinator5827|Username| null|
+----------+----------+---------------+--------+----------------+

然后用pyspark.sql.Window进行分区和计数

name_partition_window = Window.partitionBy("name")
df_with_repeat_counts = df.select("*", F.count("*").over(name_partition_window).alias("name_counts"))
df_with_repeat_counts.show()

输出

+----------+----------+---------------+--------+----------------+-----------+
| created|created_at| name| type| username|name_counts|
+----------+----------+---------------+--------+----------------+-----------+
|2016-01-19|2012-05-25| arthurking231| null| null| 1|
|2015-08-04|2010-03-15| null| null|koleslawrulez333| 1|
|2015-11-08|2010-01-19|Assasinator5827| null| null| 2|
|2016-07-23|2011-08-27|Assasinator5827|Username| null| 2|
|2016-07-23|2011-08-27| starklord1943|Username| null| 1|
+----------+----------+---------------+--------+----------------+-----------+

然后在name_count列过滤dataframe,并按name排序检查

duplicates = df_with_repeat_counts.where(F.col("name_counts") > 1).orderBy("name")
duplicates.show()

输出

+----------+----------+---------------+--------+--------+-----------+
| created|created_at| name| type|username|name_counts|
+----------+----------+---------------+--------+--------+-----------+
|2015-11-08|2010-01-19|Assasinator5827| null| null| 2|
|2016-07-23|2011-08-27|Assasinator5827|Username| null| 2|
+----------+----------+---------------+--------+--------+-----------+

此时,您可以根据用例的需要分析duplicates 数据框。

关于python - 如何使用 PySpark HashPartitioner 检测大型 json 文件中的重复项,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55857418/

31 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com