暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

深度残差网络ResNet与Cifar100实战

快来自习室 2020-08-03
2288
​1.什么是ResNet

何凯明博士,广东省高考状元,进入清华。2007年清华大学毕业之后开始在微软亚洲研究院(MSRA)实习,2011年香港中文大学博士毕业后正式加入MSRA,目前在Facebook人工智能研究(FAIR)实验室担任研究科学家。曾以第一作者身份拿过两次CVPR最佳论文奖(2009年和2016年) - 其中2016年CVPR最佳论文为图像识别中的深度残差学习(对于法师识别的深度残留学习)更是一鸣惊人下面我们就从背景讲深度残差网络:
“传统”深度学习的困难
深度学习目前进展取决于技巧:初始权值选择,局部感受野,权值共享等等,但使用更深层的网络时(例如> 100),依然要面对反向传播时梯度消失这类传统困难,
退化问题:层数越多,训练错误率与测试错误率反而升高。
如何解决上面的问题呢?
深度残差网络的思路
引入“捷径”,可以防止梯度消失问题,其实在ResNet出来以前已经有人对这方面进行研究了,Srivastava等人的“公路网络”,“培训非常深的网络”是类似的方法,他们认为更深层的网络应该也是易于优化的,层数越多准确率越高,训练方法与“传统”深度网络相比不会有很大变化,基于此,何凯明团队提出了一个新的深度残差网络。

上图的左半部分是常规的神经网络形式,每层有权值和激活函数,这里的激活函数采用的是RELU函数,目的是为了避免梯度消失的问题,右半部分是残差网络的基本单元,和左边最大的不同在于多了一个身份即多了一个直接到达输出前的连线,这代表什么意思呢?刚开始输入的X,按照常规的神经网络进行权值叠加然后通过激活函数,在次权值叠加后此时在把输入的信号和此时的输出叠加然后在通过激活函数,就是这样的一个网络,而那条线称为捷径,也称为高速公路,加这个高速公路有什用呢?这个残差怎么理解呢?大家可以这样理解在线性拟合中的残差说的是数据点距离拟合直线的函数值的差,那么这里我们可以类比,这里的X就是我们的拟合的函数,而H(x)的就是具体的数据点,那么我通过训练使的拟合的值加上F(x)的就得到具体数据点的值,因此这 F(x)的就是残差了。

​2.ResNet构建代码

#深度残差网络ResNet--这里包含了 ResNet18 & ResNet34

import tensorflow as tf

from tensorflow.keras import layers,Sequential

from tensorflow import keras as keras


#自定义层

class BasicBlock(layers.Layer):#这是一个小的BasicBlock

    def __init__(self,filter_num,stride):#这里我们定义用到的两个参数filter_num(卷积核个数),stride步长

        super(BasicBlock,self).__init__()

        #unit-1

        self.conv1 = layers.Conv2D(filter_num,(3,3),strides=stride,padding='same')

        '''注意这里的strides是变量,因为strides=stride,而上面的stride=1定义是可以改变的,所以可能直接经过

           conv下采样,而不需要多写一层subsampling/pooling-layer

           与此同时,padding=same在CONV卷积层后尺寸是否不变由步长strides是否大于1决定,因为padding='same'有

           两种方式,当strides=1时,这样前后尺寸大小一致,当strides=2时,前面如果是32*32,那么后面就

           补0-padding成16*16的尺寸,strides=3,前面如果是30*30,后面就是10*10'''

        self.bn1 = layers.BatchNormalization(trainable=True)

        self.relu = layers.Activation('relu')

        #unit-2-这一unit不再做可能的下采样,可能的下采样通过unit1-strides>1实现

        self.conv2 = layers.Conv2D(filter_num, (3, 3), strides=1, padding='same')

        self.bn2 = layers.BatchNormalization(trainable=True)

        #然后我在这里设置shortcut连接线

        if stride != 1:

            #stride不等于1的时候就要将x进行形状变换

            #将32*32-->16*16(stride=2(上下stride,channel不做stride)时)

            self.shortcut = layers.Conv2D(filter_num,(1,1),strides=stride,padding='same')

        else:

            self.shortcut = lambda t:t

            #lambda [arg1 [,arg2,.....argn]]:expression这个函数就一个表达式

            #sum = lambda arg1,arg2: arg1 + arg2 #sum(10,20) #30

    def call(self,inputs,training=True):#定义这两个参数

        #学习网络

        out = self.conv1(inputs)

        out = self.bn1(out)

        out = self.relu(out)

        out = self.conv2(out)

        out = self.bn2(out)

        #shortcut网络

        shortcut = self.shortcut(inputs)

        #相加求和

        output = layers.add([shortcut,out])

        output = tf.nn.relu(output)

        return output

      #自定义网络

class ResNet(keras.Model):

    def __init__(self,sicengdemeicengbasicblockgeshu):#__init__()一定要在最上方

        super(ResNet,self).__init__()

        #预处理层

        self.preprocess = Sequential([

            layers.Conv2D(64,(3,3),strides=(1,1)),

            layers.BatchNormalization(trainable=True),

            layers.Activation('relu'),

            layers.MaxPool2D(pool_size=(2,2),strides=(1,1),padding='same')

        ])

        #接下来是四个ResBlock-units

        self.layer1 = self.build_resblock(64, sicengdemeicengbasicblockgeshu[0])

        self.layer2 = self.build_resblock(128, sicengdemeicengbasicblockgeshu[1])

        self.layer3 = self.build_resblock(256, sicengdemeicengbasicblockgeshu[2])

        self.layer4 = self.build_resblock(512, sicengdemeicengbasicblockgeshu[3])

        #TENSORSHAPE自适应变换层-将[b,H,L,512]当中的[H,L]做平均值计算之后->[b,1,1,512]->[b,512]

        self.avgpool = layers.GlobalAveragePooling2D()

        #全连接层

        self.fc = layers.Dense(100)#输出维度100

    def call(self,inputs,training=True):

        #Fisrt

        out = self.preprocess(inputs)

        #Second

        out = self.layer1(out)

        out = self.layer2(out)

        out = self.layer3(out)

        out = self.layer4(out)

        #[b,512]

        out = self.avgpool(out)

        #[b,100]

        out = self.fc(out)

        return out

    #自定义网络当中还需要先进行自定义一个resblock-unit

    def build_resblock(self,filter_num,BasicBlock_num):#这些参数都是下面函数内容需要的

        #建立容器之后再往里面添加层

        resblock = Sequential()

        resblock.add(BasicBlock(filter_num,stride=2))#类的API要填写__init__()初始化类函数当中的参数

        #前面在BasicBlock内部只做一层下采样,这里我规定在一个ResBlock当中,仅仅第一个BasicBlock做下采样

        for i in range(1,BasicBlock_num):

            resblock.add(BasicBlock(filter_num,stride=1))

        return resblock

#ResNet18实现

def ResNet18():

    return ResNet([2,2,2,2])#每个ResBlock包括两个BasicBlock,一共四个ResBlock


#ResNet34实现

def ResNet34():

    return ResNet([3,4,6,3])

3.训练和测试代码

import tensorflow as tf

from tensorflow.keras import optimizers,datasets

import os

from qaz import ResNet18


os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'

tf.random.set_seed(1234)


def preprocess(x,y):

    #tf.cast(x,dtype=tf.float32)/255.是转换到[0,1]然后×2-1转换到[-1,1]

    x = 2*(tf.cast(x,dtype=tf.float32)/255.)-1

    y = tf.cast(y,dtype=tf.int32)

    return x,y


(x,y),(x_test,y_test)=datasets.cifar100.load_data()

y = tf.squeeze(y,axis=1)#[b]

y_test = tf.squeeze(y_test,axis=1)#[b]

db_train = tf.data.Dataset.from_tensor_slices((x,y))

db_train = db_train.map(preprocess).shuffle(50000).batch(64)#shuffle后面是buffer-存放缓冲区

db_test = tf.data.Dataset.from_tensor_slices((x_test,y_test))

db_test = db_test.map(preprocess).shuffle(10000).batch(64)

sample=next(iter(db_train))

sample_test=next(iter(db_test))

print(sample_test[0].shape,sample_test[1].shape)


def main():

    model = ResNet18()

    model.build(input_shape=(None,32,32,3))

    model.summary()

    optimizer = optimizers.Adam(lr=3e-4)

    for epoch in range(20):

        #训练数据

        for step,(x,y) in enumerate(db_train):

            with tf.GradientTape() as tape:

                #logits[b,100]

                logits = model(x)

                #[b]->[b,100]

                y_onehot = tf.one_hot(y,depth=100)

                loss = tf.losses.categorical_crossentropy(y_onehot,logits,from_logits=True)

                loss = tf.reduce_mean(loss)

            grads = tape.gradient(loss,model.trainable_variables)

            optimizer.apply_gradients(zip(grads,model.trainable_variables))

            if step % 100 == 0:

                print(epoch,step,'loss:',loss.numpy())

        #测试模型

        total_num = 0

        total_correct = 0

        for _,(x,y) in enumerate(db_test):#这里都是一个batch一个batch的做测试,所以有step的概念存在

            #logits[b,100]

            logits = model(x)

            prob = tf.nn.softmax(logits,axis=1)

            pred = tf.argmax(prob,axis=1)

            pred = tf.cast(pred,dtype=tf.int32)#pred[b]

            correct = tf.cast(tf.equal(y,pred),dtype=tf.int32)

            correct = tf.reduce_sum(correct)

            total_num += x.shape[0]#每个batch的x.shape[0]不一样

            total_correct += int(correct)

        acc = total_correct / total_num

        #打印每个epoch的test-set的acc结果

        print(epoch,'acc:',acc)


if __name__ == '__main__':

    main()

4.运行结果

使用1080Ti—11G在经过100次训练之后,在一次测试集上达到0.7129的准确率ACC。

GPU测试温度:65度

GPU占用率:35.7%—64batch


文章转载自快来自习室,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论