This article mainly uses the MXNet framework; the primary reference is the API documentation on the MXNet official site.
# Uncomment the following line if matplotlib is not installed.
# !pip install matplotlib
from mxnet import nd, gluon, init, autograd
from mxnet.gluon import nn
from mxnet.gluon.data.vision import datasets, transforms
from IPython import display
import matplotlib.pyplot as plt
import time
# Load the data
mnist_train = datasets.FashionMNIST(train=True)
X, y = mnist_train[0]
print('X shape:', X.shape, 'X dtype:', X.dtype, 'y:', y)
# Visualize a few samples
text_labels = ['t-shirt', 'trouser', 'pullover', 'dress', 'coat','sandal', 'shirt', 'sneaker', 'bag', 'ankle boot']
X, y = mnist_train[0:10]
# plot images
display.set_matplotlib_formats('svg')
_, figs = plt.subplots(1, X.shape[0], figsize=(15, 15))
for f, x, yi in zip(figs, X, y):
    # 3D -> 2D by removing the last (channel) dimension
    f.imshow(x.reshape((28, 28)).asnumpy())
    ax = f.axes
    ax.set_title(text_labels[int(yi)])
    ax.title.set_fontsize(14)
    ax.get_xaxis().set_visible(False)
    ax.get_yaxis().set_visible(False)
plt.show()
# Preprocessing: transforms.ToTensor() converts images from (height, width, channel) uint8 to (channel, height, width) float32
transformer = transforms.Compose([transforms.ToTensor(), transforms.Normalize(0.13, 0.31)])
mnist_train = mnist_train.transform_first(transformer)
# Load the data one batch at a time
batch_size = 256
train_data = gluon.data.DataLoader(mnist_train, batch_size=batch_size, shuffle=True, num_workers=4)
for data, label in train_data:
    print(data.shape, label.shape)
    break
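# With the settings above this prints (256, 1, 28, 28) (256,): each batch holds 256 single-channel 28x28 images.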
# Create the validation data loader
mnist_valid = gluon.data.vision.FashionMNIST(train=False)
valid_data = gluon.data.DataLoader(mnist_valid.transform_first(transformer), batch_size=batch_size, num_workers=4)
# Define the model (a LeNet-style CNN)
net = nn.Sequential()
net.add(nn.Conv2D(channels=6, kernel_size=5, activation='relu'),
        nn.MaxPool2D(pool_size=2, strides=2),
        nn.Conv2D(channels=16, kernel_size=3, activation='relu'),
        nn.MaxPool2D(pool_size=2, strides=2),
        nn.Flatten(),
        nn.Dense(120, activation="relu"),
        nn.Dense(84, activation="relu"),
        nn.Dense(10))
net.initialize(init=init.Xavier())
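# Optional: inspect the per-layer output shapes before training; a minimal sketch,
# assuming MXNet >= 1.3 where Block.summary() is available.
# net.summary(nd.zeros((1, 1, 28, 28)))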
# Define the loss function and the optimizer
softmax_cross_entropy = gluon.loss.SoftmaxCrossEntropyLoss()
trainer = gluon.Trainer(net.collect_params(), 'sgd', {'learning_rate': 0.1})
# Accuracy metric
def acc(output, label):
    # output: (batch, num_outputs) float32 ndarray
    # label: (batch,) int32 ndarray
    return (output.argmax(axis=1) ==
            label.astype('float32')).mean().asscalar()
for epoch in range(10):
    train_loss, train_acc, valid_acc = 0., 0., 0.
    tic = time.time()
    for data, label in train_data:
        # forward + backward
        with autograd.record():
            output = net(data)
            loss = softmax_cross_entropy(output, label)
        loss.backward()
        # update parameters
        trainer.step(batch_size)
        # calculate training metrics
        train_loss += loss.mean().asscalar()
        train_acc += acc(output, label)
    # calculate validation accuracy
    for data, label in valid_data:
        valid_acc += acc(net(data), label)
    print("Epoch %d: loss %.3f, train acc %.3f, valid acc %.3f, in %.1f sec" % (
        epoch, train_loss/len(train_data), train_acc/len(train_data),
        valid_acc/len(valid_data), time.time()-tic))
# Save the model parameters
net.save_parameters('net.params')
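# Note: save_parameters() stores only the weights; the network definition must be rebuilt in code before calling load_parameters(), as done in the next section.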
# Predict with the trained network
from mxnet import nd
from mxnet import gluon
from mxnet.gluon import nn
from mxnet.gluon.data.vision import datasets, transforms
from IPython import display
import matplotlib.pyplot as plt
net = nn.Sequential()
net.add(nn.Conv2D(channels=6, kernel_size=5, activation='relu'),
        nn.MaxPool2D(pool_size=2, strides=2),
        nn.Conv2D(channels=16, kernel_size=3, activation='relu'),
        nn.MaxPool2D(pool_size=2, strides=2),
        nn.Flatten(),
        nn.Dense(120, activation="relu"),
        nn.Dense(84, activation="relu"),
        nn.Dense(10))
net.load_parameters('net.params')
transformer = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(0.13, 0.31)])
mnist_valid = datasets.FashionMNIST(train=False)
X, y = mnist_valid[:10]
preds = []
for x in X:
    x = transformer(x).expand_dims(axis=0)
    pred = net(x).argmax(axis=1)
    preds.append(pred.astype('int32').asscalar())
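# The same predictions can be computed in one batched forward pass; a minimal
# sketch equivalent to the loop above (nd.stack collects the transformed images):
# X_batch = nd.stack(*[transformer(x) for x in X])          # shape (10, 1, 28, 28)
# preds = net(X_batch).argmax(axis=1).astype('int32').asnumpy().tolist()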
_, figs = plt.subplots(1, 10, figsize=(15, 15))
text_labels = ['t-shirt', 'trouser', 'pullover', 'dress', 'coat',
'sandal', 'shirt', 'sneaker', 'bag', 'ankle boot']
display.set_matplotlib_formats('svg')
for f, x, yi, pyi in zip(figs, X, y, preds):
    f.imshow(x.reshape((28, 28)).asnumpy())
    ax = f.axes
    ax.set_title(text_labels[yi] + '\n' + text_labels[pyi])
    ax.title.set_fontsize(14)
    ax.get_xaxis().set_visible(False)
    ax.get_yaxis().set_visible(False)
plt.show()
# Predict with a pretrained model from the Gluon model zoo
from mxnet.gluon.model_zoo import vision as models
from mxnet.gluon.utils import download
from mxnet import image
net = models.resnet50_v2(pretrained=True)
url = 'http://data.mxnet.io/models/imagenet/synset.txt'
fname = download(url)
with open(fname, 'r') as f:
    text_labels = [' '.join(l.split()[1:]) for l in f]
url = 'https://upload.wikimedia.org/wikipedia/commons/thumb/b/b5/\
Golden_Retriever_medium-to-light-coat.jpg/\
365px-Golden_Retriever_medium-to-light-coat.jpg'
fname = download(url)
x = image.imread(fname)
x = image.resize_short(x, 256)
x, _ = image.center_crop(x, (224,224))
plt.imshow(x.asnumpy())
plt.show()
def transform(data):
    data = data.transpose((2, 0, 1)).expand_dims(axis=0)
    rgb_mean = nd.array([0.485, 0.456, 0.406]).reshape((1, 3, 1, 1))
    rgb_std = nd.array([0.229, 0.224, 0.225]).reshape((1, 3, 1, 1))
    return (data.astype('float32') / 255 - rgb_mean) / rgb_std
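# (rgb_mean / rgb_std above are the standard ImageNet channel statistics that the model zoo networks expect.)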
prob = net(transform(x)).softmax()
idx = prob.topk(k=5)[0]
for i in idx:
    i = int(i.asscalar())
    print('With prob = %.5f, it contains %s' % (
        prob[0, i].asscalar(), text_labels[i]))
# Run on a GPU
from mxnet import gpu
net = nn.Sequential()
net.add(nn.Conv2D(channels=6, kernel_size=5, activation='relu'),
        nn.MaxPool2D(pool_size=2, strides=2),
        nn.Conv2D(channels=16, kernel_size=3, activation='relu'),
        nn.MaxPool2D(pool_size=2, strides=2),
        nn.Flatten(),
        nn.Dense(120, activation="relu"),
        nn.Dense(84, activation="relu"),
        nn.Dense(10))
net.load_parameters('net.params', ctx=gpu(0))
x = nd.random.uniform(shape=(1,1,28,28), ctx=gpu(0))
net(x)
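# If you are not sure a GPU is present, the context can be chosen dynamically; a
# minimal sketch, assuming MXNet >= 1.2 where mx.context.num_gpus() is available:
# import mxnet as mx
# device = mx.gpu(0) if mx.context.num_gpus() > 0 else mx.cpu(0)
# net.load_parameters('net.params', ctx=device)
# net(nd.random.uniform(shape=(1, 1, 28, 28), ctx=device))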
# Train on multiple GPUs
batch_size = 256
transformer = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(0.13, 0.31)])
train_data = gluon.data.DataLoader(
    datasets.FashionMNIST(train=True).transform_first(transformer),
    batch_size, shuffle=True, num_workers=4)
valid_data = gluon.data.DataLoader(
    datasets.FashionMNIST(train=False).transform_first(transformer),
    batch_size, shuffle=False, num_workers=4)
# Diff 1: Use two GPUs for training.
devices = [gpu(0), gpu(1)]
# Diff 2: reinitialize the parameters and place them on multiple GPUs
net.collect_params().initialize(force_reinit=True, ctx=devices)
# Loss and trainer are the same as before
softmax_cross_entropy = gluon.loss.SoftmaxCrossEntropyLoss()
trainer = gluon.Trainer(net.collect_params(), 'sgd', {'learning_rate': 0.1})
for epoch in range(10):
    train_loss = 0.
    tic = time.time()
    for data, label in train_data:
        # Diff 3: split the batch and load the parts onto the corresponding devices
        data_list = gluon.utils.split_and_load(data, devices)
        label_list = gluon.utils.split_and_load(label, devices)
        # Diff 4: run forward and backward on each device.
        # MXNet will automatically run them in parallel
        with autograd.record():
            losses = [softmax_cross_entropy(net(X), y)
                      for X, y in zip(data_list, label_list)]
        for l in losses:
            l.backward()
        trainer.step(batch_size)
        # Diff 5: sum losses over all devices
        train_loss += sum([l.sum().asscalar() for l in losses])
    print("Epoch %d: loss %.3f, in %.1f sec" % (
        epoch, train_loss/len(train_data)/batch_size, time.time()-tic))
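# To also track validation accuracy in the multi-GPU setting, each validation batch
# can be split across the devices as well; a minimal sketch that reuses the acc()
# helper defined earlier and the same two devices:
valid_acc = 0.
for data, label in valid_data:
    data_list = gluon.utils.split_and_load(data, devices, even_split=False)
    label_list = gluon.utils.split_and_load(label, devices, even_split=False)
    valid_acc += sum(acc(net(X), y) for X, y in zip(data_list, label_list)) / len(data_list)
print("valid acc %.3f" % (valid_acc / len(valid_data)))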
Handwritten digit recognition (MNIST) with the symbolic Module API
import mxnet as mx
mnist = mx.test_utils.get_mnist()
batch_size = 100
train_iter = mx.io.NDArrayIter(mnist['train_data'], mnist['train_label'], batch_size, shuffle=True)
val_iter = mx.io.NDArrayIter(mnist['test_data'], mnist['test_label'], batch_size)
data = mx.sym.var('data')
# Flatten the data from 4-D shape into 2-D (batch_size, num_channel*width*height)
data = mx.sym.flatten(data=data)
# The first fully-connected layer and the corresponding activation function
fc1 = mx.sym.FullyConnected(data=data, num_hidden=128)
act1 = mx.sym.Activation(data=fc1, act_type="relu")
# The second fully-connected layer and the corresponding activation function
fc2 = mx.sym.FullyConnected(data=act1, num_hidden = 64)
act2 = mx.sym.Activation(data=fc2, act_type="relu")
# MNIST has 10 classes
fc3 = mx.sym.FullyConnected(data=act2, num_hidden=10)
# Softmax with cross entropy loss
mlp = mx.sym.SoftmaxOutput(data=fc3, name='softmax')
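# Optional sanity check: Symbol.infer_shape reports the output shape the network
# produces for one batch, given the raw (batch, 1, 28, 28) input:
_, out_shapes, _ = mlp.infer_shape(data=(batch_size, 1, 28, 28))
print(out_shapes)  # expect [(100, 10)]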
import logging
logging.getLogger().setLevel(logging.DEBUG) # logging to stdout
# create a trainable module on CPU
mlp_model = mx.mod.Module(symbol=mlp, context=mx.cpu())
mlp_model.fit(train_iter,                      # train data
              eval_data=val_iter,              # validation data
              optimizer='sgd',                 # use SGD to train
              optimizer_params={'learning_rate': 0.1},  # use a fixed learning rate
              eval_metric='acc',               # report accuracy during training
              batch_end_callback=mx.callback.Speedometer(batch_size, 100),  # print progress every 100 batches
              num_epoch=20)                    # train for at most 20 passes over the training data
test_iter = mx.io.NDArrayIter(mnist['test_data'], None, batch_size)
prob = mlp_model.predict(test_iter)
assert prob.shape == (10000, 10)
test_iter = mx.io.NDArrayIter(mnist['test_data'], mnist['test_label'], batch_size)
# evaluate the accuracy of the MLP
acc = mx.metric.Accuracy()
mlp_model.score(test_iter, acc)
print(acc)
assert acc.get()[1] > 0.96
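# The raw class probabilities returned by predict() can be turned into labels; a
# minimal sketch using the prob array computed above:
pred_labels = prob.asnumpy().argmax(axis=1)
print('first 10 predictions:', pred_labels[:10])
print('first 10 true labels: ', mnist['test_label'][:10])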
import mxnet as mx
mnist = mx.test_utils.get_mnist()
batch_size = 100
train_iter = mx.io.NDArrayIter(mnist['train_data'], mnist['train_label'], batch_size, shuffle=True)
val_iter = mx.io.NDArrayIter(mnist['test_data'], mnist['test_label'], batch_size)
data = mx.sym.var('data')
# first conv layer
conv1 = mx.sym.Convolution(data=data, kernel=(5,5), num_filter=20)
tanh1 = mx.sym.Activation(data=conv1, act_type="tanh")
pool1 = mx.sym.Pooling(data=tanh1, pool_type="max", kernel=(2,2), stride=(2,2))
# second conv layer
conv2 = mx.sym.Convolution(data=pool1, kernel=(5,5), num_filter=50)
tanh2 = mx.sym.Activation(data=conv2, act_type="tanh")
pool2 = mx.sym.Pooling(data=tanh2, pool_type="max", kernel=(2,2), stride=(2,2))
# first fullc layer
flatten = mx.sym.flatten(data=pool2)
fc1 = mx.symbol.FullyConnected(data=flatten, num_hidden=500)
tanh3 = mx.sym.Activation(data=fc1, act_type="tanh")
# second fullc
fc2 = mx.sym.FullyConnected(data=tanh3, num_hidden=10)
# softmax loss
lenet = mx.sym.SoftmaxOutput(data=fc2, name='softmax')
# create a trainable module; switch mx.cpu() to mx.gpu(0) if a GPU is available
lenet_model = mx.mod.Module(symbol=lenet, context=mx.cpu())
# train with the same settings as the MLP above
lenet_model.fit(train_iter,
                eval_data=val_iter,
                optimizer='sgd',
                optimizer_params={'learning_rate': 0.1},
                eval_metric='acc',
                batch_end_callback=mx.callback.Speedometer(batch_size, 100),
                num_epoch=10)
test_iter = mx.io.NDArrayIter(mnist['test_data'], None, batch_size)
prob = lenet_model.predict(test_iter)
test_iter = mx.io.NDArrayIter(mnist['test_data'], mnist['test_label'], batch_size)
# evaluate the accuracy of LeNet
acc = mx.metric.Accuracy()
lenet_model.score(test_iter, acc)
print(acc)
assert acc.get()[1] > 0.98