python - How to use TensorFlow reader and queue to read two files at the same time?
My training set contains 2 kinds of files: a training image with file name "1.png" and a label file with name "1.label.txt".

I found this usage of queue and reader in the tutorials:

filename_queue = tf.train.string_input_producer(filenames)
result.key, value = reader.read(filename_queue)

However, because my training set contains 2 kinds of files, with each image corresponding to one label file, how can I make use of queue and reader like the code above?
EDIT
I was thinking of using one queue containing the base names to feed 2 other queues, one for images and one for labels. The code is like this:
with tf.Session() as sess:
    base_name_queue = tf.train.string_input_producer(['image_names'], num_epochs=20)
    base_name = base_name_queue.dequeue()
    image_name = base_name + ".png"
    image_name_queue = data_flow_ops.FIFOQueue(32, image_name.dtype.base_dtype)
    image_name_queue.enqueue([image_name])
    x = image_name_queue.dequeue()
    print_op = tf.Print(image_name, [image_name])
    qr = tf.train.QueueRunner(base_name_queue, [base_name_queue] * 4)
    coord = tf.train.Coordinator()
    enqueue_threads = qr.create_threads(sess, coord=coord, start=True)
    for step in range(1000000):
        if coord.should_stop():
            break
        print(sess.run(print_op))
    coord.request_stop()
    coord.join(enqueue_threads)
But running this code results in an error:
TypeError: Fetch argument has invalid type, must be a string or Tensor. (Can not convert a FIFOQueue into a Tensor or Operation.)

And the error points to this line:

coord.join(enqueue_threads)
I think I must be misunderstanding how TensorFlow queues work.
I have figured out the solution to my problem. I post the answer here instead of deleting the question, hoping it will help people who are new to TensorFlow.

The answer contains 2 parts:
Part 1: How to read files pair by pair using TensorFlow's queue
The solution is simple:
- Use 2 queues to store the 2 sets of files. Note that the 2 sets should be ordered in the same way.
- Do the preprocessing for each respectively, using dequeue.
- Combine the 2 preprocessed tensors into 1 list and pass the list to shuffle_batch.
The code:
base_names = ['file1', 'file2']
base_tensor = tf.convert_to_tensor(base_names)
image_name_queue = tf.train.string_input_producer(
    base_tensor + '.png',
    shuffle=False  # Note: must set shuffle to False
)
label_queue = tf.train.string_input_producer(
    base_tensor + '.label.txt',
    shuffle=False  # Note: must set shuffle to False
)

# use readers to read the files
image_reader = tf.WholeFileReader()
image_key, image_raw = image_reader.read(image_name_queue)
image = tf.image.decode_png(image_raw)
label_reader = tf.WholeFileReader()
label_key, label_raw = label_reader.read(label_queue)
label = tf.decode_raw(label_raw, tf.uint8)

# preprocess the image
processed_image = tf.image.per_image_whitening(image)
batch = tf.train.shuffle_batch([processed_image, label], 10, 100, 100)

# print the batch
queue_threads = queue_runner.start_queue_runners()
print(sess.run(batch))
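The reason shuffle must be False for both producers is that the pairing relies purely on index alignment of the two filename lists; if either queue were shuffled independently, images and labels would be mismatched. A minimal pure-Python sketch of this invariant (with hypothetical base names):

```python
# Hypothetical base names; both file lists are derived from the same
# ordered list, so index i always pairs image i with label i.
base_names = ["1", "2", "3"]
image_names = [b + ".png" for b in base_names]
label_names = [b + ".label.txt" for b in base_names]

pairs = list(zip(image_names, label_names))
# pairs[0] is ("1.png", "1.label.txt") -- the pairing holds only
# because neither list was shuffled independently.
```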
Part 2: Queue, QueueRunner, Coordinator and the helper functions
A Queue is really a queue (well, that sounds meaningless). A queue has 2 methods: enqueue and dequeue. The input of enqueue is a Tensor (well, you can enqueue normal data, but it will be converted to a Tensor internally). The return value of dequeue is also a Tensor. So you can make a pipeline of queues like this:
q1 = data_flow_ops.FIFOQueue(32, tf.int32)
q2 = data_flow_ops.FIFOQueue(32, tf.int32)
enq1 = q1.enqueue([1,2,3,4,5])
v1 = q1.dequeue()
enq2 = q2.enqueue(v1)
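If the symbolic TensorFlow queues feel opaque, the same chaining idea can be illustrated with Python's standard queue module (this is only an analogy, not TensorFlow code): the dequeue of one queue feeds the enqueue of the next.

```python
import queue

q1 = queue.Queue(maxsize=32)
q2 = queue.Queue(maxsize=32)

# "enqueue" a few values into the first queue
for v in [1, 2, 3, 4, 5]:
    q1.put(v)

# one pipeline step: dequeue from q1, enqueue into q2
v1 = q1.get()
q2.put(v1)

result = q2.get()  # the value that travelled through both queues
```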
The benefit of using queues in TensorFlow is to load data asynchronously, which improves performance and saves memory. The code above is not runnable, because there is no thread running those operations. QueueRunner is designed to describe how to enqueue data in parallel. The parameter for initializing a QueueRunner is an enqueue operation (the output of enqueue).
After setting up all the QueueRunners, you have to start the threads. One way is to start them when creating them:
enqueue_threads = qr.create_threads(sess, coord=coord, start=True)
Or, you can start all the threads after all the setting-up work is done:
# add the queue runner
queue_runner.add_queue_runner(queue_runner.QueueRunner(q, [enq]))

# start the queue runners
queue_threads = queue_runner.start_queue_runners()
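Conceptually, a QueueRunner just holds a queue plus N copies of an enqueue op, and starting the runners spawns one thread per op. A rough pure-Python analogue (an illustration of the idea, not TensorFlow's actual implementation):

```python
import queue
import threading

q = queue.Queue(maxsize=32)

def enqueue_op():
    # each "enqueue op" pushes 5 items into the shared queue
    for item in range(5):
        q.put(item)

# analogue of QueueRunner(q, [enq] * 4): 4 threads all running the same op
threads = [threading.Thread(target=enqueue_op) for _ in range(4)]
for t in threads:  # analogue of start_queue_runners()
    t.start()
for t in threads:
    t.join()

filled = q.qsize()  # 4 threads x 5 items = 20
```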
When all the threads are started, you have to decide when to exit. The Coordinator is here for this. A Coordinator is like a shared flag between all the running threads. If one of them has finished, or has run into an error, it calls coord.request_stop(), and then every thread gets True when calling coord.should_stop(). So the pattern of using a Coordinator is:
coord = tf.train.Coordinator()
for step in range(1000000):
    if coord.should_stop():
        break
    print(sess.run(print_op))
coord.request_stop()
coord.join(enqueue_threads)
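The Coordinator's shared-flag behavior can likewise be mimicked with a threading.Event. The sketch below is a simplified stand-in, not TensorFlow's actual implementation, but it shows the same request_stop / should_stop / join pattern:

```python
import threading

class MiniCoordinator:
    """Simplified stand-in for tf.train.Coordinator: a shared stop flag."""
    def __init__(self):
        self._stop = threading.Event()

    def request_stop(self):
        self._stop.set()

    def should_stop(self):
        return self._stop.is_set()

    def join(self, threads):
        for t in threads:
            t.join()

coord = MiniCoordinator()
steps_done = []

def worker():
    for step in range(1000000):
        if coord.should_stop():
            break
        steps_done.append(step)
        if step == 9:             # pretend the work is finished here
            coord.request_stop()  # every thread now sees should_stop() == True

t = threading.Thread(target=worker)
t.start()
coord.join([t])
# the loop exits after 10 steps instead of running all 1000000 iterations
```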