본문 바로가기
Tensorflow

Tensorflow data pipeline 구축

by 블쭌 2021. 5. 13.
728x90
  • Tensorflow에서 feed_dict로 데이터를 계속해서 공급하는 코드를 많이 보셨을것입니다. 그러나, 논문을 리뷰하면서 github 참조를 많이 하셨던 분들은 아시겠지만 feed_dict로 데이터를 공급하는 코드는 거의 없던것 같습니다. 또한 데이터를 로드하는 bottleneck 시간이 줄어들어서 학습시간이 줄어드는 효과가 있습니다. 이에 본 블로그에서는 data를 gpu에 계속해서 공급하는 tensorflow기반 dataset api를 참고해서 설명을 드리고자합니다.

1. Tensorflow dataset 불러오기

  • numpy를 이용해서 데이터 만들기
features, labels = (np.random.sample((100,2)), np.random.sample((100,1)))
dataset = tf.data.Dataset.from_tensor_slices((features,labels))
  • tensor를 이용해서 데이터 만들기
features, labels = tf.random_uniform([100, 2]), tf.random_uniform([100, 1])
dataset = tf.data.Dataset.from_tensor_slices((features,labels))
  • placeholder를 이용해서 데이터 만들기
features, labels = tf.placeholder(tf.float32, shape=[100, 2]), tf.placeholder(tf.float32, shape=[100, 1])
dataset = tf.data.Dataset.from_tensor_slices((features, labels))

2. iterator 만들기

  • one-shot iterator

one-shot iterator를 통해 iterator를 구축하고 get_next()함수를 통해 데이터를 하나씩 받는다. batch가 1이라고 생각하면 될 것 같다.

it = dataset.make_one_shot_iterator()

with tf.Session() as sess:
    feature, label = sess.run(it.get_next())
    print(feature)
    print(label)
    
# [0.21971898 0.40889433]
# [0.10460629]

3. batch

Tensorflow

Dataset API를 사용하면 주어진 크기로 데이터 세트를 자동으로 처리하는 batch(BATCH_SIZE)메서드를 사용할 수 있다. BATCH_SIZE의 디폴트 값은 1이고 drop_remainder=False로 지정되어있다.

drop_remainder는 batch_size만큼의 크기에 도달하지 못할경우 나머지 데이터는 버리는것이다.

BATCH_SIZE = 3

features, labels = (np.random.sample((10,2)), np.random.sample((10,1)))
dataset = tf.data.Dataset.from_tensor_slices((features,labels)).batch(BATCH_SIZE)
it = dataset.make_one_shot_iterator()
with tf.Session() as sess:
    while True:
        try:
            print(sess.run(it.get_next()))
        except tf.errors.OutOfRangeError:
            break
    
    
(array([[0.52238885, 0.13404034],
       [0.27710991, 0.87430935],
       [0.79242495, 0.31807998]]), array([[0.37829767],
       [0.62597192],
       [0.06370111]]))
(array([[0.86296173, 0.69179809],
       [0.56882045, 0.48472389],
       [0.72679694, 0.28393554]]), array([[0.30915931],
       [0.20560619],
       [0.62670944]]))
(array([[0.56051333, 0.82230887],
       [0.99981658, 0.99507193],
       [0.19262157, 0.17897073]]), array([[0.26403552],
       [0.70085814],
       [0.16698348]]))
(array([[0.37980126, 0.46915534]]), array([[0.11434792]]))
BATCH_SIZE = 3

features, labels = (np.random.sample((10,2)), np.random.sample((10,1)))
dataset = tf.data.Dataset.from_tensor_slices((features,labels)).batch(BATCH_SIZE, drop_remainder=True)
it = dataset.make_one_shot_iterator()
with tf.Session() as sess:
    while True:
        try:
            print(sess.run(it.get_next()))
        except tf.errors.OutOfRangeError:
            break
            
            
 (array([[0.52377085, 0.19861702],
       [0.21107275, 0.10277868],
       [0.99710994, 0.92402792]]), array([[0.53959942],
       [0.02703595],
       [0.34387802]]))
(array([[0.11830142, 0.51192829],
       [0.96896846, 0.82626225],
       [0.02023489, 0.34176466]]), array([[0.98699887],
       [0.00683797],
       [0.2358546 ]]))
(array([[0.25298004, 0.85590802],
       [0.56537934, 0.81674494],
       [0.09169761, 0.84117038]]), array([[0.73028954],
       [0.49231779],
       [0.82787181]]))

drop_remainder=True의 경우 총 10개의 데이터에서 batch_size가 3이라서 1이남는 데이터의 경우는 무시된것을 확인할 수 있다.


4. repeat

repeat라는 함수는 데이터셋을 읽다가 마지막에 도달했을 경우, 다시 처음부터 조회하는 함수입니다. repeat을 사용하면 dataset이 몇 번 반복해서 사용될 지 정할 수 있다. 파라미터가 없다면 계속 반복하고 보통 계속 반복시키고 epoch 값을 직접 제어하는 것이 좋다. 

위에서 drop_remainder=True가 아니라 마지막데이터에 이어서 처음 데이터로 다시 순서를 돌아가게 만드는방법에 유용하다. 단 repeat()이 먼저오고 batch가 와야한다. batch가 오고 repeat이오면 8개만큼 무한반복하는것 뿐이기 때문이다.

BATCH_SIZE = 3

features, labels = (np.random.sample((10,2)), np.random.sample((10,1)))
dataset = tf.data.Dataset.from_tensor_slices((features,labels)).repeat().batch(BATCH_SIZE, drop_remainder=False)
it = dataset.make_one_shot_iterator()
with tf.Session() as sess:
    for i in range(5):
        print(sess.run(it.get_next()))
        
(array([[0.31132242, 0.5288729 ],
       [0.20179185, 0.01067924],
       [0.01338268, 0.54626167]]), array([[0.06933487],
       [0.85047053],
       [0.43254418]]))
(array([[0.33660174, 0.49698311],
       [0.66623285, 0.81486668],
       [0.17075565, 0.11668339]]), array([[0.35514334],
       [0.3604087 ],
       [0.27985318]]))
(array([[0.00524041, 0.39124847],
       [0.34725305, 0.50394901],
       [0.88295544, 0.14286786]]), array([[0.97596611],
       [0.20841496],
       [0.40209223]]))
(array([[0.81630593, 0.3377158 ],
       [0.31132242, 0.5288729 ],
       [0.20179185, 0.01067924]]), array([[0.54171006],
       [0.06933487],
       [0.85047053]]))
(array([[0.01338268, 0.54626167],
       [0.33660174, 0.49698311],
       [0.66623285, 0.81486668]]), array([[0.43254418],
       [0.35514334],
       [0.3604087 ]]))

5. prefetch

prefectch를 사용하면 학습 데이터를 나눠서 읽어오기 때문에 첫 번째 데이터를 GPU에서 학습하는 동안 두 번째 데이터를 CPU에서 준비할 수 있어 리소스의 유휴 상태를 줄일 수 있다.

BATCH_SIZE = 2
epoch_num = 3

features, labels = (np.random.sample((5,2)), np.random.sample((5,1)))
dataset = tf.data.Dataset.from_tensor_slices((features,labels))

dataset = dataset.repeat(epoch_num)
dataset = dataset.prefetch(buffer_size=BATCH_SIZE)
dataset = dataset.batch(BATCH_SIZE)
it = dataset.make_one_shot_iterator()
with tf.Session() as sess:
    while True:
        try:
            print(sess.run(it.get_next()))
        except tf.errors.OutOfRangeError:
            break

5개의 데이터를 총 3번 = 15번 돌리는데 2개씩 batch를 뽑아내서 진행한다.

prefetch를 통해 미리 데이터를 올려놓기때문에 bottleneck현상을 줄일 수 있다.


6. 전체적인 data pipeline

EPOCHS = 2
BATCH_SIZE = 3
features, labels = (np.random.sample((5,2)), np.random.sample((5,1)))

dataset = tf.data.Dataset.from_tensor_slices((features,labels)).repeat().batch(BATCH_SIZE)

it = dataset.make_one_shot_iterator()
x, y = it.get_next()

net = tf.layers.dense(x, 8, activation=tf.nn.relu)
net = tf.layers.dense(net, 8, activation=tf.nn.relu)
prediction = tf.layers.dense(net, 1, activation=tf.tanh)

loss = tf.losses.mean_squared_error(prediction, y) 
train_op = tf.train.AdamOptimizer().minimize(loss)

with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    for i in range(EPOCHS):
        _, loss_value = sess.run([train_op, loss])
        print("Iter: {}, Loss: {:.4f}".format(i, loss_value))

  • 참고

https://www.tensorflow.org/versions/r1.15/api_docs/python/tf/data/Dataset

 

tf.data.Dataset  |  TensorFlow Core v1.15.0

Represents a potentially large set of elements. Inherits From: Dataset View aliases Compat aliases for migration See Migration guide for more details. tf.compat.v1.data.Dataset tf.data.Dataset() A Dataset can be used to represent an input pipeline as a col

www.tensorflow.org

https://hiseon.me/data-analytics/tensorflow/tensorflow-dataset/

https://towardsdatascience.com/how-to-use-dataset-in-tensorflow-c758ef9e4428

 

728x90

댓글