gpt4 book ai didi

python - 迭代多个查询并将其存储在 pyspark 数据框中

转载 作者:行者123 更新时间:2023-12-05 04:26:41 26 4
gpt4 key购买 nike

我在 hive 中有一个表,我想在循环中的条件下查询它并将结果动态存储在多个 pyspark 数据帧中。

基本查询

g1 = """
select * from db.hive_table where group = 1
"""

group_1 = spk.sql(g1)
group_1.show(3)
group_1.printSchema()
print((group_1.count(), len(group_1.columns)))
group_1 = group_1.toPandas()

总共有 80 个组,目前分别为 Group = 2、Group = 3 等运行上述代码。

我无用的迭代代码

    # changes the geometry type to obj

df_list=[group_1,group_2,group_3,group_4,group_5,group_6,group_7,group_8,group_9,group_10,
group_11,group_12,group_13,group_14,group_15,group_16,group_17,group_18,group_19,group_20,
group_21,group_22,group_23,group_24,group_25,group_26,group_27,group_28,group_29,group_30,
group_31,group_32,group_33,group_34,group_35,group_36,group_37,group_38,group_39,group_40,
group_41,group_42,group_43,group_44,group_45,group_46,group_47,group_48,group_49,group_50,
group_51,group_52,group_53,group_54,group_55,group_56,group_57,group_58,group_59,group_60,
group_61,group_62,group_63,group_64,group_65,group_66,group_67,group_68,group_69,group_70,
group_71,group_72,group_73,group_74,group_75,group_76,group_77,group_78,group_79,group_80,

# num_list=[1,2,3,4,5,5,6,6]

for d in df_list:
for i in range(1,80):
gi = """
select * from db.hive_table where group = $i
"""

group_i = spk.sql(gi)
print(group_i.show(3))
print(group_i.printSchema())
print((group_i.count(), len(group_i.columns)))
return group_i = group_i.toPandas()

请求帮助指导我解决这个问题并帮助我增加编码知识。

提前致谢。

最佳答案

使用列表

python/pyspark 不允许您动态创建变量名。但是,您可以创建一个数据帧列表,可以像 sdf_list[0].show()sdf_list[1].toPandas() 一样使用。

sdf_list = []

for i in range(1, 81):
filtered_sdf = spark.sql('select * from hive_db.hive_tbl where group = {0}'.format(i))
sdf_list.append((i, filtered_sdf)) # (<filter/group identifier>, <spark dataframe>)
del filtered_sdf

现在,sdf_list 有一个可以使用列表索引访问的 spark 数据帧列表。例如,可以使用 [0] 访问第一个数据帧,打印将验证它是一个数据帧。

print(sdf_list[0])
# (1, DataFrame[col1: bigint, dt: date, col3: bigint])
# (<filter/group identifier>, <spark dataframe>)

列表可以迭代,其中的所有数据框都可以单独使用。例如,

for (i, sdf) in sdf_list[:2]:
print("dataframe {0}'s count:".format(i), sdf.count())

# dataframe 1's count: 20
# dataframe 2's count: 30

请随意使用它。

sdf_list[0][1].count()  # [0] returns the tuple - (<sdf identifier>, <sdf>)
# 20

sdf_list[0][1].show(2)
# etc...

假设您还希望所有 spark 数据帧都作为 pandas 数据帧。如果您希望它是动态的,您将再次需要创建一个数据框列表。或者只是使用索引访问 spark 数据帧。

# using indices
group1_pdf = sdf_list[0][1].toPandas()

# creating list of pandas dataframes
pdf_list = []

for (i, sdf) in sdf_list:
pdf_list.append((i, sdf.toPandas())) # (<filter/group identifier>, <pandas dataframe>)

type(pdf_list)
# list

type(pdf_list[0])
# tuple

type(pdf_list[0][1])
# pandas.core.frame.DataFrame

使用字典

我们还可以使用字典来存储数据帧并使用键跟踪它。因此,键可以充当数据框名称。

sdf_dict = {}

for i in range(1, 81):
filtered_sdf = spark.sql('select * from hive_db.hive_tbl where group = {0}'.format(i))
sdf_dict['group'+str(i)] = filtered_sdf
del filtered_sdf

字典将包含可以使用键访问的数据框。让我们简单地打印前 2 个键并检查我们有哪些值。

list(sdf_dict.keys())[:2]
# ['group1', 'group2']

sdf_dict['group1']
# DataFrame[col1: bigint, dt: date, col3: bigint]

sdf_dict['group1'].count()
# 20

您可以选择迭代字典键并使用 spark 数据帧。

for sdf_key in list(sdf_dict.keys())[:2]:
print(sdf_key+"'s record count:", sdf_dict[sdf_key].count())

# group1's record count: 20
# group2's record count: 30

您可以检查 type() 以获得更好的理解。

type(sdf_dict)
# dict

type(sdf_dict['group1'])
# pyspark.sql.dataframe.DataFrame

转换为 pandas 数据框很简单

# single df manually
group1_pdf = sdf_dict['group1'].toPandas()

# with iteration
pdf_dict = {}

for sdf_key in sdf_dict.keys():
pdf_dict[sdf_key] = sdf_dict[sdf_key].toPandas()

type(pdf_dict)
# dict

type(pdf_dict['group1'])
# pandas.core.frame.DataFrame

关于python - 迭代多个查询并将其存储在 pyspark 数据框中,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/72983940/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com