gpt4 book ai didi

python - 如何使用 pyspark 从 Spark 中批量获取行

转载 作者:行者123 更新时间:2023-12-04 13:15:44 26 4
gpt4 key购买 nike

我有一个包含超过 60 亿行数据的 Spark RDD,我想使用 train_on_batch 来训练深度学习模型。我无法将所有行都放入内存中,所以我想一次获得 10K 左右的数据以批量处理成 64 或 128 块(取决于模型大小)。我目前正在使用 rdd.sample() 但我认为这不能保证我会获得所有行。是否有更好的方法来分区数据以使其更易于管理,以便我可以编写生成器函数来获取批次?我的代码如下:

data_df = spark.read.parquet(PARQUET_FILE)
print(f'RDD Count: {data_df.count()}') # 6B+
data_sample = data_df.sample(True, 0.0000015).take(6400)
sample_df = data_sample.toPandas()

def get_batch():
for row in sample_df.itertuples():
# TODO: put together a batch size of BATCH_SIZE
yield row

for i in range(10):
print(next(get_batch()))

最佳答案

我不相信 spark 让您对数据进行偏移或分页。

但是您可以添加一个索引,然后对其进行分页,首先:

    from pyspark.sql.functions import lit
data_df = spark.read.parquet(PARQUET_FILE)
count = data_df.count()
chunk_size = 10000

# Just adding a column for the ids
df_new_schema = data_df.withColumn('pres_id', lit(1))

# Adding the ids to the rdd
rdd_with_index = data_df.rdd.zipWithIndex().map(lambda (row,rowId): (list(row) + [rowId+1]))

# Creating a dataframe with index
df_with_index = spark.createDataFrame(chunk_rdd,schema=df_new_schema.schema)

# Iterating into the chunks
for chunk_size in range(0,count+1 ,chunk_size):
initial_page = page_num*chunk_size
final_page = initial_page + chunk_size
where_query = ('pres_id > {0} and pres_id <= {1}').format(initial_page,final_page)
chunk_df = df_with_index.where(where_query).toPandas()
train_on_batch(chunk_df) # <== Your function here

这不是最佳选择,由于使用了 Pandas 数据框,它会严重利用 spark,但可以解决您的问题。

如果这会影响您的功能,请不要忘记删除 id。

关于python - 如何使用 pyspark 从 Spark 中批量获取行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60645256/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com