LeNet训练MNIST
jupyter notebook: https://github.com/Penn000/NN/blob/master/jupyter/LeNet/LeNet.ipynb
LeNet训练MNIST
1 import warnings 2 warnings.filterwarnings('ignore') # 不打印 warning 3 4 import tensorflow as tf 5 import numpy as np 6 import os
加载MNIST数据集
分别加载MNIST训练集、测试集、验证集
1 from tensorflow.examples.tutorials.mnist import input_data 2 3 mnist = input_data.read_data_sets("MNIST_data/", one_hot=True) 4 X_train, y_train = mnist.train.images, mnist.train.labels 5 X_test, y_test = mnist.test.images, mnist.test.labels 6 X_validation, y_validation = mnist.validation.images, mnist.validation.labels
1 print("Image Shape: {}".format(X_train.shape)) 2 print("label Shape: {}".format(y_train.shape)) 3 print() 4 print("Training Set: {} samples".format(len(X_train))) 5 print("Validation Set: {} samples".format(len(X_validation))) 6 print("Test Set: {} samples".format(len(X_test)))
数据处理
由于LeNet的输入为32x32xC(C为图像通道数),而MNIST每张图像的尺寸为28×28,所以需要对图像四周进行填充,并添加一维,使得每幅图像的形状为32x32x1。
1 # 使用0对图像四周进行填充 2 X_train = np.array([np.pad(X_train[i].reshape((28, 28)), (2, 2), 'constant')[:, :, np.newaxis] for i in range(len(X_train))]) 3 X_validation = np.array([np.pad(X_validation[i].reshape((28, 28)), (2, 2), 'constant')[:, :, np.newaxis] for i in range(len(X_validation))]) 4 X_test = np.array([np.pad(X_test[i].reshape((28, 28)), (2, 2), 'constant')[:, :, np.newaxis] for i in range(len(X_test))]) 5 6 print("Updated Image Shape: {}".format(X_train.shape))
MNIST数据展示
1 import random 2 import numpy as np 3 import matplotlib.pyplot as plt 4 %matplotlib inline 5 6 index = random.randint(0, len(X_train)) 7 image = X_train[index].squeeze().reshape((32, 32)) 8 9 plt.figure(figsize=(2,2)) 10 plt.imshow(image, cmap="gray") 11 print(y_train[index])
LeNet网络结构
Input
The LeNet architecture accepts a 32x32xC image as input, where C is the number of color channels. Since MNIST images are grayscale, C is 1 in this case. LeNet的输入为32x32xC的图像,C为图像的通道数。在MNIST中,图像为灰度图,因此C等于1。
Architecture
Layer 1: Convolutional. 输出为28x28x6的张量。
Activation. 激活函数。
Pooling. 输出为14x14x6的张量。
Layer 2: Convolutional. 输出为10x10x16的张量。
Activation. 激活函数。
Pooling. 输出为5x5x16的张量。
Flatten. 将张量展平为一维向量,使用tf.contrib.layers.flatten
可以实现。
Layer 3: Fully Connected. 输出为120长度的向量。
Activation. 激活函数。
Layer 4: Fully Connected. 输出为84长度的向量。
Activation. 激活函数。
Layer 5: Fully Connected (Logits). 输出为10长度的向量。
1 # 卷积层 2 def conv_layer(x, filter_shape, stride, name): 3 with tf.variable_scope(name): 4 W = tf.get_variable('weights', shape=filter_shape, initializer=tf.truncated_normal_initializer()) 5 b = tf.get_variable('biases', shape=filter_shape[-1], initializer=tf.zeros_initializer()) 6 return tf.nn.conv2d(x, W, strides=stride, padding='VALID', name=name) + b
1 # 全连接层 2 def fc_layer(x, in_size, out_size, name): 3 with tf.variable_scope(name): 4 W = tf.get_variable('weights', shape=(in_size, out_size), initializer=tf.truncated_normal_initializer()) 5 b = tf.get_variable('biases', shape=(out_size), initializer=tf.zeros_initializer()) 6 7 return tf.nn.xw_plus_b(x, W, b, name=name)
1 def relu_layer(x, name): 2 return tf.nn.relu(x, name=name)
1 from tensorflow.contrib.layers import flatten 2 3 def LeNet(x): 4 conv1 = conv_layer(x, filter_shape=(5, 5, 1, 6), stride=[1, 1, 1, 1], name='conv1') 5 relu1 = relu_layer(conv1, 'relu1') 6 max_pool1 = max_pool_layer(relu1, kernel_size=[1, 2, 2, 1], stride=[1, 2, 2, 1], name='max_pool1') 7 8 conv2 = conv_layer(max_pool1, filter_shape=(5, 5, 6, 16), stride=[1, 1, 1, 1], name='conv2') 9 relu2 = relu_layer(conv2, 'relu2') 10 max_pool2 = max_pool_layer(relu2, kernel_size=[1, 2, 2, 1], stride=[1, 2, 2, 1], name='max_pool1') 11 12 flat = flatten(max_pool2) 13 14 fc3 = fc_layer(flat, 400, 120, name='fc3') 15 relu3 = relu_layer(fc3, 'relu3') 16 17 fc4 = fc_layer(relu3, 120, 84, name='fc4') 18 relu4 = relu_layer(fc4, 'relu4') 19 20 logits = fc_layer(relu4, 84, 10, name='fc5') 21 22 return logits
TensorFlow设置
1 EPOCHS = 10 2 BATCH_SIZE = 128 3 log_dir = './log/' 4 5 x = tf.placeholder(tf.float32, (None, 32, 32, 1)) 6 y = tf.placeholder(tf.int32, (None, 10)) 7 8 # 定义损失函数 9 logits = LeNet(x) 10 cross_entropy = tf.nn.softmax_cross_entropy_with_logits(labels=y, logits=logits) 11 loss = tf.reduce_mean(cross_entropy) 12 train = tf.train.AdamOptimizer(learning_rate=0.01).minimize(loss)
训练
1 from sklearn.utils import shuffle 2 import shutil 3 log_dir = './logs/' 4 if os.path.exists(log_dir): 5 shutil.rmtree(log_dir) 6 os.makedirs(log_dir) 7 train_writer = tf.summary.FileWriter(log_dir+'train/') 8 valid_writer = tf.summary.FileWriter(log_dir+'valid/') 9 10 ckpt_path = './ckpt/' 11 saver = tf.train.Saver() 12 13 with tf.Session() as sess: 14 sess.run(tf.global_variables_initializer()) 15 n_samples = len(X_train) 16 17 step = 0 18 for i in range(EPOCHS): 19 X_train, y_train = shuffle(X_train, y_train) # 打乱数据 20 # 使用mini-batch训练 21 for offset in range(0, n_samples, BATCH_SIZE): 22 end = offset + BATCH_SIZE 23 batch_x, batch_y = X_train[offset:end], y_train[offset:end] 24 sess.run(train, feed_dict={x: batch_x, y: batch_y}) 25 26 train_loss = sess.run(loss, feed_dict={x: batch_x, y: batch_y}) 27 train_summary = tf.Summary(value=[ 28 tf.Summary.Value(tag="loss", simple_value=train_loss) 29 ]) 30 train_writer.add_summary(train_summary, step) 31 train_writer.flush() 32 step += 1 33 34 # 每个epoch使用验证集对网络进行验证 35 valid_loss = sess.run(loss, feed_dict={x: X_validation, y: y_validation}) 36 valid_summary = tf.Summary(value=[ 37 tf.Summary.Value(tag="loss", simple_value=valid_loss) 38 ]) 39 valid_writer.add_summary(valid_summary, step) 40 valid_writer.flush() 41 42 print('epoch', i, '>>> loss:', valid_loss) 43 44 # 保存模型 45 saver.save(sess, ckpt_path + 'model.ckpt') 46 print("Model saved")
训练和验证的loss曲线
测试
1 correct = tf.equal(tf.argmax(logits, 1), tf.argmax(y, 1)) 2 accuracy = tf.reduce_mean(tf.cast(correct, tf.float32)) 3 4 with tf.Session() as sess: 5 saver.restore(sess, tf.train.latest_checkpoint('./ckpt')) 6 7 test_accuracy = sess.run(accuracy, feed_dict={x: X_test, y: y_test}) 8 print("Test Accuracy = {}".format(test_accuracy))