- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
为了减少分布式训练的同步次数,我想先做梯度的局部累加。这就像您可以拥有多个 GPU,但串行而非并行。
我想在带有分布式策略的 estimator.train 循环中使用它,例如镜像和集体 allreduce 等。
这是我的实现,请给我一些输入:)
首先因为我需要在 session.run() 中运行不同的图形,所以我修改了 estimator.EstimatorSpec 以进行更多操作。其次,似乎没有明确的方法可以在分布式策略环境中的本地 GPU 中创建本地非共享变量。我不得不破解一些 variable_create_scope。
这里是 hacked variable_creator 函数,
def skip_all_scope_variable_creator(next_creator=None, on_device=None, **kwargs):
#print("skip_all_scope_variable_creator:[{}]".format(kwargs))
initial_value = kwargs.get("initial_value", None)
trainable = kwargs.get("trainable", None)
collections = kwargs.get("collections", None)
validate_shape = kwargs.get("validate_shape", True)
caching_device = kwargs.get("caching_device", None)
name = kwargs.get("name", None)
variable_def = kwargs.get("variable_def", None)
dtype = kwargs.get("dtype", None)
expected_shape = kwargs.get("expected_shape", None)
import_scope = kwargs.get("import_scope", None)
constraint = kwargs.get("constraint", None)
use_resource = kwargs.get("use_resource", None)
with tf.device(on_device) :
return resource_variable_ops.ResourceVariable(
initial_value=initial_value, trainable=trainable,
collections=collections, validate_shape=validate_shape,
caching_device=caching_device, name=name, dtype=dtype,
constraint=constraint, variable_def=variable_def,
import_scope=import_scope)
这是我在 model_fn() 中创建三个操作的代码,
loss = loss_from_model
optimizer = some_optimizer
tvars = tf.trainable_variables()
gradients = optimizer.compute_gradients(
loss, tvars, colocate_gradients_with_ops=True)
accumulate_pass_num = FLAGS.pass_per_batch
if accumulate_pass_num > 1 :
accum_grads = []
accum_vars = []
reset_grad_ops = []
accum_grad_ops = []
for g,v in gradients:
accum_vars.append(v)
if g is not None:
with tf.variable_creator_scope(lambda next_creator=None, **kwargs: skip_all_scope_variable_creator(next_creator, g.device, **kwargs)):
print("create accum_grad for variable:{}".format(v.name))
tmp_grad_on_device = tf.Variable(tf.zeros_like(g), trainable=False, synchronization=tf.VariableSynchronization.ON_READ, collections=[tf.GraphKeys.LOCAL_VARIABLES], name='tmp_accum_grad')
reset_one_grad_op = tf.assign(tmp_grad_on_device, g, name="reset_accumulated_gradient_op")
reset_grad_ops.append(reset_one_grad_op)
# the return of assign_add is the value will be update
accum_grad_on_device = tmp_grad_on_device.assign_add(g, name="accumulate_gradient")
accum_grad_ops.append(accum_grad_on_device)
accum_grads.append(accum_grad_on_device)
else:
accum_grads.append(None)
accumulate_gradients_op = tf.group(*accum_grad_ops, name="grouped_accu_grad_op")
reset_gradients_op = tf.group(*reset_grad_ops, name="grouped_reset_gradients_op")
accum_grad_means = [tf.multiply(v, 1.0/accumulate_pass_num) if v is not None else None for v in accum_grads]
accum_grads_vars = zip(accum_grad_means, accum_vars)
minimize_op = optimizer.apply_gradients(
accum_grads_vars, global_step=global_step, name="train")
update_ops = tf.get_collection(tf.GraphKeys.UPDATE_OPS)
train_op = tf.group(minimize_op, update_ops)
return tf.estimator.EstimatorSpec(mode=mode, loss=loss, train_op=train_op, accumulate_gradients_op=accumulate_gradients_op, reset_gradients_op=reset_gradients_op, accumulate_pass_num=accumulate_pass_num)
这里修改了 estimator.train() 以运行不同的操作,
while not mon_sess.should_stop():
if estimator_spec.accumulate_pass_num > 1 :
# reset gradiends first
mon_sess.run([estimator_spec.reset_gradients_op])
for _ in range(estimator_spec.accumulate_pass_num-2):
mon_sess.run([estimator_spec.accumulate_gradients_op])
_, loss = mon_sess.run([estimator_spec.train_op, estimator_spec.loss])
我在 google 官方模型存储库中的 transformer 模型上进行了尝试。结果很好。
我的问题是,有没有更好的方法来做到这一点?
我是否应该考虑使用 tf.cond() 来选择 model_fn 中返回的操作,这样 Estimator 和 EstimatorSpec 就不需要修改了?但是好像很难:(
非常感谢!
东
最佳答案
我认为您可以通过将 train_ops 传递给估算器来实现这一点。在估算器 model_fn 中单独调用 tensorflow ops 绝对没有效果。因为按照设计,每次训练只调用一次 model_fn,因此你放入其中的每个操作也只会执行一次。除此之外,所有 tf.cond 分支都将在 model_fn 调用期间被评估和执行。(您可以通过简单的条件日志操作来验证此行为。)实现梯度累加的关键是:
那些传递给 estimator_spec 或 training_hooks 的操作可以在训练过程中动态执行。
这是我的代码,用有限的 GPU 内存微调 BERT:
# compute batch gradient
grads = tf.gradients(loss, tvars)
(grads, _) = tf.clip_by_global_norm(grads, clip_norm=1.0)
# this is a list of sum(dy/dx) for each variable that must be paired with a tvars list.
# element may be an IndexedSlices object that does not support assignning, e.g. [g.assign(value) for g in grads]
# some of the elements are None, meaning y and x does not depend on each other.
# Nonetypes must be handled using Python, tensorflow cannot convert Nonetypes to 0.
# declare a temp variable for summation
sum_gradient = [tf.get_variable(name="sum_grads" + str(i), shape=tv.shape,
initializer=tf.zeros_initializer,
trainable=False,
dtype=tf.float32,
collections=[tf.GraphKeys.LOCAL_VARIABLES]) for i, tv in enumerate(tvars)]
sum_ops = []
unused_variable_in_batch = []
# gradient accumulation
for i, gv in enumerate(grads):
if gv is not None:
sum_ops.append(sum_gradient[i].assign_add(gv, name="accumulate_gradient"))
else:
unused_variable_in_batch.append(sum_gradient[i])
sum_gradient[i] = None
# NOTE : calling .assign_add does NOTHING in estimator, must wrap them all and handle them via train_ops
def apply_accumulated_gradients(sums):
# normalize gradient
normalize_ops = []
for i, g in enumerate(sums):
if g is not None:
normalize_ops.append(sums[i].assign(tf.multiply(g, 1 / gradient_accmulation_multiplier)))
# assign to make sure it still is a variable, or else it will become a Tensor
with tf.control_dependencies(normalize_ops):
minimize_op = optimizer.apply_gradients(zip(sums, tvars), global_step=global_step)
return tf.group(minimize_op, *normalize_ops, name="apply_accumulated_gradients")
train_op = tf.cond(tf.math.equal(global_step % gradient_accmulation_multiplier, 0),
lambda: apply_accumulated_gradients(sum_gradient),
lambda: optimizer.apply_gradients(zip([None for _ in grads], tvars), global_step=global_step))
# reset accumulation when necessary
def reset():
counter = 0
for i, s in enumerate(sum_gradient):
if s is None:
# restore reference from None to the original variable
sum_gradient[i] = unused_variable_in_batch[counter]
counter += 1
return tf.group([s.assign(tf.zeros_like(s)) for s in sum_gradient])
with tf.control_dependencies([train_op]):
reset_ops = tf.cond(tf.math.equal(do_update, 1.),
reset,
tf.no_op)
# the 2 branches must have identical structure, [op1, op2, ...] || no_op cannot be valid cond branch.
# tf.group to convert all resets into 1 op and match with no_op: tf.group() || np_op
# Increment global step
new_global_step = global_step + 1
train_op = tf.group(*sum_ops, [train_op, global_step.assign(new_global_step), reset_ops])
logging_hook = tf.train.LoggingTensorHook({"accuracy": "acc"},
every_n_iter=gradient_accmulation_multiplier)
output_spec = tf.estimator.EstimatorSpec(
mode=mode,
loss=loss,
train_op=train_op,
training_hooks=[logging_hook, accumulation_hook] # wrap with a list
)
我对批量梯度应用了裁剪,并简单地取了它们的平均值。这种方法对我有用,但我建议您在数据集上密切关注损失行为。
还有,关于tf.cond(tf.math.equal(do_update, 1.),...,...),do_update是一个Hook管理的变量,每一步gradient_accmulation_multiplier都会取值1,所以这个语句与 tf.math.equal(global_step % gradient_accmulation_multiplier, 0) 具有完全相同的效果。这只是另一种方式。
Hook的代码如下:
class GradientAccumulationHook(session_run_hook.SessionRunHook):
"""
Puts a certain tf.Variable to 1 once every certain steps.
"""
def __init__(self, frequency, variable):
self._step = 0
self._flag = 0.
self._freq = frequency
self._input_placeholder = tf.placeholder(tf.float32)
self.assign_op = variable.assign(self._input_placeholder)
def begin(self):
# a hook can modify graph at begin(), after this the graph will be finalized
self._step = tf.train.get_global_step()
def before_run(self, run_context):
step = run_context.session.run(self._step) # evaluate tensor to get a step number
self._flag = 1. if step % self._freq == 0 and step != 0 else 0.
run_context.session.run(self.assign_op, feed_dict={self._input_placeholder: self._flag})
关于tensorflow - 使用分布策略在 Estimator 中累积梯度,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54735106/
我需要将文本放在 中在一个 Div 中,在另一个 Div 中,在另一个 Div 中。所以这是它的样子: #document Change PIN
奇怪的事情发生了。 我有一个基本的 html 代码。 html,头部, body 。(因为我收到了一些反对票,这里是完整的代码) 这是我的CSS: html { backgroun
我正在尝试将 Assets 中的一组图像加载到 UICollectionview 中存在的 ImageView 中,但每当我运行应用程序时它都会显示错误。而且也没有显示图像。 我在ViewDidLoa
我需要根据带参数的 perl 脚本的输出更改一些环境变量。在 tcsh 中,我可以使用别名命令来评估 perl 脚本的输出。 tcsh: alias setsdk 'eval `/localhome/
我使用 Windows 身份验证创建了一个新的 Blazor(服务器端)应用程序,并使用 IIS Express 运行它。它将显示一条消息“Hello Domain\User!”来自右上方的以下 Ra
这是我的方法 void login(Event event);我想知道 Kotlin 中应该如何 最佳答案 在 Kotlin 中通配符运算符是 * 。它指示编译器它是未知的,但一旦知道,就不会有其他类
看下面的代码 for story in book if story.title.length < 140 - var story
我正在尝试用 C 语言学习字符串处理。我写了一个程序,它存储了一些音乐轨道,并帮助用户检查他/她想到的歌曲是否存在于存储的轨道中。这是通过要求用户输入一串字符来完成的。然后程序使用 strstr()
我正在学习 sscanf 并遇到如下格式字符串: sscanf("%[^:]:%[^*=]%*[*=]%n",a,b,&c); 我理解 %[^:] 部分意味着扫描直到遇到 ':' 并将其分配给 a。:
def char_check(x,y): if (str(x) in y or x.find(y) > -1) or (str(y) in x or y.find(x) > -1):
我有一种情况,我想将文本文件中的现有行包含到一个新 block 中。 line 1 line 2 line in block line 3 line 4 应该变成 line 1 line 2 line
我有一个新项目,我正在尝试设置 Django 调试工具栏。首先,我尝试了快速设置,它只涉及将 'debug_toolbar' 添加到我的已安装应用程序列表中。有了这个,当我转到我的根 URL 时,调试
在 Matlab 中,如果我有一个函数 f,例如签名是 f(a,b,c),我可以创建一个只有一个变量 b 的函数,它将使用固定的 a=a1 和 c=c1 调用 f: g = @(b) f(a1, b,
我不明白为什么 ForEach 中的元素之间有多余的垂直间距在 VStack 里面在 ScrollView 里面使用 GeometryReader 时渲染自定义水平分隔线。 Scrol
我想知道,是否有关于何时使用 session 和 cookie 的指南或最佳实践? 什么应该和什么不应该存储在其中?谢谢! 最佳答案 这些文档很好地了解了 session cookie 的安全问题以及
我在 scipy/numpy 中有一个 Nx3 矩阵,我想用它制作一个 3 维条形图,其中 X 轴和 Y 轴由矩阵的第一列和第二列的值、高度确定每个条形的 是矩阵中的第三列,条形的数量由 N 确定。
假设我用两种不同的方式初始化信号量 sem_init(&randomsem,0,1) sem_init(&randomsem,0,0) 现在, sem_wait(&randomsem) 在这两种情况下
我怀疑该值如何存储在“WORD”中,因为 PStr 包含实际输出。? 既然Pstr中存储的是小写到大写的字母,那么在printf中如何将其给出为“WORD”。有人可以吗?解释一下? #include
我有一个 3x3 数组: var my_array = [[0,1,2], [3,4,5], [6,7,8]]; 并想获得它的第一个 2
我意识到您可以使用如下方式轻松检查焦点: var hasFocus = true; $(window).blur(function(){ hasFocus = false; }); $(win
我是一名优秀的程序员,十分优秀!