Semantic Segmentation: UNet

Semantic segmentation: classify every pixel in an image (a toy label map is shown after these definitions).

Instance segmentation: delineate each object down to its precise edges and assign a class label to every individual object.

Panoptic segmentation: detect and segment everything in the image.
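To make "classify every pixel" concrete: a semantic-segmentation label (and likewise a model's prediction) is simply an H×W map of class indices. The toy mask below is made up purely for illustration (0 = background, 1 = cat):

import numpy as np

# A made-up 4x4 label map: one class index per pixel (0 = background, 1 = cat)
toy_mask = np.array([[0, 0, 0, 0],
                     [0, 1, 1, 0],
                     [0, 1, 1, 0],
                     [0, 0, 0, 0]])
print(toy_mask.shape)  # (4, 4): the label has the same spatial size as the image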

FCN

FCN stands for Fully Convolutional Network. CNNs were already widely used for image classification and object detection, but traditional CNN-based approaches performed poorly on semantic segmentation, so the Berkeley team proposed the fully convolutional network, which extends image-level classification to pixel-level classification.

  1. FCN accepts input images of arbitrary size.

  2. FCN upsamples the feature map produced by the last convolutional layer, using transposed convolution (deconvolution) to restore it to the same size as the input image, so that a prediction is produced for every pixel while the spatial information of the original image is preserved.

  3. Pixel-wise classification is then performed on the upsampled feature map and the softmax classification loss is computed (a minimal sketch follows this list).

    For details, see this blog: https://www.cnblogs.com/xiaoboge/p/10502697.html
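Below is a rough sketch of these three points in Paddle, not the original FCN: the "backbone" is a single strided convolution standing in for the full convolutional feature extractor, bilinear interpolation stands in for the transposed convolution, and the 256×256 input with 2 classes is made up for illustration. The coarse score map is upsampled back to the input resolution and a per-pixel softmax cross-entropy is computed.

import paddle
import paddle.nn as nn
import paddle.nn.functional as F

num_classes = 2
# Stand-in "backbone": one strided conv that downsamples the image 8x
backbone = nn.Conv2D(3, 64, kernel_size=3, stride=8, padding=1)
# 1x1 conv producing one score map per class
classifier = nn.Conv2D(64, num_classes, kernel_size=1)

x = paddle.rand([1, 3, 256, 256])                   # any input size is accepted
scores = classifier(backbone(x))                    # coarse score map, here [1, 2, 32, 32]
scores = F.interpolate(scores, size=x.shape[2:],    # upsample back to the input resolution
                       mode="bilinear", align_corners=True)
label = paddle.zeros([1, 256, 256], dtype="int64")  # dummy per-pixel class indices
loss = F.cross_entropy(scores, label, axis=1)       # softmax cross-entropy per pixel, over the class axis
print(scores.shape, loss.item())                    # scores: [1, 2, 256, 256]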

UNet

UNet is a fairly simple segmentation model published in 2015; the paper is at https://arxiv.org/abs/1505.04597. The most distinctive feature of the network is its "U" shape, shown in the figure below.

[Figure: the U-shaped UNet architecture]

  1. As the figure shows, the UNet reproduced here consists of an encoder (feature-extraction) path on the left and a decoder (feature-fusion / feature-enhancement) path on the right, and is roughly symmetric.

  2. The encoder contains 4 pooling layers in total, so as I see it the encoder can be viewed as 4 downsampling stages, each of which passes through a double_conv block; counting the double_conv applied to the input image, the left path goes through 5 double_conv blocks in total. Each downsampling stage halves the spatial size of the feature map.

  3. The decoder contains 4 upsampling stages, each followed by a double_conv block. Each upsampling stage doubles the spatial size of the feature map, and upsampling lets the resulting high-resolution feature map retain the abstract features learned at lower resolution. The upsampled features are then fused with the features from the corresponding downsampling stage on the left (mainly by channel concatenation), enriching the image features.

  4. At the end, the model applies a single convolutional layer for classification (a quick shape check is shown after the implementation below).

    Project link: https://aistudio.baidu.com/aistudio/projectdetail/3557116?shared=1


UNet implementation (Paddle API)

import paddle
import paddle.nn as nn


# Double convolution block: (Conv -> BN -> ReLU) applied twice
class DoubleConv(nn.Layer):
    def __init__(self, in_channels, out_channels, mid_channels=None):
        super().__init__()
        if not mid_channels:
            mid_channels = out_channels
        self.conv1 = nn.Conv2D(in_channels, mid_channels, kernel_size=3, stride=1, padding=1)
        self.bn1 = nn.BatchNorm2D(mid_channels)
        self.relu1 = nn.ReLU()
        self.conv2 = nn.Conv2D(mid_channels, out_channels, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2D(out_channels)
        self.relu2 = nn.ReLU()

    def forward(self, x):
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu1(x)
        x = self.conv2(x)
        x = self.bn2(x)
        x = self.relu2(x)
        return x

# Downsampling block: max-pooling followed by a double convolution
class Down(nn.Layer):
    def __init__(self, in_channels, out_channels):
        super().__init__()
        self.maxpool_conv = nn.Sequential(
            nn.MaxPool2D(2),
            DoubleConv(in_channels=in_channels, out_channels=out_channels)
        )
    
    def forward(self, x):
        return self.maxpool_conv(x)

# Upsampling block: upsample, pad to match the skip connection, concatenate, then double convolution
class Up(nn.Layer):
    def __init__(self, in_channels, out_channels, bilinear=True):
        super().__init__()

        if bilinear:
            self.up = nn.Upsample(scale_factor = 2, mode="bilinear", align_corners = True)
            self.conv = DoubleConv(in_channels=in_channels, out_channels=out_channels)
        else:
            self.up = nn.Conv2DTranspose(in_channels, in_channels // 2, kernel_size=2, stride=2)
            self.conv = DoubleConv(in_channels, out_channels)
    
    def forward(self, x1, x2):
        x1 = self.up(x1)
        # print(f"x2.size() {type(x2.shape)}")
        # Pad x1 so its spatial size matches the skip-connection tensor x2
        # (sizes can differ when the input size is not divisible by 16)
        diffY = x2.shape[2] - x1.shape[2]
        diffX = x2.shape[3] - x1.shape[3]

        x1 = nn.functional.pad(x1, [diffX // 2, diffX - diffX // 2,
                                    diffY // 2, diffY - diffY // 2])
        # print(f"x2 {x2.shape}       x1 {x1.shape}")                           
        x = paddle.concat([x2, x1], axis=1)
        # print(f"x {x.shape}")
        return self.conv(x)

# Output layer: a 1x1 convolution mapping features to class scores
class OutConv(nn.Layer):
    def __init__(self, in_channels, out_channels):
        super(OutConv, self).__init__()
        self.conv = nn.Conv2D(in_channels, out_channels, kernel_size=1)

    def forward(self, x):
        return self.conv(x)

# The full UNet model
class Unet_paddle(nn.Layer):
    def __init__(self, channels, classes, bilinear=True):
        super(Unet_paddle, self).__init__()
        self.n_channels = channels
        self.n_classes = classes
        self.bilinear = bilinear
        self.inconv = DoubleConv(channels, 64)
        self.down1 = Down(64, 128)
        self.down2 = Down(128, 256)
        self.down3 = Down(256, 512)
        factor = 2 if bilinear else 1
        self.down4 = Down(512, 1024 // factor)
        self.up1 = Up(1024, 512 // factor, bilinear)
        self.up2 = Up(512, 256 // factor, bilinear)
        self.up3 = Up(256, 128 // factor, bilinear)
        self.up4 = Up(128, 64, bilinear)
        self.outconv = OutConv(64, classes)
        # self.outconv = nn.Sigmoid()
    
    def forward(self, x):
        x1 = self.inconv(x)
        x2 = self.down1(x1)
        x3 = self.down2(x2)
        x4 = self.down3(x3)
        x5 = self.down4(x4)
        x = self.up1(x5, x4)
        x = self.up2(x, x3)
        x = self.up3(x, x2)
        x = self.up4(x, x1)
        x = self.outconv(x)
        
        return x
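A quick sanity check of the model defined above (the 512×512 input is just an example size; 3 channels and 2 classes match the training setup later in this post):

# Sanity check: 3-channel input, 2 output classes (background + foreground)
net = Unet_paddle(channels=3, classes=2)
dummy = paddle.rand([1, 3, 512, 512])
out = net(dummy)
print(out.shape)  # [1, 2, 512, 512]: one score map per class at the input resolution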

Loss Function

The loss used is Cross Entropy Loss. Cross-entropy is well suited to multi-class losses: when the training set contains only one foreground class, say cat, the background is added as a second class, giving 2 classes in total, and in UNet the loss can then be computed with cross-entropy. The Paddle implementation below follows CSDN@Bubbliiiing (a small usage example follows the code):

def ce_loss(inputs, target, cls_weights, num_classes=2):
    n, c, h, w = inputs.shape
    nt, _, ht, wt = target.shape
    if h != ht or w != wt:
        inputs = nn.functional.interpolate(inputs, size=(ht, wt), mode="bilinear", align_corners=True)
    # Rearrange inputs from NCHW to NHWC
    temp_inputs = paddle.transpose(inputs, [0, 2, 3, 1])
    # Flatten to [N*H*W, C] so each row holds one pixel's class scores
    temp_inputs = paddle.reshape(temp_inputs, [-1, c])
    # Flatten the target to one class index per pixel
    temp_target = paddle.reshape(target, [-1])

    return nn.CrossEntropyLoss(weight=cls_weights, ignore_index=num_classes)(temp_inputs, temp_target)
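A minimal usage sketch of ce_loss with dummy tensors (the shapes are made up; the mask layout [N, 1, H, W] with integer class indices matches what the dataset below produces):

import paddle
import numpy as np

logits = paddle.rand([1, 2, 64, 64])                        # raw network output (N, C, H, W)
mask = paddle.randint(0, 2, [1, 1, 64, 64], dtype="int64")  # per-pixel class indices (N, 1, H, W)
weights = paddle.to_tensor(np.ones([2], np.float32))        # equal class weights
loss = ce_loss(logits, mask, weights, num_classes=2)
print(loss.item())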

DataSet

Dataset preparation: put the training images in the image folder and the label images in the mask folder (for the mask images it is recommended to distinguish classes by pixel value, e.g. 0 for background, 1 for the first class, 2 for the second class, and so on). A quick check of the loaded samples follows the code.

from paddle.io import Dataset, DataLoader
import glob
import os
from PIL import Image
from tqdm import tqdm
from paddle.vision.transforms import Compose, Resize, ToTensor

# Dataset: pairs each training image with its corresponding *_mask label image
class UnetDataSet(Dataset):
    def __init__(self, imgs_dir, masks_dir, size = (512,512)):
        self.imgs_files = glob.glob(os.path.join(imgs_dir, "*.jpg"))
        self.masks_dir = masks_dir
        self.file_length = len(self.imgs_files)
        self.transforms = Compose([Resize(size), ToTensor()])

    def __getitem__(self, idx):
        img_path = self.imgs_files[idx]
        img_name = os.path.basename(img_path).split(".")[0] + "_mask.jpg"
        mask_img_path = os.path.join(self.masks_dir, img_name)
        assert os.path.exists(mask_img_path), f"the mask image {img_name} does not exist"
        img = Image.open(img_path).convert("RGB")
        mask_img = Image.open(mask_img_path).convert("L")
        
        return {"image":self.process_img(img), "mask":self.process_img(mask_img)}
    
    def __len__(self):
        return self.file_length
    
    def process_img(self, image):
        return self.transforms(image)
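Once the folders are in place, a quick check that samples load with the expected shapes (the paths match the training script below):

# Each sample should give an image tensor of shape [3, 512, 512] and a mask of shape [1, 512, 512]
dataset = UnetDataSet(imgs_dir="work/dataset/cat/images", masks_dir="work/dataset/cat/mask")
sample = dataset[0]
print(len(dataset), sample["image"].shape, sample["mask"].shape)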

Training code

from paddle.io import random_split
import paddle
import numpy as np


batch_size = 4

def eval_net(net, loader, cls_weights):
    """Evaluation without the densecrf with the dice coefficient"""
    net.eval()
    n_val = len(loader)  # the number of batch
    epoch_loss = 0
    for batch in tqdm(loader):
        imgs, mask = batch["image"], batch["mask"]
        imgs = paddle.cast(imgs, dtype=paddle.float32)
        true_masks = paddle.cast(mask, dtype=paddle.int64)
        cls_weights = paddle.to_tensor(cls_weights, dtype=paddle.float32)
        with paddle.no_grad():
            mask_pred = net(imgs)
            loss = ce_loss(mask_pred, true_masks, cls_weights, num_classes=net.n_classes)
            epoch_loss += loss.item()
    net.train()
    return epoch_loss


def train(epoches, model, optimizer, n_train, n_val, train_loader, val_loader, cls_weights):
    global_step = 0
    for epoch in range(epoches):
        model.train()
        epoch_loss = 0
        with tqdm(total=n_train, desc=f'Epoch {epoch + 1}/{epoches}') as pbar:
            for batch in train_loader:
                #print(f"batch {batch}")
                imgs = batch["image"]
                mask = batch["mask"]
                assert imgs.shape[1] == model.n_channels, \
                        f'Network has been defined with {model.n_channels} input channels, ' \
                        f'but loaded images have {imgs.shape[1]} channels. Please check that ' \
                        'the images are loaded correctly.'
                # print(imgs)
                imgs = paddle.cast(imgs, dtype=paddle.float32)
                true_masks = paddle.cast(mask, dtype=paddle.int64)
                # print(true_masks.shape)

                optimizer.clear_grad()
                mask_predict = model(imgs)
                # print(f"mask_predict {mask_predict.shape}   true_masks {true_masks}")
                # loss = criterion(mask_predict, true_masks)
                
                cls_weights = paddle.to_tensor(cls_weights, dtype=paddle.float32)
                loss = ce_loss(mask_predict, true_masks,cls_weights, num_classes=model.n_classes)

                epoch_loss += loss.item()
                pbar.set_postfix(**{'loss (batch)': loss.item()})
                loss.backward()
                optimizer.step()

                pbar.update(imgs.shape[0])
                global_step += 1
                if global_step % max(1, n_train // (10 * batch_size)) == 0:
                    val_score = eval_net(model, val_loader, cls_weights)
                    print(f"val_score: {val_score}")
                    scheduler.step(val_score)
                    

        if True:
            save_dir = r"work/model_save_path_cat"
            if not os.path.exists(save_dir):
                os.makedirs(save_dir) 
            paddle.save(model.state_dict(),
                       os.path.join(save_dir ,f'CP_epoch{epoch + 1}.pdparams'))

model = Unet_paddle(3, 2)


dataset = UnetDataSet(imgs_dir=r"work/dataset/cat/images", masks_dir=r"work/dataset/cat/mask")
n_val = int(len(dataset) * 0.1)
n_train = len(dataset) - n_val
train_set, val_set = random_split(dataset, [n_train, n_val])
train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_set, batch_size=batch_size, shuffle=True)

# Learning-rate schedule: reduce the LR when the validation loss stops improving
scheduler = paddle.optimizer.lr.ReduceOnPlateau(learning_rate=1e-4, mode='min', patience=2)
# Optimizer
optimizer = paddle.optimizer.RMSProp(parameters=model.parameters(), learning_rate=scheduler, momentum=0.9)

cls_weights = np.ones([model.n_classes], np.float32)
# criterion = nn.BCEWithLogitsLoss()
train(epoches=10, model=model, optimizer=optimizer, n_train=n_train, n_val=n_val, train_loader=train_loader, val_loader=val_loader, cls_weights=cls_weights)

Prediction and viewing the results

import paddle, cv2
import numpy as np
from paddle.vision.transforms import *
import shutil

def get_image(path):
    size = (512, 512)
    transforms = Compose([Resize(size), ToTensor()])
    image = Image.open(path).convert("RGB")
    image = transforms(image)
    image = paddle.unsqueeze(image, 0)
    return image

def process_tensor(output, image_path, save_dir):
    # Rearrange the prediction from CHW to HWC
    output = paddle.transpose(output, [1, 2, 0])
    # Normalize the per-pixel class scores to (0, 1); this example uses 2 classes
    pred = nn.functional.softmax(output, axis=-1)
    # Pick the most likely class for every pixel
    full_mask = paddle.argmax(pred, axis=-1).numpy()

    # Draw the result on a new RGBA image: any pixel that is not background gets an opaque alpha value
    seg_img = np.zeros((full_mask.shape[0], full_mask.shape[1], 4))
    seg_img[:, :, 3] += ((full_mask[:, :] > 0) * 255).astype('uint8')

    # Merge seg_img with the original image
    ori_img = cv2.imread(image_path)
    ori_img = cv2.cvtColor(ori_img, cv2.COLOR_BGR2RGB)
    size = ori_img.shape[:2]
    # First resize seg_img back to the original image size
    seg_img = cv2.resize(seg_img, (size[1], size[0]))
    seg_img[:, :, 0] = ori_img[:, :, 0]
    seg_img[:, :, 1] = ori_img[:, :, 1]
    seg_img[:, :, 2] = ori_img[:, :, 2]

    image = Image.fromarray(np.uint8(seg_img))
    file_name = os.path.basename(image_path)
    image.save(os.path.join(save_dir, file_name.split(".")[0] + ".png"))

def predict(model, imgs, save_dir):
    model.eval()
    # print(image.shape)
    with paddle.no_grad():
        if isinstance(imgs, list):
            for index, image in enumerate(imgs):
                image_ = get_image(image)
                output = model(image_)[0]
                process_tensor(output, image, save_dir)
        else:
            image = get_image(imgs)
            output = model(image)[0]
            process_tensor(output, imgs, save_dir)
            

state_dict = paddle.load(r"work/model_save_path_cat/CP_epoch6.pdparams")
predict_net = Unet_paddle(3, 2)
predict_net.set_state_dict(state_dict)
save_dir = r"work/test_save_dir"
if not os.path.exists(save_dir):
    os.makedirs(save_dir)
else:
    shutil.rmtree(save_dir)
    os.makedirs(save_dir)
test_imgs = glob.glob(os.path.join(r"work/test_40","*.jpg"))
predict(predict_net, test_imgs, save_dir)

[Figures: segmentation results on test images]


Result: passable.