gpt4 book ai didi

tensorflow - 使用 shuffled tf.data : if training is interrupted, 进行 Keras 训练如何在最后一次数据迭代/上次保存检查点的顺序上继续训练

转载 作者:行者123 更新时间:2023-12-04 09:36:56 26 4
gpt4 key购买 nike

我正在用 keras 训练 model.fit ,数据来自 tf.records,加载到 tf.data 对象中,该对象使用 .shuffle打乱数据。我也在用 callbacks.ModelCheckpoint每隔 x 保存模型步数/批次。
有时我的云实例在一个纪元完成之前断开连接或崩溃,但模型在 y步骤已保存到我的驱动器中。
我想在训练另一个时期之前完成对该时期数据的训练(我有很长的时期),因此每个时期每个数据示例都训练一次。
有没有办法获得数据的原始顺序,以及数据中上次保存模型的位置?
到目前为止我发现了什么
看起来您可以通过设置种子在 .shuffle 中设置特定顺序。但是,改组只发生在缓冲区中,所以我不能 100% 确定设置种子是否会完美地重现顺序。另外,我不确定这将如何与 reshuffle_each_iteration 一起使用.每个时期后是否使用不同的种子?如果是这样,我想解决方法是一次只训练 1 个 epoch,每个 epoch 都有一个指定的种子。
即使我确实得到了训练顺序的副本,我也不确定如何找到模型上次保存的顺序,然后从该点开始训练。我必须得到订单的一个想法是手动遍历数据集,直到我到达它为止。虽然我不确定 model.fit()将从这个顺序继续,或者从头开始。 F
为了从上次保存模型的位置获取步骤/批次编号,我可能可以将其记录在某处。
这些解决方案似乎是粗略的解决方法,我想知道 Keras 中是否有一些我可能会忽略的功能来帮助解决这个问题。

最佳答案

似乎没有 keras 构建方式来做到这一点,但如果我错了,请纠正我。
我的方法Dataset.shufflereshuffle_each_iteration=True 时,内部使用初始种子值生成用于在迭代期间重新洗牌的种子。 .因此,为特定时期重新创建相同的顺序并在该特定批次继续训练时期,我们必须使用相同的种子重新创建数据集并将数据集迭代器移动到相同时期和相同批次。
调试
为了调试并确保以相同的顺序生成纪元和批次,我们需要一种方法来打印每个纪元批次中数据点的提取方式。这在 kears 中很棘手,因此出于调试目的,我将使用回归问题并将基本事实作为序列号。然后我可以有一个自定义损失,我可以打印基本事实并使用户订单正确。
模型和数据

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import numpy as np
import keras.backend as K


# Data
x_train = np.random.randn(15, 10).astype("float32")
y_train = np.arange(15).astype("float32")

# Custom MSE looss just to track the order in which data is picked up
def my_mse(y_true, y_pred):
tf.print(tf.keras.backend.flatten(y_true))
loss = K.square(y_pred - y_true)
loss = K.sum(loss, axis=1)
return loss

# Model
def get_model():
inputs = keras.Input(shape=(10))
outputs = layers.Dense(1, activation="linear")(inputs)
model = keras.Model(inputs=inputs, outputs=outputs)

model.compile(
optimizer="rmsprop",
loss=my_mse,
)
return model
数据集
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_dataset = train_dataset.shuffle(buffer_size=8, reshuffle_each_iteration=True, seed=0).batch(8)

epochs = 2

print ("Runs 1")
for e in range(epochs):
for i, (x, y) in enumerate(train_dataset):
print (e, i, y)

train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_dataset = train_dataset.shuffle(buffer_size=8, reshuffle_each_iteration=True, seed=0).batch(8)
print ("Runs 2")
for e in range(epochs):
for i, (x, y) in enumerate(train_dataset):
print (e, i, y)
输出:
Runs 1
0 tf.Tensor([1. 3. 5. 7. 4. 0. 8. 2.], shape=(8,), dtype=float32)
1 tf.Tensor([ 6. 11. 10. 14. 9. 12. 13.], shape=(7,), dtype=float32)
2 tf.Tensor([4. 2. 5. 8. 1. 9. 7. 3.], shape=(8,), dtype=float32)
3 tf.Tensor([13. 10. 0. 14. 6. 11. 12.], shape=(7,), dtype=float32)
4 tf.Tensor([ 0. 1. 5. 6. 9. 3. 7. 14.], shape=(8,), dtype=float32)
5 tf.Tensor([13. 8. 4. 10. 2. 12. 11.], shape=(7,), dtype=float32)
Runs 2
0 tf.Tensor([1. 3. 5. 7. 4. 0. 8. 2.], shape=(8,), dtype=float32)
1 tf.Tensor([ 6. 11. 10. 14. 9. 12. 13.], shape=(7,), dtype=float32)
2 tf.Tensor([4. 2. 5. 8. 1. 9. 7. 3.], shape=(8,), dtype=float32)
3 tf.Tensor([13. 10. 0. 14. 6. 11. 12.], shape=(7,), dtype=float32)
4 tf.Tensor([ 0. 1. 5. 6. 9. 3. 7. 14.], shape=(8,), dtype=float32)
5 tf.Tensor([13. 8. 4. 10. 2. 12. 11.], shape=(7,), dtype=float32)
是的,使用种子复制订单。
现在让我们写一个方法将数据集转发到某个纪元和批次组合
def forward(dataset, n=None):
if not n:
return dataset

i = 0
while True:
for _ in dataset:
i += 1
if i == n:
return dataset
测试用例:
让我们正常运行并观察顺序
一开始的数据
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_dataset = forward(train_dataset.shuffle(buffer_size=8, reshuffle_each_iteration=True, seed=0).batch(4), None)

model = get_model()
model.fit(train_dataset, epochs=3, verbose=0, workers=4, shuffle=False)
输出:
[7 3 6 10]
[11 0 1 2]
[8 14 9 13]
[12 5 4]
[5 8 6 3]
[1 12 10 9]
[2 11 0 4]
[14 13 7]
[2 3 0 10]
[4 1 13 6]
[8 7 14 11]
[12 5 9]
来自 Dataset 第 n 个状态的数据
让我们的数据集进行第 4 次迭代并运行训练
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_dataset = forward(train_dataset.shuffle(buffer_size=8, reshuffle_each_iteration=True, seed=0).batch(4), 4)

model = get_model()
model.fit(train_dataset, epochs=3, verbose=0, workers=4, shuffle=False)
输出:
[5 8 6 3]
[1 12 10 9]
[2 11 0 4]
[14 13 7]
[2 3 0 10]
[4 1 13 6]
[8 7 14 11]
[12 5 9]
很好,现在我们知道如何正确转发数据集了。现在让我们编写回调来跟踪当前的迭代次数:
跟踪迭代的自定义回调(epoch-batch 组合)
现在我们需要确定模型检查指向的纪元和批次组合。如果我们有这些信息,我们可以加载最后一个检查点模型并将我们的数据集转发到它的批次和纪元组合并继续训练。我们将使用回调来做到这一点
class MyCustomCallback(tf.keras.callbacks.ModelCheckpoint, keras.callbacks.Callback):
def __init__(self, the_id=0, **args):
self.the_id = the_id
self.epoch = 0
super().__init__(**args)

def _save_model(self, epoch, logs):
logs['the_id'] = self.the_id
super()._save_model(epoch, logs)

def on_batch_end(self, batch, logs={}):
self.the_id += 1
super().on_batch_end(batch, logs)

checkpoint_filepath = 'checkpoint-{the_id}'
model_checkpoint_callback = MyCustomCallback(
filepath=checkpoint_filepath,
save_freq=2,
save_best_only=False)

model = get_model()

train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_dataset = forward(train_dataset.shuffle(buffer_size=8, reshuffle_each_iteration=True, seed=0).batch(4), None)

model.fit(train_dataset, epochs=5, verbose=0, callbacks=[model_checkpoint_callback], workers=4, shuffle=False)
输出:
[7 3 6 10]
[11 0 1 2]
[8 14 9 13]
[12 5 4]
[5 8 6 3]
[1 12 10 9]
[2 11 0 4]
[14 13 7]
[2 3 0 10]
[4 1 13 6]
[8 7 14 11]
[12 5 9]
我们每两批检查一次。所以让我们假设它崩溃了,最后一个检查点是 checkpoint-4 .我们可以加载这个模型并将我们的数据集转发到 4 并继续训练。
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_dataset = forward(train_dataset.shuffle(buffer_size=8, reshuffle_each_iteration=True, seed=0).batch(4), 4)

model = get_model()
model.fit(train_dataset, epochs=2, verbose=0, workers=4, shuffle=False)
输出:
[5 8 6 3]
[1 12 10 9]
[2 11 0 4]
[14 13 7]
[2 3 0 10]
[4 1 13 6]
[8 7 14 11]
[12 5 9]

关于tensorflow - 使用 shuffled tf.data : if training is interrupted, 进行 Keras 训练如何在最后一次数据迭代/上次保存检查点的顺序上继续训练,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62528853/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com