Python Deep Learning AI School Basics/lecture05

lecture05, session 2 concept notes: object classification with a CNN

junny1997 2021. 3. 26. 08:45

Checking the Keras CIFAR-10 dataset

import numpy as np
import matplotlib.pyplot as plt

from tensorflow.keras.datasets.cifar10 import load_data

(x_train, y_train), (x_test, y_test) = load_data()

print(np.shape(x_train))
print(np.shape(y_train))
print(np.shape(x_test))
print(np.shape(y_test))

# airplane, automobile, bird, cat, deer, dog, frog, horse, ship, truck

print(y_train[111])
plt.imshow(x_train[111])
plt.show()

(50000, 32, 32, 3)
(50000, 1)
(10000, 32, 32, 3)
(10000, 1)
[8]
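
Label 8 maps to "ship" in the class list given in the comment above. As a minimal sketch (the class_names list below simply mirrors that comment), the class name can be printed together with the sample:

class_names = ['airplane', 'automobile', 'bird', 'cat', 'deer',
               'dog', 'frog', 'horse', 'ship', 'truck']

idx = 111
label = int(y_train[idx][0])      # labels have shape (N, 1), so take the single entry
print(label, class_names[label])  # 8 ship

plt.imshow(x_train[idx])
plt.title(class_names[label])
plt.show()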

LeNet-style implementation

# LeNet-style CNN implementation (TensorFlow 1.x API; on TF 2.x, use
# import tensorflow.compat.v1 as tf and call tf.disable_v2_behavior())
import tensorflow as tf
import numpy as np
import time
import os
from tensorflow.keras.datasets.cifar10 import load_data

X = tf.placeholder(tf.float32, [None, 32, 32, 3], name="X")
# the Keras dataset already provides images as (N, 32, 32, 3) arrays, so no reshape is needed
Y = tf.placeholder(tf.float32, [None, 10], name="Y")
keep_prob = tf.placeholder(tf.float32, name="keep_prob")

# three rounds of convolution + max pooling
# input: 32*32*3
W1 = tf.Variable(tf.random_normal([3, 3, 3, 32], stddev=0.01))
# convolve the 3-channel input with 32 filters of size 3*3
L1 = tf.nn.conv2d(X, W1, strides=[1, 1, 1, 1], padding='SAME')
L1 = tf.nn.relu(L1)
L1 = tf.nn.max_pool(L1, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='SAME')
L1 = tf.nn.dropout(L1, keep_prob=keep_prob)

# input: 16*16*32 (SAME convolution keeps the spatial size; 2*2 max pooling with stride 2 halves it)
W2 = tf.Variable(tf.random_normal([3, 3, 32, 64], stddev=0.01))
L2 = tf.nn.conv2d(L1, W2, strides=[1, 1, 1, 1], padding='SAME')
L2 = tf.nn.relu(L2)
L2 = tf.nn.max_pool(L2, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='SAME')
L2 = tf.nn.dropout(L2, keep_prob=keep_prob)

# input: 8*8*64
W3 = tf.Variable(tf.random_normal([3, 3, 64, 128], stddev=0.01))
L3 = tf.nn.conv2d(L2, W3, strides=[1, 1, 1, 1], padding='SAME')
L3 = tf.nn.relu(L3)
L3 = tf.nn.max_pool(L3, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='SAME')
L3 = tf.nn.dropout(L3, keep_prob=keep_prob)
# flatten the 4*4*128 feature map for the fully connected layers
L3_flat = tf.reshape(L3, [-1, 4 * 4 * 128])

# 3-layer MLP classifier head, weights initialized with He normal initialization
W4 = tf.get_variable("W4", shape=[4 * 4 * 128, 128], initializer=tf.initializers.he_normal())
b4 = tf.Variable(tf.random_normal([128]))
FC1 = tf.nn.relu(tf.nn.xw_plus_b(L3_flat, W4, b4))
FC1 = tf.nn.dropout(FC1, keep_prob=keep_prob)

W5 = tf.get_variable("W5", shape=[128, 64], initializer=tf.initializers.he_normal())
b5 = tf.Variable(tf.random_normal([64]))
FC2 = tf.nn.relu(tf.nn.xw_plus_b(FC1, W5, b5))
FC2 = tf.nn.dropout(FC2, keep_prob=keep_prob)

W6 = tf.get_variable("W6", shape=[64, 10], initializer=tf.initializers.he_normal())
b6 = tf.Variable(tf.random_normal([10]))
hypothesis = tf.nn.xw_plus_b(FC2, W6, b6, name="hypothesis")

correct_prediction = tf.equal(tf.argmax(hypothesis, 1), tf.argmax(Y, 1))
accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))
cost = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(logits=hypothesis, labels=Y))
summary_op = tf.summary.scalar("loss", cost)

learning_rate = 0.001
optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate).minimize(cost)

# initialize
sess = tf.Session()
sess.run(tf.global_variables_initializer())

training_epochs = 30
batch_size = 128
dropout_keep = 0.8

timestamp = str(int(time.time()))
out_dir = os.path.abspath(os.path.join(os.path.curdir, "runs", timestamp))
train_summary_dir = os.path.join(out_dir, "summaries", "train")
train_summary_writer = tf.summary.FileWriter(train_summary_dir, sess.graph)
val_summary_dir = os.path.join(out_dir, "summaries", "dev")
val_summary_writer = tf.summary.FileWriter(val_summary_dir, sess.graph)
checkpoint_dir = os.path.abspath(os.path.join(out_dir, "checkpoints"))
checkpoint_prefix = os.path.join(checkpoint_dir, "model")
if not os.path.exists(checkpoint_dir):
    os.makedirs(checkpoint_dir)
saver = tf.train.Saver(tf.global_variables(), max_to_keep=3)
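# (note) the scalar-loss summaries written by the writers above can be inspected with TensorBoard,
# e.g. tensorboard --logdir runs  (assumes TensorBoard is installed alongside TensorFlow)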

max_accuracy = 0
early_stopped = 0
(x_train_val, y_train_val), (x_test, y_test) = load_data()

shuffle_indices = np.random.permutation(np.arange(len(y_train_val)))
shuffled_x = np.asarray(x_train_val[shuffle_indices])
shuffled_y = y_train_val[shuffle_indices]

val_sample_index = -1 * int(0.1 * float(len(y_train_val)))
x_train, x_val = shuffled_x[:val_sample_index], shuffled_x[val_sample_index:]
y_train, y_val = shuffled_y[:val_sample_index], shuffled_y[val_sample_index:]
# labels are stored as [n] (a class index), so convert them to one-hot vectors
x_test = np.asarray(x_test)
# np.asarray: no copy is made when the input is already an ndarray
y_train_one_hot = np.eye(10)[y_train]
# np.eye(10) builds a 10*10 identity matrix; indexing it with y_train selects the matching one-hot rows
print(np.shape(y_train_one_hot))  # (45000, 1, 10)
y_train_one_hot = np.squeeze(y_train_one_hot, axis=1)  # (45000, 10)
print(np.shape(y_train_one_hot))
y_test_one_hot = np.eye(10)[y_test]
y_test_one_hot = np.squeeze(y_test_one_hot, axis=1)
y_val_one_hot = np.eye(10)[y_val]
y_val_one_hot = np.squeeze(y_val_one_hot, axis=1)
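# (note) an equivalent one-liner, assuming the same TF install, would be
# tf.keras.utils.to_categorical(y_train, 10), which also handles the (N, 1) label shape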

def next_batch(batch_size, data):
    data = np.array(data)
    np.random.seed(10)  # fixed seed: the same shuffle order is reproduced every epoch
    shuffle_indices = np.random.permutation(np.arange(len(data)))
    shuffled_data = data[shuffle_indices]
    # num_batches_per_epoch = int((len(data)-1)/batch_size) + 1  # variant that would keep the last partial batch
    num_batches_per_epoch = int(len(data)/batch_size)  # 45000 / 128 = 351 full batches (remainder is dropped)

    for batch_num in range(num_batches_per_epoch):  # 0 ... 350
        start_index = batch_num * batch_size  # 0, 128, 256, ..., 44800
        end_index = min((batch_num + 1) * batch_size, len(data))  # 128, 256, ..., 44928
        yield shuffled_data[start_index:end_index]

for epoch in range(training_epochs):
    avg_cost = 0
    total_batch = int(len(y_train) / batch_size)  # 45000 / 128 = 351
    batches = next_batch(batch_size, list(zip(x_train, y_train_one_hot)))  # [(image, label), (image, label), ...]
    for batch in batches:
        batch_xs, batch_ys = zip(*batch)  # unzip  (image, label) -> image, label
        feed_dict = {X: batch_xs, Y: batch_ys, keep_prob: dropout_keep}
        c, _, s = sess.run([cost, optimizer, summary_op], feed_dict=feed_dict)
        avg_cost += c / total_batch
    train_summary_writer.add_summary(s, epoch)

    print('Epoch:', '%04d' % (epoch + 1), 'training cost =', '{:.9f}'.format(avg_cost))
    val_accuracy, summaries = sess.run([accuracy, summary_op],
                                       feed_dict={X: x_val, Y: y_val_one_hot,
                                                  keep_prob: 1.0})
    val_summary_writer.add_summary(summaries, epoch)
    print('Validation Accuracy:', val_accuracy)
    if val_accuracy > max_accuracy:
        max_accuracy = val_accuracy
        early_stopped = epoch + 1
        saver.save(sess, checkpoint_prefix, global_step=early_stopped)

print('Learning Finished!')
print('Validation Max Accuracy:', max_accuracy)
print('Early stopped time:', early_stopped)

test_accuracy = sess.run(accuracy, feed_dict={X: x_test, Y: y_test_one_hot, keep_prob: 1.0})
print('Test Latest Accuracy:', test_accuracy)
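
The final evaluation above runs with the weights from the last training epoch. Since the saver keeps the checkpoints with the best validation accuracy, a minimal sketch (reusing the session and tensors defined above) for restoring that best checkpoint before measuring test accuracy:

# restore the checkpoint saved at the epoch with the best validation accuracy
saver.restore(sess, checkpoint_prefix + '-' + str(early_stopped))
best_test_accuracy = sess.run(accuracy, feed_dict={X: x_test, Y: y_test_one_hot, keep_prob: 1.0})
print('Test Accuracy (best checkpoint):', best_test_accuracy)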