gpt4 book ai didi

python - 如何在 tensorflow 中使用自定义 python 函数预取数据

转载 作者:IT老高 更新时间:2023-10-28 21:40:09 26 4
gpt4 key购买 nike

我正在尝试预取训练数据以隐藏 I/O 延迟。我想编写自定义 Python 代码,从磁盘加载数据并预处理数据(例如,通过添加上下文窗口)。换句话说,一个线程进行数据预处理,另一个线程进行训练。这在 TensorFlow 中是否可行?

更新:我有一个基于 @mrry 示例的工作示例。

import numpy as np
import tensorflow as tf
import threading

BATCH_SIZE = 5
TRAINING_ITERS = 4100

feature_input = tf.placeholder(tf.float32, shape=[128])
label_input = tf.placeholder(tf.float32, shape=[128])

q = tf.FIFOQueue(200, [tf.float32, tf.float32], shapes=[[128], [128]])
enqueue_op = q.enqueue([label_input, feature_input])

label_batch, feature_batch = q.dequeue_many(BATCH_SIZE)
c = tf.reshape(feature_batch, [BATCH_SIZE, 128]) + tf.reshape(label_batch, [BATCH_SIZE, 128])

sess = tf.Session()

def load_and_enqueue(sess, enqueue_op, coord):
with open('dummy_data/features.bin') as feature_file, open('dummy_data/labels.bin') as label_file:
while not coord.should_stop():
feature_array = np.fromfile(feature_file, np.float32, 128)
if feature_array.shape[0] == 0:
print('reach end of file, reset using seek(0,0)')
feature_file.seek(0,0)
label_file.seek(0,0)
continue
label_value = np.fromfile(label_file, np.float32, 128)

sess.run(enqueue_op, feed_dict={feature_input: feature_array,
label_input: label_value})

coord = tf.train.Coordinator()
t = threading.Thread(target=load_and_enqueue, args=(sess,enqueue_op, coord))
t.start()

for i in range(TRAINING_ITERS):
sum = sess.run(c)
print('train_iter='+str(i))
print(sum)

coord.request_stop()
coord.join([t])

最佳答案

这是一个常见的用例,大多数实现都使用 TensorFlow 的 queues 将预处理代码与训练代码解耦。有a tutorial on how to use queues ,但主要步骤如下:

  1. 定义一个队列,q,它将缓冲预处理的数据。 TensorFlow 支持简单的 tf.FIFOQueue按照入队顺序生成元素,更高级的 tf.RandomShuffleQueue以随机顺序生成元素。队列元素是一个或多个张量的元组(可以有不同的类型和形状)。所有队列都支持单元素(enqueuedequeue)和批处理(enqueue_manydequeue_many)操作,但使用批处理操作,您必须在构造队列时指定队列元素中每个张量的形状。

  2. 构建一个子图,将预处理的元素排入队列。一种方法是定义一些 tf.placeholder()对应于单个输入示例的张量的操作,然后将它们传递给 q.enqueue() . (如果您的预处理一次生成一个批处理,则应使用 q.enqueue_many()。)您还可以在此子图中包含 TensorFlow 操作。

  3. 构建一个执行训练的子图。这看起来像一个常规的 TensorFlow 图,但会通过调用 q.dequeue_many(BATCH_SIZE) 来获取它的输入。 .

  4. 开始您的 session 。

  5. 创建一个或多个线程来执行你的预处理逻辑,然后执行入队操作,输入预处理的数据。您可以找到 tf.train.Coordinatortf.train.QueueRunner对此有用的实用程序类。

  6. 正常运行您的训练图(优化器等)。

编辑:这是一个简单的 load_and_enqueue() 函数和代码片段,可帮助您入门:

# Features are length-100 vectors of floats
feature_input = tf.placeholder(tf.float32, shape=[100])
# Labels are scalar integers.
label_input = tf.placeholder(tf.int32, shape=[])

# Alternatively, could do:
# feature_batch_input = tf.placeholder(tf.float32, shape=[None, 100])
# label_batch_input = tf.placeholder(tf.int32, shape=[None])

q = tf.FIFOQueue(100, [tf.float32, tf.int32], shapes=[[100], []])
enqueue_op = q.enqueue([feature_input, label_input])

# For batch input, do:
# enqueue_op = q.enqueue_many([feature_batch_input, label_batch_input])

feature_batch, label_batch = q.dequeue_many(BATCH_SIZE)
# Build rest of model taking label_batch, feature_batch as input.
# [...]
train_op = ...

sess = tf.Session()

def load_and_enqueue():
with open(...) as feature_file, open(...) as label_file:
while True:
feature_array = numpy.fromfile(feature_file, numpy.float32, 100)
if not feature_array:
return
label_value = numpy.fromfile(feature_file, numpy.int32, 1)[0]

sess.run(enqueue_op, feed_dict={feature_input: feature_array,
label_input: label_value})

# Start a thread to enqueue data asynchronously, and hide I/O latency.
t = threading.Thread(target=load_and_enqueue)
t.start()

for _ in range(TRAINING_EPOCHS):
sess.run(train_op)

关于python - 如何在 tensorflow 中使用自定义 python 函数预取数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34594198/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com