gpt4 book ai didi

python - 启用急切执行时如何运行并行map_fn

转载 作者:行者123 更新时间:2023-11-30 22:06:16 25 4
gpt4 key购买 nike

考虑以下 tensorflow 代码片段:

import time
import numpy as np
import tensorflow as tf

def fn(i):
# do some junk work
for _ in range(100):
i ** 2
return i

n = 1000
n_jobs = 8
stuff = np.arange(1, n + 1)
eager = False
t0 = time.time()
if eager:
tf.enable_eager_execution()
res = tf.map_fn(fn, stuff, parallel_iterations=n_jobs)
if not eager:
with tf.Session() as sess:
res = sess.run(res)
print(sum(res))
else:
print(sum(res))
dt = time.time() - t0
print("(eager=%s) Took %ims" % (eager, dt * 1000))

如果使用 eager = True 运行,则比使用 eager = False 运行时慢 10 倍。我做了一些打印,发现在 eager = True 模式下,map_fn 调用是顺序运行的,而不是生成 8 个并行线程。

问题

所以我的问题是如何在急切执行模式下使用map_fn(parallel_iterations > 1)?

最佳答案

(我为此使用了 TF 2.3,不要期望新版本会出现相同的结果。)

这不仅仅是OP问题的答案,而是它的延伸,说明了为什么其他答案没有解决真正的问题,因为tf.function不足以强制并行。

<小时/>

首先,使用tf.function不强制并行化。它强制跟踪,并构建图表,这种情况只发生一次,所以,time.sleep()其他答案中使用的仅在第一次需要跟踪时运行,这就是为什么您会看到 tf.function 加速的原因。但是更改 parallel_iterations 时仍然看不到差异。 .

让我们使用py_fuction查看差异:

def op(x):
time.sleep(1)
return 2 * x.numpy()

def op_tf(x):
print('Tracing')
return tf.py_function(op, [x], Tout=tf.int32)

不使用装饰器(或直接调用)tf.function任何电话 op_tf将始终打印“Tracing”(尽管在本例中不是跟踪)

In [57]: op_tf(1)
Tracing
Out[57]: <tf.Tensor: shape=(), dtype=int32, numpy=2>

In [58]: op_tf(1)
Tracing
Out[58]: <tf.Tensor: shape=(), dtype=int32, numpy=2>

tf.function我们只看到一次跟踪(如果我们使用相同的参数):

In [67]: @tf.function
...: def op_tf(x):
...: print("Tracing")
...: return tf.py_function(op, [x], Tout=tf.int32)
...:

In [68]: op_tf(1)
Tracing
Out[68]: <tf.Tensor: shape=(), dtype=int32, numpy=2>

In [69]: op_tf(2)
Tracing
Out[69]: <tf.Tensor: shape=(), dtype=int32, numpy=4>

In [70]: op_tf(3)
Tracing
Out[70]: <tf.Tensor: shape=(), dtype=int32, numpy=6>

In [71]: op_tf(3)
Out[71]: <tf.Tensor: shape=(), dtype=int32, numpy=6>

发生这种情况是因为函数必须为每个新参数构建一个新图,如果我们直接传递签名,我们就可以避免这种情况的发生:

In [73]: @tf.function(input_signature=[tf.TensorSpec(shape=[], dtype=tf.int32)])
...: def op_tf(x):
...: print("Tracing")
...: return tf.py_function(op, [x], Tout=tf.int32)
...:
...:

In [74]: op_tf(1)
Tracing
Out[74]: <tf.Tensor: shape=(), dtype=int32, numpy=2>

In [75]: op_tf(2)
Out[75]: <tf.Tensor: shape=(), dtype=int32, numpy=4>

In [76]: op_tf(3)
Out[76]: <tf.Tensor: shape=(), dtype=int32, numpy=6>

如果我们首先调用方法get_concrete_function,也会发生同样的情况:

In [79]: @tf.function(input_signature=[tf.TensorSpec(shape=[], dtype=tf.int32)])
...: def op_tf(x):
...: print("Tracing")
...: return tf.py_function(op, [x], Tout=tf.int32)
...:
...:

In [80]: op_tf = op_tf.get_concrete_function()
Tracing

In [81]: op_tf(1)
Out[81]: <tf.Tensor: shape=(), dtype=int32, numpy=2>

In [82]: op_tf(2)
Out[82]: <tf.Tensor: shape=(), dtype=int32, numpy=4>

然后,答案声称只需添加 tf.function足以获得并行执行并不完全正确:

In [84]: def op(x):
...: print("sleep")
...: time.sleep(0.1)
...: return 1.
...:

In [85]: x = tf.ones(shape=(10,))

In [86]: _ = tf.map_fn(op, x, parallel_iterations=10)
sleep
sleep
sleep
sleep
sleep
sleep
sleep
sleep
sleep
sleep

In [87]: @tf.function
...: def my_map(*args, **kwargs):
...: return tf.map_fn(*args, **kwargs)
...:

In [88]: my_map(op, x, parallel_iterations=10)
sleep
Out[88]: <tf.Tensor: shape=(10,), dtype=float32, numpy=array([1., 1., 1., 1., 1., 1., 1., 1., 1., 1.], dtype=float32)>

相比之下,如果用于 sleep 和 print 的 python 指令位于 py_function 内部,则它们将始终被调用:

In [96]: x = tf.ones(shape=(10,), dtype=tf.int32)

In [97]: def op(x):
...: print("sleep")
...: time.sleep(0.1)
...: return 2 * x.numpy()
...:

In [98]: @tf.function(input_signature=[tf.TensorSpec(shape=[], dtype=tf.int32)])
...: def op_tf(x):
...: print("Tracing")
...: return tf.py_function(op, [x], Tout=tf.int32)
...:

In [99]: _ = my_map(op_tf, x, parallel_iterations=1)
Tracing
sleep
sleep
sleep
sleep
sleep
sleep
sleep
sleep
sleep
sleep

现在,我们已经清楚函数的跟踪给我们带来了一些困惑,让我们删除打印来计时调用:

In [106]: def op(x):
...: time.sleep(0.1)
...: return 2 * x.numpy()
...:

In [107]: @tf.function(input_signature=[tf.TensorSpec(shape=[], dtype=tf.int32)])
...: def op_tf(x):
...: return tf.py_function(op, [x], Tout=tf.int32)
...:

In [108]: %timeit tf.map_fn(op_tf, x, parallel_iterations=1)
1.02 s ± 554 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)

In [109]: %timeit tf.map_fn(op_tf, x, parallel_iterations=10)
1.03 s ± 509 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)

运行以下脚本并使用张量板,我们可以准确地看到发生了什么:

import tensorflow as tf
import time
from datetime import datetime

stamp = datetime.now().strftime("%Y%m%d-%H%M%S")
logdir = 'logs/func/%s' % stamp

# Start tracing.
options = tf.profiler.experimental.ProfilerOptions(
host_tracer_level=3, python_tracer_level=1, device_tracer_level=1, delay_ms=None
)

tf.profiler.experimental.start(logdir, options = options)

def op(x):
x = x.numpy()
start = time.time()

while time.time() < start + x / 100:
x = (2 * x) % 123

return x

@tf.function(input_signature=[tf.TensorSpec([], tf.int32)])
def op_tf(x):
return tf.py_function(op, [x], Tout=tf.int32, name='op')

@tf.function(input_signature=[tf.TensorSpec([None], tf.int32)])
def my_map(x):
return tf.map_fn(op_tf, x, parallel_iterations=16)

x = tf.ones(100, tf.int32)
print(my_map(x))

tf.profiler.experimental.stop()

我们在 Tensorboard 中得到以下内容: enter image description here

py_function有效地使用多个线程,但不是并行的。与parallel_iterations=1我们得到类似的东西 enter image description here

如果我们在脚本的开头添加以下内容

tf.config.threading.set_inter_op_parallelism_threads(1)
tf.config.threading.set_intra_op_parallelism_threads(1)

我们强制 TF 使用单个线程来进行所有图计算: enter image description here

因此,此时如果我们正确设置内部/内部线程,我们就只能获得某种形式的并行执行。

如果我们完全禁用急切执行:

import time
from datetime import datetime
import numpy as np
import tensorflow as tf

tf.compat.v1.disable_eager_execution()
tf.config.threading.set_inter_op_parallelism_threads(128)
tf.config.threading.set_intra_op_parallelism_threads(128)

stamp = datetime.now().strftime("%Y%m%d-%H%M%S")
logdir = f'logs/func/{stamp}'

tf.profiler.experimental.start(logdir)

def op(x):
x = x.numpy()
start = time.time()
while time.time() < start + x / 100:
x = (2 * x) % 123
return x

@tf.function(input_signature=[tf.TensorSpec([], tf.int32)])
def op_tf(x):
return tf.py_function(op, [x], Tout=tf.int32, name='op')

# Create a placeholder.
x = tf.compat.v1.placeholder(tf.int32, shape=[None])

with tf.compat.v1.Session() as sess:

writer = tf.summary.create_file_writer(logdir)

#tf.profiler.experimental.start(logdir, options = options)
tf.summary.trace_on(graph=True, profiler=True)

print(
sess.run(
[tf.map_fn(op_tf, x, parallel_iterations=16)],
feed_dict={
x: np.ones(4, dtype=np.int)
}
)
)

tf.profiler.experimental.stop()

我们现在可以在 Tensorboard 中看到并行执行: enter image description here

如果我们将线程内/线程间和parallel_iterations设置为1,我们就会得到之前的行为: enter image description here

我希望这有助于澄清 tf.function 的作用检查完全并行性。

关于python - 启用急切执行时如何运行并行map_fn,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52774351/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com