gpt4 book ai didi

pyspark - 使用自定义分区器在 Pyspark 中对数据帧进行分区

转载 作者:行者123 更新时间:2023-12-01 11:15:13 26 4
gpt4 key购买 nike

寻找有关在 Pyspark 中使用自定义分区器的一些信息。我有一个包含各个国家/地区的国家/地区数据的数据框。因此,如果我对国家/地区列进行重新分区,它会将我的数据分布到 n 个分区中,并将类似的国家/地区数据保留到特定分区。当我看到使用 glom() 时,这是在创建偏斜分区数据方法。

美国和中国等一些国家/地区在特定数据帧中拥有大量数据。我想重新分区我的数据帧,如果国家是美国和中国,那么它将进一步分成大约 10 个分区,其他国家的分区保持不变,如 IND、THA、AUS 等。我们可以在 Pyspark 代码中扩展分区器类吗?

我在下面的链接中读到了这个,我们可以在 scala Spark 应用程序中扩展 scala partitioner 类,并且可以修改 partitioner 类以使用自定义逻辑根据需求重新分区我们的数据。就像我所拥有的.. 请帮助在 Pyspark 中实现此解决方案.. 请参阅下面的链接 What is an efficient way to partition by column but maintain a fixed partition count?

我使用的是 Spark 版本 2.3.0.2,以下是我的数据帧结构:

datadf= spark.sql("""
SELECT
ID_NUMBER ,SENDER_NAME ,SENDER_ADDRESS ,REGION_CODE ,COUNTRY_CODE
from udb.sometable
""");

传入的数据有六个国家的数据,如 AUS , IND , THA , RUS , CHNUSA . CHNUSA有偏斜数据。

所以如果我这样做 repartitionCOUNTRY_CODE ,两个分区包含大量数据,而其他分区很好。我使用 glom() 检查了这个方法。
newdf = datadf.repartition("COUNTRY_CODE")

from pyspark.sql import SparkSession
from pyspark.sql import HiveContext, DataFrameWriter, DataFrame

newDF = datadf.repartitionByRange(3,"COUNTRY_CODE","USA")

我正在尝试将我的数据重新分区为国家/地区的另外 3 个分区 USACHN只希望将其他国家的数据保留在单个分区中。
This is what I am expecting 
AUS- one partition
IND- one partition
THA- one partition
RUS- one partition
CHN- three partition
USA- three partition

Traceback (most recent call last): File "", line 1, in File "/usr/hdp/current/spark2-client/python/pyspark/sql/dataframe.py", line 1182, in getattr "'%s' object has no attribute '%s'" % (self.class.name, name)) AttributeError: 'DataFrame' object has no attribute 'repartitionByRange'

最佳答案

用散列尝试这样的事情:

newDf = oldDf.repartition(N, $"col1", $"coln")

或测距方法:
newDF = oldDF.repartitionByRange(N, $"col1", $"coln")

目前还没有用于 DF 的自定义分区。

在你的情况下,我会去散列,但没有保证。

但是如果您的数据有偏差,您可能需要一些额外的工作,例如 2 列用于分区是最简单的方法。

例如。现有或新列 - 在这种情况下是对给定国家/地区应用分组的列,例如1 .. N,以及两列上的分区。

对于有很多分组的国家,你会得到 N 个合成子部门;对于基数较低的其他人,只有1个这样的组号。不是太难。两个分区都可以占用 1 个以上的列。

在我看来,统一数量的分区填充需要很多努力并且不是真正可以实现的,但是像这里这样的下一个最佳方法就足够了。在一定程度上相当于自定义分区。

否则,在 DF 上使用 .withColumn 您可以使用这些规则模拟自定义分区并填充新的 DF 列,然后应用 repartitionByRange。也没有那么难。

关于pyspark - 使用自定义分区器在 Pyspark 中对数据帧进行分区,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52790703/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com