sygi sygi - 2 months ago 40
Python Question

Passing a numpy array to a tensorflow Queue

I have a NumPy array and would like to read it in TensorFlow's code using a Queue. I would like the queue to return the whole data shuffled, some specified number of epochs and throw an error after that. It would be best if I'd not need to hardcode the size of an example nor the number of examples.
I think shuffle batch is meant to serve that purpose. I have tried using it as follows:

data = tf.constant(train_np) # train_np is my numpy array of shape (num_examples, example_size)
batch = tf.train.shuffle_batch([data], batch_size=5, capacity=52200, min_after_dequeue=10, num_threads=1, seed=None, enqueue_many=True)

The problem with that approach is that it reads all the data continuously and I cannot specify it to finish after some number of epochs. I am aware I could use the RandomShuffleQueue and insert the data into it few times, but:
a) I don't want to waste epoch*data of memory and b) it will allow the queue to shuffle between epochs.

Is there a nice way to read the shuffled data in epochs in Tensorflow without writing your own Queue?


You could create another queue, enqueue your data onto it num_epoch times, close it, and then hook it up to your batch. To save memory, you can make this queue small, and enqueue items onto it in parallel. There will be a bit of mixing between epochs. To fully prevent mixing, you could take code below with num_epochs=1 and call it num_epochs times.

data = np.array([1, 2, 3, 4])
num_epochs = 5
queue1_input = tf.placeholder(tf.int32)
queue1 = tf.FIFOQueue(capacity=10, dtypes=[tf.int32], shapes=[()])

def create_session():
    config = tf.ConfigProto()
    return tf.InteractiveSession(config=config)

enqueue_op = queue1.enqueue_many(queue1_input)
close_op = queue1.close()
dequeue_op = queue1.dequeue()
batch = tf.train.shuffle_batch([dequeue_op], batch_size=4, capacity=5, min_after_dequeue=4)

sess = create_session()

def fill_queue():
    for i in range(num_epochs):, feed_dict={queue1_input: data})

fill_thread = threading.Thread(target=fill_queue, args=())

# read the data from queue shuffled
    while True:
        print batch.eval()
except tf.errors.OutOfRangeError:
    print "Done"

BTW, enqueue_many pattern above will hang when the queue is not large enough to load the entire numpy dataset into it. You could give yourself flexibility to have a smaller queue by loading the data in chunks as below.

data = np.array([1, 2, 3, 4])
queue1_capacity = 2
num_epochs = 2
queue1_input = tf.placeholder(tf.int32)
queue1 = tf.FIFOQueue(capacity=queue1_capacity, dtypes=[tf.int32], shapes=[()])

enqueue_op = queue1.enqueue_many(queue1_input)
close_op = queue1.close()
dequeue_op = queue1.dequeue()

def dequeue():
        while True:

def enqueue():
    for i in range(num_epochs):
        start_pos = 0
        while start_pos < len(data):
            end_pos = start_pos+queue1_capacity
            data_chunk = data[start_pos: end_pos]
  , feed_dict={queue1_input: data_chunk})
            start_pos += queue1_capacity

sess = create_session()

enqueue_thread = threading.Thread(target=enqueue, args=())

dequeue_thread = threading.Thread(target=dequeue, args=())