gpt4 book ai didi

python - 如何提高数据输入管道性能?

转载 作者:行者123 更新时间:2023-12-02 09:20:48 25 4
gpt4 key购买 nike

我尝试优化我的数据输入管道。该数据集是一组 450 个 TFRecord 文件,每个文件大小约为 70MB,托管在 GCS 上。该作业使用 GCP ML Engine 执行。没有 GPU。

这是管道:

def build_dataset(file_pattern):
return tf.data.Dataset.list_files(
file_pattern
).interleave(
tf.data.TFRecordDataset,
num_parallel_calls=tf.data.experimental.AUTOTUNE
).shuffle(
buffer_size=2048
).batch(
batch_size=2048,
drop_remainder=True,
).cache(
).repeat(
).map(
map_func=_parse_example_batch,
num_parallel_calls=tf.data.experimental.AUTOTUNE
).prefetch(
buffer_size=1
)

使用映射函数:

def _bit_to_float(string_batch: tf.Tensor):
return tf.reshape(tf.math.floormod(tf.dtypes.cast(tf.bitwise.right_shift(
tf.expand_dims(tf.io.decode_raw(string_batch, tf.uint8), 2),
tf.reshape(tf.dtypes.cast(tf.range(7, -1, -1), tf.uint8), (1, 1, 8))
), tf.float32), 2), (tf.shape(string_batch)[0], -1))


def _parse_example_batch(example_batch):
preprocessed_sample_columns = {
"features": tf.io.VarLenFeature(tf.float32),
"booleanFeatures": tf.io.FixedLenFeature((), tf.string, ""),
"label": tf.io.FixedLenFeature((), tf.float32, -1)
}
samples = tf.io.parse_example(example_batch, preprocessed_sample_columns)
dense_float = tf.sparse.to_dense(samples["features"])
bits_to_float = _bit_to_float(samples["booleanFeatures"])
return (
tf.concat([dense_float, bits_to_float], 1),
tf.reshape(samples["label"], (-1, 1))
)

我尝试遵循 data pipeline tutorial 的最佳实践,并对我的映射函数进行矢量化(按照 mrry 的建议)。

使用此设置,虽然数据以高速下载(带宽约为 200MB/s),但 CPU 利用率不足(14%),并且训练速度非常慢(一个 epoch 超过 1 小时)。

我尝试了一些参数配置,更改了 interleave() 参数,例如 num_parallel_callscycle_lengthTFRecordDataset诸如 num_parallel_calls 之类的参数。

最快的配置使用这组参数:

  • interleave.num_parallel_calls:1
  • interleave.cycle_length:8
  • TFRecordDataset.num_parallel_calls:8

有了这个,一个 epoch 只需要大约 20 分钟即可运行。 但是,CPU 使用率仅为 50%,而带宽消耗约为 55MB/s

问题:

  1. 如何优化管道以达到 100% CPU 使用率(以及 100MB/s 的带宽消耗)?
  2. 为什么tf.data.experimental.AUTOTUNE找不到加速训练的最佳值?

善良,亚历克西斯。

<小时/>

编辑

经过一些更多的实验,我得出了以下解决方案。

  1. 如果 num_parallel_calls 大于 0,则删除已由 TFRecordDataset 处理的 interleave 步骤。
  2. 更新映射函数以仅执行 parse_exampledecode_raw,返回元组 `((, ), ())
  3. map 之后的缓存
  4. _bit_to_float 函数作为模型的组件移动

最后,这是数据管道代码:

def build_dataset(file_pattern):
return tf.data.TFRecordDataset(
tf.data.Dataset.list_files(file_pattern),
num_parallel_reads=multiprocessing.cpu_count(),
buffer_size=70*1000*1000
).shuffle(
buffer_size=2048
).map(
map_func=split,
num_parallel_calls=tf.data.experimental.AUTOTUNE
).batch(
batch_size=2048,
drop_remainder=True,
).cache(
).repeat(
).prefetch(
buffer_size=32
)


def split(example):
preprocessed_sample_columns = {
"features": tf.io.VarLenFeature(tf.float32),
"booleanFeatures": tf.io.FixedLenFeature((), tf.string, ""),
"label": tf.io.FixedLenFeature((), tf.float32, -1)
}
samples = tf.io.parse_single_example(example, preprocessed_sample_columns)
dense_float = tf.sparse.to_dense(samples["features"])
bits_to_float = tf.io.decode_raw(samples["booleanFeatures"], tf.uint8)
return (
(dense_float, bits_to_float),
tf.reshape(samples["label"], (1,))
)


def build_model(input_shape):
feature = keras.Input(shape=(N,))
bool_feature = keras.Input(shape=(M,), dtype="uint8")
one_hot = dataset._bit_to_float(bool_feature)
dense_input = tf.reshape(
keras.backend.concatenate([feature, one_hot], 1),
input_shape)
output = actual_model(dense_input)

model = keras.Model([feature, bool_feature], output)
return model

def _bit_to_float(string_batch: tf.Tensor):
return tf.dtypes.cast(tf.reshape(
tf.bitwise.bitwise_and(
tf.bitwise.right_shift(
tf.expand_dims(string_batch, 2),
tf.reshape(
tf.dtypes.cast(tf.range(7, -1, -1), tf.uint8),
(1, 1, 8)
),
),
tf.constant(0x01, dtype=tf.uint8)
),
(tf.shape(string_batch)[0], -1)
), tf.float32)

感谢所有这些优化:

  • 带宽消耗约为 90MB/s
  • CPU 使用率约为 20%
  • 第一个纪元花费 20 分钟
  • 连续的纪元每个花费 5 分钟

所以这似乎是一个很好的第一个设置。但CPU和BW仍然没有被过度使用,所以仍然欢迎任何建议!

<小时/>

编辑之二

因此,经过一些基准测试后,我发现了我认为最好的输入管道:

def build_dataset(file_pattern):
tf.data.Dataset.list_files(
file_pattern
).interleave(
TFRecordDataset,
cycle_length=tf.data.experimental.AUTOTUNE,
num_parallel_calls=tf.data.experimental.AUTOTUNE
).shuffle(
2048
).batch(
batch_size=64,
drop_remainder=True,
).map(
map_func=parse_examples_batch,
num_parallel_calls=tf.data.experimental.AUTOTUNE
).cache(
).prefetch(
tf.data.experimental.AUTOTUNE
)

def parse_examples_batch(examples):
preprocessed_sample_columns = {
"features": tf.io.FixedLenSequenceFeature((), tf.float32, allow_missing=True),
"booleanFeatures": tf.io.FixedLenFeature((), tf.string, ""),
"label": tf.io.FixedLenFeature((), tf.float32, -1)
}
samples = tf.io.parse_example(examples, preprocessed_sample_columns)
bits_to_float = tf.io.decode_raw(samples["booleanFeatures"], tf.uint8)
return (
(samples['features'], bits_to_float),
tf.expand_dims(samples["label"], 1)
)

那么,新内容:

  • 据此GitHub issueTFRecordDataset 交错是一种传统交错,因此 interleave 功能更好。
  • batchmap 之前是一个好习惯 ( vectorizing your function ),可以减少调用映射函数的次数。
  • 不再需要重复。从 TF2.0 开始,Keras 模型 API 支持数据集 API,并且可以使用缓存(参见SO post)
  • VarLenFeature 切换到 FixedLenSequenceFeature,删除对 tf.sparse.to_dense 的无用调用。

希望这能有所帮助。仍然欢迎建议。

最佳答案

在答案部分提及@AlexisBRENON 的解决方案和重要观察结果,以造福社区。

下面提到的是重要的观察结果:

  1. 据此GitHub issueTFRecordDataset interleaving是遗留的,所以interleave功能更好。
  2. batch之前map是一个好习惯( vectorizing your function )并减少调用映射函数的次数。
  3. 不需要repeat不再了。从 TF2.0 开始,Keras 模型 API 支持数据集 API,并且可以使用缓存(参见SO post)
  4. VarLenFeature切换到 FixedLenSequenceFeature ,删除对 tf.sparse.to_dense 的无用调用.

下面提到了根据上述观察结果改进了性能的管道代码:

def build_dataset(file_pattern):
tf.data.Dataset.list_files(
file_pattern
).interleave(
TFRecordDataset,
cycle_length=tf.data.experimental.AUTOTUNE,
num_parallel_calls=tf.data.experimental.AUTOTUNE
).shuffle(
2048
).batch(
batch_size=64,
drop_remainder=True,
).map(
map_func=parse_examples_batch,
num_parallel_calls=tf.data.experimental.AUTOTUNE
).cache(
).prefetch(
tf.data.experimental.AUTOTUNE
)

def parse_examples_batch(examples):
preprocessed_sample_columns = {
"features": tf.io.FixedLenSequenceFeature((), tf.float32, allow_missing=True),
"booleanFeatures": tf.io.FixedLenFeature((), tf.string, ""),
"label": tf.io.FixedLenFeature((), tf.float32, -1)
}
samples = tf.io.parse_example(examples, preprocessed_sample_columns)
bits_to_float = tf.io.decode_raw(samples["booleanFeatures"], tf.uint8)
return (
(samples['features'], bits_to_float),
tf.expand_dims(samples["label"], 1)
)

关于python - 如何提高数据输入管道性能?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58014123/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com