码字不易,欢迎给个赞!

欢迎交流与转载,文章会同步发布在公众号:机器学习算法工程师(Jeemy110)


TensorFlow虽是深度学习领域最广泛使用的框架,但是对比PyTorch这一动态图框架,采用静态图(Graph模式)的TensorFlow确实是难用。好在最近TensorFlow支持了eager模式,对标PyTorch的动态执行机制。更进一步地,Google在最近推出了全新的版本TensorFlow 2.0,2.0版本相比1.0版本不是简单地更新,而是一次重大升级(虽然目前只发布了preview版本)。简单地来说,TensorFlow 2.0默认采用eager执行模式,而且重整了很多混乱的模块。毫无疑问,2.0版本将会逐渐替换1.0版本,所以很有必要趁早入手TensorFlow 2.0。这篇文章将简明扼要地介绍TensorFlow 2.0,以求快速入门。

Eager执行

TensorFlow的Eager执行时一种命令式编程(imperative programming),这和原生Python是一致的,当你执行某个操作时是立即返回结果的。而TensorFlow一直是采用Graph模式,即先构建一个计算图,然后需要开启Session,喂进实际的数据才真正执行得到结果。显然,eager执行更简洁,我们可以更容易debug自己的代码,这也是为什么PyTorch更简单好用的原因。一个简单的例子如下:

x = tf.ones((2, 2), dtype=tf.dtypes.float32)
y = tf.constant([[1, 2],
                 [3, 4]], dtype=tf.dtypes.float32)
z = tf.matmul(x, y)
print(z)
# tf.Tensor(
# [[4. 6.]
#  [4. 6.]], shape=(2, 2), dtype=float32)

print(z.numpy())
# [[4. 6.]
# [4. 6.]]

可以看到在eager执行下,每个操作后的返回值是tf.Tensor,其包含具体值,不再像Graph模式下那样只是一个计算图节点的符号句柄。由于可以立即看到结果,这非常有助于程序debug。更进一步地,调用tf.Tensor.numpy()方法可以获得Tensor所对应的numpy数组。

这种eager执行的另外一个好处是可以使用Python原生功能,比如下面的条件判断:

random_value = tf.random.uniform([], 0, 1)
x = tf.reshape(tf.range(0, 4), [2, 2])
print(random_value)
if random_value.numpy() > 0.5:
    y = tf.matmul(x, x)
else:
    y = tf.add(x, x)

这种动态控制流主要得益于eager执行得到Tensor可以取出numpy值,这避免了使用Graph模式下的tf.cond和tf.while等算子。

另外一个重要的问题,在egaer模式下如何计算梯度。在Graph模式时,我们在构建模型前向图时,同时也会构建梯度图,这样实际喂数据执行时可以很方便计算梯度。但是eager执行是动态的,这就需要每一次执行都要记录这些操作以计算梯度,这是通过tf.GradientTape来追踪所执行的操作以计算梯度,下面是一个计算实例:

w = tf.Variable([[1.0]])
with tf.GradientTape() as tape:
  loss = w * w + 2. * w + 5.

grad = tape.gradient(loss, w)
print(grad)  # => tf.Tensor([[ 4.]], shape=(1, 1), dtype=float32)

对于eager执行,每个tape会记录当前所执行的操作,这个tape只对当前计算有效,并计算相应的梯度。PyTorch也是动态图模式,但是与TensorFlow不同,它是每个需要计算Tensor会拥有grad_fn以追踪历史操作的梯度。

TensorFlow 2.0引入的eager提高了代码的简洁性,而且更容易debug。但是对于性能来说,eager执行相比Graph模式会有一定的损失。这不难理解,毕竟原生的Graph模式是先构建好静态图,然后才真正执行。这对于 在分布式训练、性能优化和生产部署方面具有优势。但是好在,TensorFlow 2.0引入了tf.function和AutoGraph来缩小eager执行和Graph模式的性能差距,其核心是将一系列的Python语法转化为高性能的graph操作。

AutoGraph

AutoGraph在TensorFlow 1.x已经推出,主要是可以将一些常用的Python代码转化为TensorFlow支持的Graph代码。一个典型的例子是在TensorFlow中我们必须使用tf.while和tf.cond等复杂的算子来实现动态流程控制,但是现在我们可以使用Python原生的for和if等语法写代码,然后采用AutoGraph转化为TensorFlow所支持的代码,如下面的例子:

def square_if_positive(x):
    if x > 0:
        x = x * x
    else:
        x = 0.0
    return x

# eager 模式
print('Eager results: %2.2f, %2.2f' % (square_if_positive(tf.constant(9.0)),
                                       square_if_positive(tf.constant(-9.0))))

# graph 模式
tf_square_if_positive = tf.autograph.to_graph(square_if_positive)

with tf.Graph().as_default():
  # The result works like a regular op: takes tensors in, returns tensors.
  # You can inspect the graph using tf.get_default_graph().as_graph_def()
    g_out1 = tf_square_if_positive(tf.constant( 9.0))
    g_out2 = tf_square_if_positive(tf.constant(-9.0))
    with tf.compat.v1.Session() as sess:
        print('Graph results: %2.2f, %2.2f\n' % (sess.run(g_out1), sess.run(g_out2)))

上面我们定义了一个square_if_positive函数,它内部使用的Python的原生的if语法,对于TensorFlow 2.0的eager执行,这是没有问题的。然而这是TensorFlow 1.x所不支持的,但是使用AutoGraph可以将这个函数转为Graph函数,你可以将其看成一个常规TensorFlow op,其可以在Graph模式下运行(tf2 没有Session,这是tf1.x的特性,想使用tf1.x的话需要调用tf.compat.v1)。大家要注意eager模式和Graph模式的差异,尽管结果是一样的,但是Graph模式更高效。 从本质上讲,AutoGraph是将Python代码转为TensorFlow原生的代码,我们可以进一步看到转化后的代码:

print(tf.autograph.to_code(square_if_positive))
#################################################
from __future__ import print_function

def tf__square_if_positive(x):
  try:
    with ag__.function_scope('square_if_positive'):
      do_return = False
      retval_ = None
      cond = ag__.gt(x, 0)

      def if_true():
        with ag__.function_scope('if_true'):
          x_1, = x,
          x_1 = x_1 * x_1
          return x_1

      def if_false():
        with ag__.function_scope('if_false'):
          x = 0.0
          return x
      x = ag__.if_stmt(cond, if_true, if_false)
      do_return = True
      retval_ = x
      return retval_
  except:
    ag__.rewrite_graph_construction_error(ag_source_map__)



tf__square_if_positive.autograph_info__ = {}

可以看到AutoGraph转化的代码定义了两个条件函数,然后调用if_stmt op,应该就是类似tf.cond的op。 AutoGraph支持很多Python特性,比如循环:

def sum_even(items):
    s = 0
    for c in items:
        if c % 2 > 0:
            continue
        s += c
    return s

print('Eager result: %d' % sum_even(tf.constant([10,12,15,20])))

tf_sum_even = tf.autograph.to_graph(sum_even)

with tf.Graph().as_default(), tf.compat.v1.Session() as sess:
    print('Graph result: %d\n\n' % sess.run(tf_sum_even(tf.constant([10,12,15,20]))))

对于大部分Python特性AutoGraph是支持的,但是其仍然有限制,具体可以见Capabilities and Limitations

此外,要注意的一点是,经过AutoGraph转换的新函数是可以eager模式下执行的,但是性能却并不会比转换前的高,你可以对比:

x = tf.constant([10, 12, 15, 20])
print("Eager at orginal code:", timeit.timeit(lambda: sum_even(x), number=100))
print("Eager at autograph code:", timeit.timeit(lambda: tf_sum_even(x), number=100))

with tf.Graph().as_default(), tf.compat.v1.Session() as sess:
    graph_op = tf_sum_even(tf.constant([10, 12, 15, 20]))
    sess.run(graph_op)  # remove first call
    print("Graph at autograph code:", timeit.timeit(lambda: sess.run(graph_op), number=100))
##########################################
Eager at orginal code: 0.05176109499999981
Eager at autograph code: 0.11203173799999977
Graph at autograph code: 0.03418808900000059

从结果上看,Graph模式下的执行效率是最高的,原来的代码在eager模式下效率次之,经AutoGraph转换后的代码效率最低。

所以,在TensorFlow 2.0,我们一般不会直接使用tf.autograph,因为eager执行下效率没有提升。要真正达到Graph模式下的效率,要依赖tf.function这个更强大的利器。

性能优化:tf.function

尽管eager执行更简洁,但是Graph模式却是性能更高,为了减少这个性能gap,TensorFlow 2.0引入了tf.function,先给出官方对tf.function的说明:

function constructs a callable that executes a TensorFlow graph (tf.Graph) created by tracing the TensorFlow operations in func. This allows the TensorFlow runtime to apply optimizations and exploit parallelism in the computation defined by func.

简单来说,就是tf.function可以将一个func中的TensorFlow操作构建为一个Graph,这样在调用时是执行这个Graph,这样计算性能更优。比如下面的例子:

def f(x, y):
    print(x, y)
    return tf.reduce_mean(tf.multiply(x ** 2, 3) + y)

g = tf.function(f)

x = tf.constant([[2.0, 3.0]])
y = tf.constant([[3.0, -2.0]])

# `f` and `g` will return the same value, but `g` will be executed as a
# TensorFlow graph.
assert f(x, y).numpy() == g(x, y).numpy()
# tf.Tensor([[2. 3.]], shape=(1, 2), dtype=float32) tf.Tensor([[ 3. -2.]], shape=(1, 2), dtype=float32)
# Tensor("x:0", shape=(1, 2), dtype=float32) Tensor("y:0", shape=(1, 2), dtype=float32)

如上面的例子,被tf.function装饰的函数将以Graph模式执行,可以把它想象一个封装了Graph的TF op,直接调用它也会立即得到Tensor结果,但是其内部是高效执行的。我们在内部打印Tensor时,eager执行会直接打印Tensor的值,而Graph模式打印的是Tensor句柄,其无法调用numpy方法取出值,这和TF 1.x的Graph模式是一致的。 由于tf.function装饰的函数是Graph执行,其执行速度一般要比eager模式要快,当Graph包含很多小操作时差距更明显,可以比较下卷积和LSTM的性能差距:

import timeit
conv_layer = tf.keras.layers.Conv2D(100, 3)

@tf.function
def conv_fn(image):
  return conv_layer(image)

image = tf.zeros([1, 200, 200, 100])
# warm up
conv_layer(image); conv_fn(image)
print("Eager conv:", timeit.timeit(lambda: conv_layer(image), number=10))
print("Function conv:", timeit.timeit(lambda: conv_fn(image), number=10))
# 单纯的卷积差距不是很大
# Eager conv: 0.44013839924952197
# Function conv: 0.3700763391782858

lstm_cell = tf.keras.layers.LSTMCell(10)

@tf.function
def lstm_fn(input, state):
  return lstm_cell(input, state)

input = tf.zeros([10, 10])
state = [tf.zeros([10, 10])] * 2
# warm up
lstm_cell(input, state); lstm_fn(input, state)
print("eager lstm:", timeit.timeit(lambda: lstm_cell(input, state), number=10))
print("function lstm:", timeit.timeit(lambda: lstm_fn(input, state), number=10))
# 对于LSTM比较heavy的计算,Graph执行要快很多
# eager lstm: 0.025562446062237565
# function lstm: 0.0035498656569271647

要想灵活使用tf.function,必须深入理解它背后的机理,这里简单地谈一下。在TF 1.x时,首先要创建静态计算图,然后新建Session真正执行不同的运算:

import tensorflow as tf

x = tf.placeholder(tf.float32)
y = tf.square(x)
z = tf.add(x, y)

sess = tf.Session()

z0 = sess.run([z], feed_dict={x: 2.})        # 6.0
z1 = sess.run([z], feed_dict={x: 2., y: 2.}) # 4.0

尽管上面只定义了一个graph,但是两次不同的sess执行(运行时)其实是执行两个不同的程序或者说subgraph:

def compute_z0(x):
  return tf.add(x, tf.square(x))

def compute_z1(x, y):
  return tf.add(x,  y)

这里我们将两个不同的subgraph封装到了两个python函数中。更进一步地,我们可以不再需要Session,当执行这两个函数时,直接调用对应的计算图就可以,这就是tf.function的功效:

import tensorflow as tf

@tf.function
def compute_z1(x, y):
  return tf.add(x, y)

@tf.function
def compute_z0(x):
  return compute_z1(x, tf.square(x))

z0 = compute_z0(2.)
z1 = compute_z1(2., 2.)

可以说tf.function内部管理了一系列Graph,并控制了Graph的执行。另外一个问题时,虽然函数内部定义了一系列的操作,但是对于不同的输入,是需要不同的计算图。如函数的输入Tensor的shape或者dtype不同,那么计算图是不同的,好在tf.function支持这种多态性(polymorphism)

# Functions are polymorphic

@tf.function
def double(a):
  print("Tracing with", a)
  return a + a

print(double(tf.constant(1)))
print(double(tf.constant(1.1)))
print(double(tf.constant([1, 2])))

# Tracing with Tensor("a:0", shape=(), dtype=int32)
# tf.Tensor(2, shape=(), dtype=int32)
# Tracing with Tensor("a:0", shape=(), dtype=float32)
# tf.Tensor(2.2, shape=(), dtype=float32)
# Tracing with Tensor("a:0", shape=(2,), dtype=int32)
# tf.Tensor([2 4], shape=(2,), dtype=int32)

注意函数内部的打印,当输入tensor的shape或者类型发生变化,打印的东西也是相应改变。所以,它们的计算图(静态的)并不一样。tf.function这种多态特性其实是背后追踪了(tracing)不同的计算图。具体来说,被tf.function装饰的函数f接受一定的Tensors,并返回0到任意到Tensor,当装饰后的函数F被执行时:

  • 根据输入Tensors的shape和dtypes确定一个"trace_cache_key";
  • 每个"trace_cache_key"映射了一个Graph,当新的"trace_cache_key"要建立时,f将构建一个新的Graph,若"trace_cache_key"已经存在,那么直需要从缓存中查找已有的Graph即可;
  • 将输入Tensors喂进这个Graph,然后执行得到输出Tensors。

这种多态性是我们需要的,因为有时候我们希望输入不同shape或者dtype的Tensors,但是当"trace_cache_key"越来越多时,意味着你要cache了庞大的Graph,这点是要注意的。另外,tf.function提供了input_signature,这个参数采用tf.TensorSpec指定了输入到函数的Tensor的shape和dtypes,如下面的例子:

@tf.function(input_signature=[tf.TensorSpec(shape=None, dtype=tf.float32)])
def f(x):
    return tf.add(x, 1.)
print(f(tf.constant(1.0)))  # tf.Tensor(2.0, shape=(), dtype=float32)
print(f(tf.constant([1.0,]))) # tf.Tensor([2.], shape=(1,), dtype=float32)
print(f(tf.constant([1])))  # ValueError: Python inputs incompatible with input_signature

此时,输入Tensor的dtype必须是float32,但是shape不限制,当类型不匹配时会出错。

tf.function的另外一个参数是autograph,默认是True,意思是在构建Graph时将自动使用AutoGraph,这样你可以在函数内部使用Python原生的条件判断以及循环语句,因为它们会被tf.cond和tf.while_loop转化为Graph代码。注意的一点是判断分支和循环必须依赖于Tensors才会被转化,当autograph为False时,如果存在判断分支和循环必须依赖于Tensors的情况将会出错。如下面的例子:

def sum_even(items):
  s = 0
  for c in items:
    if c % 2 > 0:
      continue
    s += c
  return s

sum_even_autograph_on = tf.function(sum_even, autograph=True)
sum_even_autograph_off = tf.function(sum_even, autograph=False)
x = tf.constant([10, 12, 15, 20])

sum_even(x) # OK 
sum_even_autograph_on(x) # OK
sum_even_autograph_off(x) # TypeError: Tensor objects are only iterable when eager execution is enabled

很容易理解,应用tf.function之后是Graph模式,Tensors是不能被遍历的,但是采用AutoGraph可以将其转换为Graph代码,所以可以成功。大部分情况,我们还是默认开启autograph。

最要的是tf.function可以应用到类方法中,并且可以引用tf.Variable,可以看下面的例子:

class ScalarModel(object):
  def __init__(self):
    self.v = tf.Variable(0)

  @tf.function
  def increment(self, amount):
    self.v.assign_add(amount)

model1 = ScalarModel()
model1.increment(tf.constant(3))
assert int(model1.v) == 3
model1.increment(tf.constant(4))
assert int(model1.v) == 7
model2 = ScalarModel()  # model1和model2 拥有不同变量
model2.increment(tf.constant(5))
assert int(model2.v) == 5

后面会讲到,这个特性可以应用到tf.Keras的模型构建中。上面这个例子还有一点,就是可以在function中使用tf.assign这类具有副作用(改变Variable的值)的操作,这对于模型训练比较重要。

前面说过,python原生的print函数只会在构建Graph时打印一次Tensor句柄。如果想要打印Tensor的具体值,要使用tf.print:

@tf.function
def print_element(items):
    for c in items:
      tf.print(c)

x = tf.constant([1, 5, 6, 8, 3])
print_element(x)

这里就对tf.function做这些介绍,但是实际上其还有更多复杂的使用须知,详情可以参考TensorFlow 2.0: Functions, not Sessions

模型构建:tf.keras

TensorFlow 2.0全面keras化:如果你想使用高级的layers,只能选择keras。TensorFlow 1.x存在tf.layers以及tf.contrib.slim等高级API来创建模型,但是2.0仅仅支持tf.keras.layers,不管怎么样,省的大家重复造轮子,也意味着模型构建的部分大家都是统一的,增加代码的复用性(回忆一下原来的TensorFlow模型构建真是千奇百怪)。值得注意的tf.nn模块依然存在,里面是各种常用的nn算子,不过大部分人不会去直接用这些算子构建模型,因为keras.layers基本上包含了常用的网络层。当然,如果想构建新的layer,可以直接继承tf.keras.layers.Layer:

class Linear(tf.keras.layers.Layer):

    def __init__(self, units=32, **kwargs):
        super(Linear, self).__init__(**kwargs)
        self.units = units

    def build(self, input_shape):
        self.w = self.add_weight(shape=(input_shape[-1], self.units),
                             initializer='random_normal',
                             trainable=True)
        self.b = self.add_weight(shape=(self.units,),
                             initializer='random_normal',
                             trainable=True)

    def call(self, inputs):
        return tf.matmul(inputs, self.w) + self.b

layer = Linear(32)
print(layer.weights)  # [] the weights have not created
x = tf.ones((8, 16))
y = layer(x)  # shape [8, 32]
print(layer.weights)

这里我们继承了Layer来实现自定义layer。第一个要注意的点是我们定义了build方法,其主要用于根据input_shape创建layer的Variables。注意,我们没有在类构造函数中创建Variables,而是单独定义了一个方法。之所以这样做类的构造函数中并没有传入输入Tensor的信息,这里需要的是input的输入特征维度,所以无法创建Variables。这个build方法会在layer第一次真正执行(执行layer(input))时才会执行,并且只会执行一次(Layer内部有self.build这个bool属性)。这是一种懒惰执行机制,如果熟悉Pytorch的话,PyTorch在创建layer时是需要输入Tensor的信息,这意味着它是立即创建了Variables。 第二点是Layer本身有很多属性和方,这里列出一些重要的:

  • add_weight方法:用于创建layer的weights(不用直接调用tf.Variale);
  • add_loss方法:顾名思义,用于添加loss,增加的loss可以通过layer.losses属性获得,你可以在call方法中使用该方法添加你想要的loss;
  • add_metric方法:添加metric到layer;
  • losses属性:通过add_loss方法添加loss的list集合,比如一部分layer的正则化loss可以通过这个属性获得;
  • trainable_weights属性:可训练的Variables列表,在模型训练时需要这个属性;
  • non_trainable_weights属性:不可训练的Variables列表;
  • weights属性:trainable_weights和non_trainable_weights的合集;
  • trainable属性:可变动的bool值,决定layer是否可以训练。

Layer类是keras中最基本的类,对其有个全面的认识比较重要,具体可以看源码。大部分情况下,我们只会复用keras已有的layers,而我们创建模型最常用的是keras.Model类,这个Model类是继承了Layer类,但是提供了更多的API,如model.compile(), model.fit(), model.evaluate(), model.predict()等,熟悉keras的都知道这是用于模型训练,评估和预测的方法。另外重要的一点,我们可以继承Model类,创建包含多layers的模块或者模型:

class ConvBlock(tf.keras.Model):
    """Convolutional Block consisting of (conv->bn->relu).
    Arguments:
      num_filters: number of filters passed to a convolutional layer.
      kernel_size: the size of convolution kernel
      weight_decay: weight decay
      dropout_rate: dropout rate.
    """

    def __init__(self, num_filters, kernel_size,
                 weight_decay=1e-4, dropout_rate=0.):
        super(ConvBlock, self).__init__()

        self.conv = tf.keras.layers.Conv2D(num_filters,
                                          kernel_size,
                                          padding="same",
                                          use_bias=False,
                                          kernel_initializer="he_normal",
                                          kernel_regularizer=tf.keras.regularizers.l2(weight_decay))
        self.bn = tf.keras.layers.BatchNormalization()
        self.dropout = tf.keras.layers.Dropout(dropout_rate)


    def call(self, x, training=True):
        output = self.conv(x)
        output = self.bn(x, training=training)
        output = tf.nn.relu(output)
        output = self.dropout(output, training=training)
        return output


model = ConvBlock(32, 3, 1e-4, 0.5)
x = tf.ones((4, 224, 224, 3))
y = model(x)
print(model.layers)

这里我们构建了一个包含Conv2D->BatchNorm->ReLU的block,打印model.layers可以获得其内部包含的所有layers。更进一步地,我们可以在复用这些block就像使用tf.keras.layers一样构建更复杂的模块:

class SimpleCNN(tf.keras.Model):
    def __init__(self, num_classes):
        super(SimpleCNN, self).__init__()

        self.block1 = ConvBlock(16, 3)
        self.block2 = ConvBlock(32, 3)
        self.block3 = ConvBlock(64, 3)

        self.global_pool = tf.keras.layers.GlobalAveragePooling2D()
        self.classifier = tf.keras.layers.Dense(num_classes)

    def call(self, x, training=True):
        output = self.block1(x, training=training)
        output = self.block2(output, training=training)
        output = self.block3(output, training=training)
        output = self.global_pool(output)
        logits = self.classifier(output)
        return logits

model = SimpleCNN(10)
print(model.layers)
x = tf.ones((4, 32, 32, 3))
y = model(x) # [4, 10]

这种使用手法和PyTorch的Module是类似的,并且Model类的大部分属性会递归地收集内部layers的属性,比如model.weights是模型内所有layers中定义的weights。

构建模型的另外方式还可以采用Keras原有方式,如采用tf.keras.Sequential:

model = tf.keras.Sequential([
# Adds a densely-connected layer with 64 units to the model:
layers.Dense(64, activation='relu', input_shape=(32,)),
# Add another:
layers.Dense(64, activation='relu'),
# Add a softmax layer with 10 output units:
layers.Dense(10, activation='softmax')])

或者采用keras的functional API:

inputs = keras.Input(shape=(784,), name='img')
x = layers.Dense(64, activation='relu')(inputs)
x = layers.Dense(64, activation='relu')(x)
outputs = layers.Dense(10, activation='softmax')(x)

model = keras.Model(inputs=inputs, outputs=outputs, name='mnist_model')

虽然都可以,但是我个人还是喜欢第一种那种模块化的模型构建方法。另外,你可以对call方法应用tf.function,这样模型执行就使用Graph模式了。

模型训练

在开始模型训练之前,一个重要的项是数据加载,TensorFlow 2.0的数据加载还是采用tf.data,不过在eager模式下,tf.data.Dataset这个类将成为一个Python迭代器,我们可以直接取值:

dataset = tf.data.Dataset.range(10)
for i, elem in enumerate(dataset):
    print(elem)  # prints 0, 1, ..., 9

这里我们只是展示了一个简单的例子,但是足以说明tf.data在TensorFlow 2.0下的变化,tf.data其它使用技巧和TensorFlow 1.x是一致的。

另外tf.keras提供两个重要的模块losses和metrics用于模型训练。对于losses,其本身就是对各种loss函数的封装,如下面的case:

bce = tf.keras.losses.BinaryCrossentropy()
loss = bce([0., 0., 1., 1.], [1., 1., 1., 0.])
print('Loss: ', loss.numpy())  # Loss: 11.522857

而metrics模块主要包含了常用的模型评估指标,这个模块与TensorFlow 1.x的metrics模块设计理念是一致的,就是metric本身是有状态的,一般是通过创建Variable来记录。基本用法如下:

m = tf.keras.metrics.Accuracy()
m.update_state([1, 2, 3, 4], [0, 2, 3, 4])
print('result: ', m.result().numpy())  # result: 0.75
m.update_state([0, 2, 3], [1, 2, 3])
print('result: ', m.result().numpy())  #  result: 0.714
m.reset_states()  # 重置
m.update_state([0, 2, 3], [1, 2, 3])
print('result: ', m.result().numpy())  #  result: 0.667

当你需要自定义metric时,你可以继承tf.keras.metrics.Metric类,然后实现一些接口即可,下面这个例子展示如何计算多分类问题中TP数量:

class CatgoricalTruePositives(tf.keras.metrics.Metric):

    def __init__(self, name='categorical_true_positives', **kwargs):
      super(CatgoricalTruePositives, self).__init__(name=name, **kwargs)
      self.true_positives = self.add_weight(name='tp', initializer='zeros')

    def update_state(self, y_true, y_pred, sample_weight=None):
      y_pred = tf.argmax(y_pred)
      values = tf.equal(tf.cast(y_true, 'int32'), tf.cast(y_pred, 'int32'))
      values = tf.cast(values, 'float32')
      if sample_weight is not None:
        sample_weight = tf.cast(sample_weight, 'float32')
        values = tf.multiply(values, sample_weight)
      self.true_positives.assign_add(tf.reduce_sum(values))

    def result(self):
      return self.true_positives

    def reset_states(self):
      # The state of the metric will be reset at the start of each epoch.
      self.true_positives.assign(0.)

上面的三个接口必须都要实现,其中update_state是通过添加新数据而更新状态,而reset_states是重置初始值,result方法是获得当前状态,即metric结果。注意这个metric其实是创建了一个Variable来保存TP值。你可以类比实现更复杂的metric。 对于模型训练,我们可以通过下面一个完整实例来全面学习:

import numpy as np
import tensorflow as tf

fashion_mnist = tf.keras.datasets.fashion_mnist

(train_images, train_labels), (test_images, test_labels) = fashion_mnist.load_data()

# Adding a dimension to the array -> new shape == (28, 28, 1)
train_images = train_images[..., None]
test_images = test_images[..., None]

# Getting the images in [0, 1] range.
train_images = train_images / np.float32(255)
test_images = test_images / np.float32(255)

train_labels = train_labels.astype('int64')
test_labels = test_labels.astype('int64')

# dataset
train_ds = tf.data.Dataset.from_tensor_slices(
    (train_images, train_labels)).shuffle(10000).batch(32)
test_ds = tf.data.Dataset.from_tensor_slices(
    (test_images, test_labels)).batch(32)

# Model
class MyModel(tf.keras.Sequential):
    def __init__(self):
        super(MyModel, self).__init__([
          tf.keras.layers.Conv2D(32, 3, activation='relu'),
          tf.keras.layers.MaxPooling2D(),
          tf.keras.layers.Conv2D(64, 3, activation='relu'),
          tf.keras.layers.MaxPooling2D(),
          tf.keras.layers.Flatten(),
          tf.keras.layers.Dense(64, activation='relu'),
          tf.keras.layers.Dense(10, activation=None)
        ])

model = MyModel()

# optimizer
initial_learning_rate = 1e-4
lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(
    initial_learning_rate,
    decay_steps=100000,
    decay_rate=0.96,
    staircase=True)

optimizer = tf.keras.optimizers.RMSprop(learning_rate=lr_schedule)

# checkpoint
checkpoint = tf.train.Checkpoint(step=tf.Variable(0), optimizer=optimizer, model=model)
manager = tf.train.CheckpointManager(checkpoint, './tf_ckpts', max_to_keep=3)

# loss function
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)

# metric
train_loss_metric = tf.keras.metrics.Mean(name='train_loss')
train_acc_metric = tf.keras.metrics.SparseCategoricalAccuracy(name='train_accuracy')
test_loss_metric = tf.keras.metrics.Mean(name='test_loss')
test_acc_metric = tf.keras.metrics.SparseCategoricalAccuracy(name='test_accuracy')

# define a train step
@tf.function
def train_step(inputs, targets):
    with tf.GradientTape() as tape:
        predictions = model(inputs, training=True)
        loss = loss_object(targets, predictions)
        loss += sum(model.losses)  # add other losses
    # compute gradients and update variables
    gradients = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))
    train_loss_metric(loss)
    train_acc_metric(targets, predictions)

# define a test step
@tf.function
def test_step(inputs, targets):
    predictions = model(inputs, training=False)
    loss = loss_object(targets, predictions)
    test_loss_metric(loss)
    test_acc_metric(targets, predictions)

# train loop
epochs = 10
for epoch in range(epochs):
    print('Start of epoch %d' % (epoch,))
    # Iterate over the batches of the dataset
    for step, (inputs, targets) in enumerate(train_ds):
        train_step(inputs, targets)
        checkpoint.step.assign_add(1)
        # log every 20 step
        if step % 20 == 0:
            manager.save() # save checkpoint
            print('Epoch: {}, Step: {}, Train Loss: {}, Train Accuracy: {}'.format(
                epoch, step, train_loss_metric.result().numpy(),
                train_acc_metric.result().numpy())
            )
            train_loss_metric.reset_states()
            train_acc_metric.reset_states()

# do test
for inputs, targets in test_ds:
    test_step(inputs, targets)
print('Test Loss: {}, Test Accuracy: {}'.format(
    test_loss_metric.result().numpy(),
    test_acc_metric.result().numpy()))

麻雀虽小,但五脏俱全,这个实例包括数据加载,模型创建,以及模型训练和测试。特别注意的是,这里我们将train和test的一个step通过tf.function转为Graph模式,可以加快训练速度,这是一种值得推荐的方式。另外一点,上面的训练方式采用的是custom training loops,自由度较高,另外一种训练方式是采用keras比较常规的compile和fit训练方式。

TensorFlow 2.0的另外一个特点是提供tf.distribute.Strategy更好地支持分布式训练,其接口更加简单易用。我们最常用的分布式策略是单机多卡同步训练,tf.distribute.MirroredStrategy完美支持这种策略。这种策略将在每个GPU设备上创建一个模型副本(replica),模型中的参数在所有replica之间映射,称之为MirroredVariables,当他们执行相同更新时将在所有设备间同步。底层的通信采用all-reduce算法,all-reduce方法可以将多个设备上的Tensors聚合在每个设备上,这种通信方式比较高效,而all-reduce算法有多中实现方式,这里默认采用NVIDIA NCCL的all-reduce方法。创建这种策略只需要简单地定义:

mirrored_strategy = tf.distribute.MirroredStrategy(devices=["/gpu:0", "/gpu:1"],
    cross_device_ops=tf.distribute.NcclAllReduce())
# 这里将在GPU 0和1上同步训练

当我们创建好分布式策略后,在后续的操作中只需要加入strategy.scope即可。下面我们创建一个简单的模型以及优化器:

with mirrored_strategy.scope():
    model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(1,))])
    optimizer = tf.keras.optimizers.SGD(learning_rate=0.001)

对于dataset,我们需要调用tf.distribute.Strategy.experimental_distribute_dataset来分发数据:

with mirrored_strategy.scope():
    dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(1000).batch(
      global_batch_size)
    # 注意这里是全局batch size
    dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

然后我们定义train step,并采用strategy.experimental_run_v2来执行:

@tf.function
def train_step(dist_inputs):
    def step_fn(inputs):
        features, labels = inputs

        with tf.GradientTape() as tape:
            logits = model(features)
            cross_entropy = tf.nn.softmax_cross_entropy_with_logits(
            logits=logits, labels=labels)
            loss = tf.reduce_sum(cross_entropy) * (1.0 / global_batch_size)

        grads = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(list(zip(grads, model.trainable_variables)))
        return cross_entropy

    per_example_losses = mirrored_strategy.experimental_run_v2(step_fn, args=(dist_inputs,))
    mean_loss = mirrored_strategy.reduce(tf.distribute.ReduceOp.SUM,
                    per_example_losses, axis=0)
    return mean_loss

这里要注意的是我们要将loss除以全部batch size,只是因为分布式训练时在更新梯度前会将所有replica上梯度通过all-reduce算法相加聚合到每个设备上。另外,strategy.experimental_run_v2返回是每个replica的结果,要得到最终结果,需要reduce聚合一下。 最后是执行训练,采用循环方式即可:

with mirrored_strategy.scope():
    for inputs in dist_dataset:
        print(train_step(inputs))

要注意的是MirroredStrategy只支持单机多卡同步训练,如果想使用多机版本,需要采用MultiWorkerMirorredStrateg。其它的分布式训练策略还有CentralStorageStrategy,TPUStrategy,ParameterServerStrategy。想深入了解的话,可以查看distribute_strategy guide以及distribute_strategy tuorial

结语

这里我们简明扼要地介绍了TensorFlow 2.0的核心新特性,相信掌握这些新特性就可以快速入手TensorFlow 2.0。不过目前Google只发布了TensorFlow 2.0.0-beta0版本,未来也许会有更多想象不到的黑科技。加油!TensorFlow Coders。

参考文献

  1. TensorFlow官网.
  2. TensorFlow 2.0 docs.