1. 前言

1.1 案例介绍

本案例使用Pytorch搭建一个GoogLeNet网络结构,用于Fashion-MNIST数据集的图像分类。针对该问题的分析可以分为数据准备、模型建立以及使用训练集进行训练和使用测试集测试模型的效果。

1.2 环境配置

⑴ 操作系统: Windows10
⑵ 编译器环境: PyCharm Community Edition 2021.2
⑶ 配置环境: Pytorch1.7.1 + torchvision8.2 + CUDA11.3

1.3 模块导入

本案例需要导入如下的库文件和相关模块:

import numpy as np
import pandas as pd
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report
import matplotlib.pyplot as plt
import seaborn as sns
import copy
import time
import torch
import torch.nn as nn
from torch.optim import Adam
import torch.utils.data as Data
from torchvision import transforms
from torchvision.datasets import FashionMNIST

2. 图像数据准备

在模型建立与训练之前,首先准备FashionMNIST数据集,该数据集可以直接使用torchvision库中datasets模块的FashionMNIST()函数读取,如果指定的工作文件夹中没有当前数据,可以从网络上自动下载该数据。

2.1 训练验证集的准备

训练验证集的加载处理程序被包装成如下的train_data_process()函数 ,它的作用是导入训练数据集,然后使用Data.DataLoader()函数将其定义为数据加载器,每个batch中会包含64个样本,通过len()函数可以计算数据加载器中包含的batch数量,输出显示train_loader中包含938个batch。需要注意的是参数shuffle = False,表示加载器中每个batch使用的样本都是固定的,这样有利于在训练模型时根据迭代的次数将其分为训练集和验证集。同时为了观察数据集中每个图像的内容,可以获取一个batch的图像,然后将其可视化,以观察数据。

# 处理训练集数据
def train_data_process():
    # 加载FashionMNIST数据集
    train_data = FashionMNIST(root="./data/FashionMNIST",  # 数据路径
                              train=True,  # 只使用训练数据集
                              transform=transforms.Compose([transforms.Resize(size=96), transforms.ToTensor()]),  # 把PIL.Image或者numpy.array数据类型转变为torch.FloatTensor类型
                                                                                                                   # 尺寸为Channel * Height * Width,数值范围缩小为[0.0, 1.0]
                              download=False,  # 若本身没有下载相应的数据集,则选择True
                              )
    train_loader = Data.DataLoader(dataset=train_data,  # 传入的数据集
                                   batch_size=64,  # 每个Batch中含有的样本数量
                                   shuffle=False,  # 不对数据集重新排序
                                   num_workers=0,  # 加载数据所开启的进程数量
                                   )
    print("The number of batch in train_loader:", len(train_loader))  # 一共有938个batch,每个batch含有64个训练样本

    # 获得一个Batch的数据
    for step, (b_x, b_y) in enumerate(train_loader):
        if step > 0:
            break
    batch_x = b_x.squeeze().numpy()  # 将四维张量移除第1维,并转换成Numpy数组
    batch_y = b_y.numpy()  # 将张量转换成Numpy数组
    class_label = train_data.classes  # 训练集的标签
    class_label[0] = "T-shirt"
    print("the size of batch in train data:", batch_x.shape)
    
    # 可视化一个Batch的图像
    plt.figure(figsize=(12, 5))
    for ii in np.arange(len(batch_y)):
        plt.subplot(4, 16, ii+1)
        plt.imshow(batch_x[ii, :, :], cmap=plt.cm.gray)
        plt.title(class_label[batch_y[ii]], size=9)
        plt.axis("off")
        plt.subplots_adjust(wspace=0.05)
    plt.show()
    
    return train_loader, class_label

得到的可视化图像如下:

在这里插入图片描述

注:GoogLeNet模型的计算复杂,而且不如VGG那样便于修改通道数。这里我们将Fashion-MNIST数据集的尺寸从224降到96来简化计算,同时每个batch size为64,因此每个mini-batch的尺寸为64×96×96。

2.2 测试集的准备

测试集的加载处理程序被包装成如下的test_data_process()函数 ,它的作用是导入测试数据集,将其尺寸扩展到96,并且将所有的样本处理为一个整体,看作一个batch用于测试。。

# 处理测试集数据
def test_data_process():
    test_data = FashionMNIST(root="./data/FashionMNIST",  # 数据路径
                             train=False,  # 不使用训练数据集
                             transform=transforms.Compose([transforms.Resize(size=96), transforms.ToTensor()]),  # 把PIL.Image或者numpy.array数据类型转变为torch.FloatTensor类型
                                                                                                                  # 尺寸为Channel * Height * Width,数值范围缩小为[0.0, 1.0]
                             download=False,  # 如果前面数据已经下载,这里不再需要重复下载
                             )
    test_loader = Data.DataLoader(dataset=test_data,  # 传入的数据集
                                  batch_size=1,  # 每个Batch中含有的样本数量
                                  shuffle=True,  # 不对数据集重新排序
                                  num_workers=0,  # 加载数据所开启的进程数量
                                   )

    # 获得一个Batch的数据
    for step, (b_x, b_y) in enumerate(test_loader):
        if step > 0:
            break
    batch_x = b_x.squeeze().numpy()  # 将四维张量移除第1维,并转换成Numpy数组
    batch_y = b_y.numpy()  # 将张量转换成Numpy数组
    print("The size of batch in test data:", batch_x.shape)

    return test_loader

3. 卷积神经网络的搭建

3.1 Inception模块的创建

Inception模块是GoogLeNet网络中的基础卷积块,得名于同名电影《盗梦空间》(Inception)。Inception模块里有4条并行的线路。前3条线路使用窗口大小分别是1×1、3×3和5×5的卷积层来抽取不同空间尺寸下的信息,其中中间2个线路会对输入先做1×1卷积来减少输入通道数,以降低模型复杂度。第四条线路则使用3×3最大池化层,后接1×1卷积来改变通道数。4条线路都使用了合适的padding来使输入与输出的高和宽一致。最后将每条线路的输出在通道维上连接合并,并输入到后面的层中去。
在这里插入图片描述

# 定义一个Inception模块
class Inception(nn.Module):
    # c1 - c4为每条线路里的层的输出通道数
    def __init__(self, in_c, c1, c2, c3, c4):
        super(Inception, self).__init__()
        # 线路1,单1x1卷积层
        self.p1_1 = nn.Conv2d(in_c, c1, kernel_size=1)
        # 线路2,1x1卷积层后接3x3卷积层
        self.p2_1 = nn.Conv2d(in_c, c2[0], kernel_size=1)
        self.p2_2 = nn.Conv2d(c2[0], c2[1], kernel_size=3, padding=1)
        # 线路3,1x1卷积层后接5x5卷积层
        self.p3_1 = nn.Conv2d(in_c, c3[0], kernel_size=1)
        self.p3_2 = nn.Conv2d(c3[0], c3[1], kernel_size=5, padding=2)
        # 线路4,3x3最大池化层后接1x1卷积层
        self.p4_1 = nn.MaxPool2d(kernel_size=3, stride=1, padding=1)
        self.p4_2 = nn.Conv2d(in_c, c4, kernel_size=1)

    def forward(self, x):
        p1 = nn.functional.relu(self.p1_1(x))
        p2 = nn.functional.relu(self.p2_2(nn.functional.relu(self.p2_1(x))))
        p3 = nn.functional.relu(self.p3_2(nn.functional.relu(self.p3_1(x))))
        p4 = nn.functional.relu(self.p4_2(self.p4_1(x)))
        
        return torch.cat((p1, p2, p3, p4), dim=1)  # 在通道维度上连接输出

3.2 GoogLeNet网络的搭建

GoogLeNet与VGG一样,在主体卷积部分中使用5个模块,每个模块之间使用步幅为2,窗口大小为3×3的最大池化层来减小输出的高度和宽度。

第一部分使用了一个通道数为64的7×7卷积层。
第二部分使用了2个卷积层,分别是一个通道数为64的1×1卷积层,和一个将通道增大3倍的3×3卷积层。它对应Inception模块中的第二条线路。
第三部分串联了2个完整的Inception模块。第一个Inception模块的输出通道数为64+128+32+32=256,4条线路的输出通道数比例为64:128:32:32=2:4:1:1。其中第二、第三条线路先分别将输入通道数减小至96/192=1/2和16/192=1/12后,再接上第二层卷积层。第二个Inception模块输出通道数增至128+192+96+64=480,每条线路的输出通道数之比为128:192:96:64=4:6:3:2。其中第二、第三条线路先分别将输入通道数减小至128/256=1/2和32/256=1/8。
第四部分串联了5个Inception模块,其输出通道数分别是192+208+48+64=512、160+224+64+64=512、128+256+64+64=512、112+288+64+64=528和256+320+128+128=832。这些线路的通道数分配和第三模块中的类似,首先含3×3卷积层的第二条线路输出最多通道,其次是仅含1×1卷积层的第一条线路,之后是含5×5卷积层的第三条线路和含3×3最大池化层的第四条线路。其中第二、第三条线路都会先按比例减小通道数。这些比例在各个Inception模块中都略有不同。
第五部分包含了输出通道数为256+320+128+128=832和384+384+128+128=1024的两个Inception模块。其中每条线路的通道数的分配思路和第三、第四部分中的一致,只是在具体数值上有所不同。需要注意的是,第五部分的后面紧跟着输出层,该模块同NiN一样使用全局平均池化层来将每个通道的高和宽变成1,最后将输出转变成二维数组后,接上一个输出个数为标签类别数的全连接层。
在这里插入图片描述

# 定义一个全局平均池化层
class GlobalAvgPool2d(nn.Module):
    def __init__(self):
        super(GlobalAvgPool2d, self).__init__()

    def forward(self, x):
        return nn.functional.avg_pool2d(x, kernel_size=x.size()[2:])  # 池化窗口形状等于输入图像的形状
        
# 定义GoogLeNet模型网络结构
def GoogLeNet():
    b1 = nn.Sequential(nn.Conv2d(1, 64, kernel_size=7, stride=2, padding=3),
                       nn.ReLU(),
                       nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
                       )

    b2 = nn.Sequential(nn.Conv2d(64, 64, kernel_size=1),
                       nn.Conv2d(64, 192, kernel_size=3, padding=1),
                       nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
                       )

    b3 = nn.Sequential(Inception(192, 64, (96, 128), (16, 32), 32),
                       Inception(256, 128, (128, 192), (32, 96), 64),
                       nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
                       )

    b4 = nn.Sequential(Inception(480, 192, (96, 208), (16, 48), 64),
                       Inception(512, 160, (112, 224), (24, 64), 64),
                       Inception(512, 128, (128, 256), (24, 64), 64),
                       Inception(512, 112, (144, 288), (32, 64), 64),
                       Inception(528, 256, (160, 320), (32, 128), 128),
                       nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
                       )

    b5 = nn.Sequential(Inception(832, 256, (160, 320), (32, 128), 128),
                       Inception(832, 384, (192, 384), (48, 128), 128),
                       GlobalAvgPool2d()
                       )

    net = nn.Sequential(b1,
                        b2,
                        b3,
                        b4,
                        b5,
                        nn.Flatten(),
                        nn.Linear(1024, 10)
                        )

    return net

4. 卷积神经网络训练与预测

为了训练网络结构NiN_Net,定义了一个train_model()函数,该函数的作用是使用训练数据集来训练NiN_Net网络。训练数据集包含了60000张图像,划分成938个batch,其中80%的batch用于模型的训练,20%的batch用于模型的验证,因此在train_model()函数中,包含了模型的训练和验证两个过程。

# 定义网络的训练过程
def train_model(model, traindataloader, train_rate, criterion, device, optimizer, num_epochs=25):
    '''
    :param model: 网络模型
    :param traindataloader: 训练数据集,会切分为训练集和验证集
    :param train_rate: 训练集batch_size的百分比
    :param criterion: 损失函数
    :param device: 运行设备
    :param optimizer: 优化方法
    :param num_epochs: 训练的轮数
    '''

    batch_num = len(traindataloader)  # batch数量
    train_batch_num = round(batch_num * train_rate)  # 将80%的batch用于训练,round()函数四舍五入
    best_model_wts = copy.deepcopy(model.state_dict())  # 复制当前模型的参数
    # 初始化参数
    best_acc = 0.0  # 最高准确度
    train_loss_all = []  # 训练集损失函数列表
    train_acc_all = []  # 训练集准确度列表
    val_loss_all = []  # 验证集损失函数列表
    val_acc_all = []  # 验证集准确度列表
    since = time.time()  # 当前时间
    # 进行迭代训练模型
    for epoch in range(num_epochs):
        print('Epoch {}/{}'.format(epoch, num_epochs - 1))
        print('-' * 10)

        # 初始化参数
        train_loss = 0.0  # 训练集损失函数
        train_corrects = 0  # 训练集准确度
        train_num = 0  # 训练集样本数量
        val_loss = 0.0  # 验证集损失函数
        val_corrects = 0  # 验证集准确度
        val_num = 0  # 验证集样本数量
        # 对每一个mini-batch训练和计算
        for step, (b_x, b_y) in enumerate(traindataloader):
            b_x = b_x.to(device)
            b_y = b_y.to(device)
            if step < train_batch_num:  # 使用数据集的80%用于训练
                model.train()  # 设置模型为训练模式,启用Batch Normalization和Dropout
                output = model(b_x)  # 前向传播过程,输入为一个batch,输出为一个batch中对应的预测
                pre_lab = torch.argmax(output, 1)  # 查找每一行中最大值对应的行标
                loss = criterion(output, b_y)  # 计算每一个batch的损失函数
                optimizer.zero_grad()  # 将梯度初始化为0
                loss.backward()  # 反向传播计算
                optimizer.step()  # 根据网络反向传播的梯度信息来更新网络的参数,以起到降低loss函数计算值的作用
                train_loss += loss.item() * b_x.size(0)  # 对损失函数进行累加
                train_corrects += torch.sum(pre_lab == b_y.data)  # 如果预测正确,则准确度train_corrects加1
                train_num += b_x.size(0)  # 当前用于训练的样本数量
            else:  # 使用数据集的20%用于验证
                model.eval()  # 设置模型为评估模式,不启用Batch Normalization和Dropout
                output = model(b_x)  # 前向传播过程,输入为一个batch,输出为一个batch中对应的预测
                pre_lab = torch.argmax(output, 1)  # 查找每一行中最大值对应的行标
                loss = criterion(output, b_y)  # 计算每一个batch中64个样本的平均损失函数
                val_loss += loss.item() * b_x.size(0)  # 将验证集中每一个batch的损失函数进行累加
                val_corrects += torch.sum(pre_lab == b_y.data)  # 如果预测正确,则准确度val_corrects加1
                val_num += b_x.size(0)  # 当前用于验证的样本数量

        # 计算并保存每一次迭代的成本函数和准确率
        train_loss_all.append(train_loss / train_num)  # 计算并保存训练集的成本函数
        train_acc_all.append(train_corrects.double().item() / train_num)  # 计算并保存训练集的准确率
        val_loss_all.append(val_loss / val_num)  # 计算并保存验证集的成本函数
        val_acc_all.append(val_corrects.double().item() / val_num)  # 计算并保存验证集的准确率
        print('{} Train Loss: {:.4f} Train Acc: {:.4f}'.format(epoch, train_loss_all[-1], train_acc_all[-1]))
        print('{} Val Loss: {:.4f} Val Acc: {:.4f}'.format(epoch, val_loss_all[-1], val_acc_all[-1]))

        # 寻找最高准确度
        if val_acc_all[-1] > best_acc:
            best_acc = val_acc_all[-1]  # 保存当前的最高准确度
            best_model_wts = copy.deepcopy(model.state_dict())  # 保存当前最高准确度下的模型参数
        time_use = time.time() - since  # 计算耗费时间
        print("Train and val complete in {:.0f}m {:.0f}s".format(time_use // 60, time_use % 60))

    # 选择最优参数
    model.load_state_dict(best_model_wts)  # 加载最高准确度下的模型参数
    train_process = pd.DataFrame(data={"epoch": range(num_epochs),
                                       "train_loss_all": train_loss_all,
                                       "val_loss_all": val_loss_all,
                                       "train_acc_all": train_acc_all,
                                       "val_acc_all": val_acc_all}
                                 )  # 将每一代的损失函数和准确度保存为DataFrame格式

    # 显示每一次迭代后的训练集和验证集的损失函数和准确率
    plt.figure(figsize=(12, 4))
    plt.subplot(1, 2, 1)
    plt.plot(train_process['epoch'], train_process.train_loss_all, "ro-", label="Train loss")
    plt.plot(train_process['epoch'], train_process.val_loss_all, "bs-", label="Val loss")
    plt.legend()
    plt.xlabel("epoch")
    plt.ylabel("Loss")
    plt.subplot(1, 2, 2)
    plt.plot(train_process['epoch'], train_process.train_acc_all, "ro-", label="Train acc")
    plt.plot(train_process['epoch'], train_process.val_acc_all, "bs-", label="Val acc")
    plt.xlabel("epoch")
    plt.ylabel("acc")
    plt.legend()
    plt.show()

    return model, train_process

接下来定义一个test_model()函数,它的作用是使用测试集在最优模型上进行测试,从而验证该模型的性能。

# 测试模型
def test_model(model, testdataloader, device):
    '''
    :param model: 网络模型
    :param testdataloader: 测试数据集
    :param device: 运行设备
    '''

    # 初始化参数
    test_corrects = 0.0
    test_num = 0
    test_acc = 0.0
    # 只进行前向传播计算,不计算梯度,从而节省内存,加快运行速度
    with torch.no_grad():
        for test_data_x, test_data_y in testdataloader:
            test_data_x = test_data_x.to(device)
            test_data_y = test_data_y.to(device)
            model.eval()  # 设置模型为评估模式,不启用Batch Normalization和Dropout
            output = model(test_data_x)  # 前向传播过程,输入为测试数据集,输出为对每个样本的预测
            pre_lab = torch.argmax(output, 1)  # 查找每一行中最大值对应的行标
            test_corrects += torch.sum(pre_lab == test_data_y.data)  # 如果预测正确,则准确度val_corrects加1
            test_num += test_data_x.size(0)  # 当前用于训练的样本数量

    test_acc = test_corrects.double().item() / test_num  # 计算在测试集上的分类准确率
    print("test accuracy:", test_acc)

最后开始对模型进行训练和测试,其中优化算法使用了Adam优化器学习率设置为0.002,损失函数为交叉熵函数。然后调用train_model()函数将训练集train_loader的80%用于训练,20%用于验证,一共训练25轮。

# 模型的训练和测试
def train_model_process(myconvnet):
    optimizer = torch.optim.Adam(myconvnet.parameters(), lr=0.001)  # 使用Adam优化器,学习率为0.001
    criterion = nn.CrossEntropyLoss()  # 损失函数为交叉熵函数
    device = 'cuda' if torch.cuda.is_available() else 'cpu'  # GPU加速
    train_loader, class_label = train_data_process()  # 加载训练集
    test_loader = test_data_process()  # 加载测试集

    myconvnet = myconvnet.to(device)
    myconvnet, train_process = train_model(myconvnet, train_loader, 0.8, criterion, device, optimizer, num_epochs=25)  # 开始训练模型
    test_model(myconvnet, test_loader, device)  # 使用测试集进行评估

在模型训练过程中,损失函数和分类准确率的变化曲线如下。可以看到,损失函数在训练集上迅速减小,而在验证集上上下波动。分类准确率在训练集上一直在增大,而在验证集上逐渐增大并小幅波动。

在这里插入图片描述

为了得到计算模型的泛化能力,将测试集给到训练好的模型进行预测,从而得到在测试集上的预测准确率(如下图)。

在这里插入图片描述

注:对于复杂的神经网络和大规模的数据来说,使用CPU来计算可能不够高效,因此需要将模型移动到GPU,使用GPU加速计算。

5. 运行程序

如下是主函数中的内容,返回一个GoogLeNet网络结构,并对该卷积神经网络进行训练和测试。

if __name__ == '__main__':
    model = GoogLeNet()
    train_model_process(model)

注:在之前的程序中配置了使用多个进程同时加载训练集数据,多进程的使用必须在main()函数中进行,否则会在执行过程中报错。