1 分布式机器学习概述

大规模机器学习训练常面临计算量大、训练数据大(单机存不下)、模型规模大的问题,对此分布式机器学习是一个很好的解决方案。

1)对于计算量大的问题,分布式多机并行运算可以基本解决。不过需要与传统HPC中的共享内存式的多线程并行运算(如OpenMP)以及CPU-GPU计算架构做区分,这两种单机的计算模式我们一般称为计算并行

2)对于训练数据大的问题,需要将数据进行划分并分配到多个工作节点(Worker)上进行训练,这种技巧一般被称为数据并行。每个工作节点会根据局部数据训练出一个本地模型,并且会按照一定的规律和其他工作节点进行通信(通信内容主要是本地模型参数或者参数更新),以保证最终可以有效整合来自各个工作节点的训练结果并得到全局的机器学习模型。



如果是训练数据的样本量比较大,则需要对数据按照样本进行划分,我们称之为“数据样本划分”,按实现方法可分为“随机采样法”和“置乱切分法”。样本划分的合理性在于机器学习中的经验风险函数关于样本是可分的,我们将每个子集上的局部梯度平均,仍可得到整个经验风险函数的梯度。


如果训练数据的维度较高,还可对数据按照维度进行划分,我们称之为“数据维度划分”。它相较数据样本划分而言,与模型性质和优化方法的耦合度更高。如神经网络中各维度高度耦合,就难以采用此方式。不过,决策树对维度的处理相对独立可分,将数据按维度划分后,各节点可独立计算局部维度子集中具有最优信息增益的维度,然后进行汇总。此外,在线性模型中,模型参数与数据维度是一一对应的,故数据维度划分常与下面提到的模型并行相互结合。

3)对于模型规模大的问题,则需要对模型进行划分,并且分配到不同的工作节点上进行训练,这种技巧一般被称为模型并行。与数据并行不同,模型并行的框架下各个子模型之间的依赖关系非常强,因为某个子模型的输出可能是另外一个子模型的输入,如果不进行中间计算结果的通信,则无法完成整个模型训练。因此,一般而言,模型并行相比数据并行对通信的要求更高。


这里提一下数据样本划分中的几种划分方式。给定维样本和个工作节点,数据样本划分需要完成的任务是将个样本以某种形式分配到个工作节点上。

随机采样法中我们独立同分布地从个样本中有放回随机采样,每抽取一个样本将其分配到一个工作节点上。这个过程等价于先随机采个样本,然后均等地划分为份。

随机采样法便于理论分析,但基于实现难度的考虑,在工程中更多采用的是基于置乱切分的划分方法。即现将个样本随机置乱,再把数据均等地切分为份,再分配到个节点上进行训练。置乱切分相当于无放回采样,每个样本都会出现在某个工作节点上,每个工作节点的本地数据没有重复,故训练数据的信息量一般会更大。我们下面的划分方式都默认采取置乱切分的方法。

我们在后面的博客中会依次介绍针对数据并行和模型并行设计的各种分布式算法。本篇文章我们先看数据并行中最常用的同步并行SGD算法(也称SSGD)是如何在Spark平台上实现的。

2 同步并行SGD算法描述与实现

SSGD[1]对应的伪代码可以表述如下:

其中,SSGD算法每次依据来自个不同的工作节点上的样本的梯度来更新模型,设每个工作节点上的小批量大小为,则该算法等价于批量大小为
的小批量随机梯度下降法。

用PySpark对上述算法实现如下(因为不存在明显过拟合现象,我们令正则系数为0):

from sklearn.datasets import load_breast_cancer
import numpy as np
from pyspark.sql import SparkSession
from operator import add
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
import matplotlib.pyplot as plt
import sys
import os

os.environ['PYSPARK_PYTHON'] = sys.executable

n_threads = 4  # Number of local threads
n_iterations = 1500  # Number of iterations
eta = 0.1
mini_batch_fraction = 0.1 # the fraction of mini batch sample 
lam = 0 # coefficient of regular term

def logistic_f(x, w):
    return 1 / (np.exp(-x.dot(w)) + 1)


def gradient(point: np.ndarray, w: np.ndarray):
    """ Compute linear regression gradient for a matrix of data points
    """
    y = point[-1]    # point label
    x = point[:-1]   # point coordinate
    # For each point (x, y), compute gradient function, then sum these up
    return - (y - logistic_f(x, w)) * x


def reg_gradient(w, reg_type="l2", alpha=0):
    """ gradient for reg_term
    """ 
    assert(reg_type in ["none", "l2", "l1", "elastic_net"])
    if reg_type == "none":
        return 0
    elif reg_type == "l2":
        return w
    elif reg_type == "l1":
        return np.sign(w)
    else:
        return alpha * np.sign(w) + (1 - alpha) * w


def draw_acc_plot(accs, n_iterations):
    def ewma_smooth(accs, alpha=0.9):
        s_accs = np.zeros(n_iterations)
        for idx, acc in enumerate(accs):
            if idx == 0:
                s_accs[idx] = acc
            else:
                s_accs[idx] = alpha * s_accs[idx-1] + (1 - alpha) * acc
        return s_accs

    s_accs = ewma_smooth(accs, alpha=0.9)
    plt.plot(np.arange(1, n_iterations + 1), accs, color="C0", alpha=0.3)
    plt.plot(np.arange(1, n_iterations + 1), s_accs, color="C0")
    plt.title(label="Accuracy on test dataset")
    plt.xlabel("Round")
    plt.ylabel("Accuracy")
    plt.savefig("ssgd_acc_plot.png")


if __name__ == "__main__":

    X, y = load_breast_cancer(return_X_y=True)

    D = X.shape[1]
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.3, random_state=0, shuffle=True)
    n_train, n_test = X_train.shape[0], X_test.shape[0]

    spark = SparkSession\
        .builder\
        .appName("SSGD")\
        .master("local[%d]" % n_threads)\
        .getOrCreate()

    matrix = np.concatenate(
        [X_train, np.ones((n_train, 1)), y_train.reshape(-1, 1)], axis=1)

    points = spark.sparkContext.parallelize(matrix).cache()

    # Initialize w to a random value
    w = 2 * np.random.ranf(size=D + 1) - 1
    print("Initial w: " + str(w))

    accs = []
    for t in range(n_iterations):
        print("On iteration %d" % (t + 1))
        w_br = spark.sparkContext.broadcast(w)
        
        (g, mini_batch_size) = points.sample(False, mini_batch_fraction, 42 + t)\
            .map(lambda point: gradient(point, w_br.value))\
            .treeAggregate(
                (0.0, 0),\
                    seqOp=lambda res, g: (res[0] + g, res[1] + 1),\
                        combOp=lambda res_1, res_2: (res_1[0] + res_2[0], res_1[1] + res_2[1])
            )

        w -= eta * (g/mini_batch_size + lam * reg_gradient(w, "l2"))
        
        y_pred = logistic_f(np.concatenate(
            [X_test, np.ones((n_test, 1))], axis=1), w)
        pred_label = np.where(y_pred < 0.5, 0, 1)
        acc = accuracy_score(y_test, pred_label)
        accs.append(acc)
        print("iterations: %d, accuracy: %f" % (t, acc))

    print("Final w: %s " % w)
    print("Final acc: %f" % acc)

    spark.stop()

    draw_acc_plot(accs, n_iterations)

完整代码我已经上传到了GitHub仓库[Distributed-Algorithm-PySpark](包括其它分布式机器学习算法),感兴趣的童鞋可以前往查看。

我们尝试以
=0
的正则系数运行。初始权重如下:

Initial w: [-0.36008689 -0.56946672 -0.111438   -0.69218272  0.14209017 -0.03984532
  0.94087485  0.04861045 -0.61083642 -0.64141307  0.94030259  0.30646586
  0.02675997  0.11238215 -0.86864834 -0.47662071 -0.0493882  -0.73569782
 -0.92104136  0.91834097 -0.05942094  0.06995983 -0.3099812  -0.87308266
  0.55421184  0.02251628  0.83765882 -0.69977596  0.02591093 -0.28081701
 -0.21881969]

最终的模型权重与在测试集上的准确率结果如下:

Final w: [ 3.58216967e+01  4.53599397e+01  2.07040135e+02  8.52414269e+01
  4.33038042e-01 -2.93986236e-01  1.43286366e-01 -2.95961229e-01
 -7.63362321e-02 -3.93180625e-01  8.19325971e-01  3.30881477e+00
 -3.25867503e+00 -1.24769634e+02 -8.52691792e-01 -5.18037887e-01
 -1.34380402e-01 -7.49316038e-01 -8.76722455e-01  9.23748261e-01
  3.81531205e+01  5.56880612e+01  2.04895002e+02 -1.17586430e+02
  8.92355523e-01 -9.40611324e-01 -9.24082612e-01 -1.16210791e+00
  7.10117706e-01 -7.62921434e-02  4.48389687e+00] 
Final acc: 0.929825

回忆一下我们在博客《分布式机器学习:逻辑回归的并行化实现(PySpark) 》中在同样的超参数下,测得同步并行梯度下降(非随机)学到的模型的精度为0.941520,可见SSGD引入的随机梯度对算法收敛有一定影响。具体的算法收敛性分析我们在第3部分中分析。

代码中有两个关键点,一个是points.sample(False, mini_batch_fraction, 42 + t)。函数sample负责返回当前RDD的一个随机采样子集(包含所有分区),其原型为:

RDD.sample(withReplacement: bool, fraction: float, seed: Optional[int] = None) → pyspark.rdd.RDD[T]

参数withReplacement的值为True表示采样是有放回(with Replacement, 即replace when sampled out),为False则表示无放回 (without Replacement)。如果是有放回,参数fraction表示每个样本的期望被采次数,fraction必须要满足

0;如果是无放回,参数fraction表示每个样本被采的概率,fraction必须要满足位于[0,1]区间内。

还有一个关键点是

.treeAggregate(
                (0.0, 0),\
                    seqOp=lambda res, g: (res[0] + g, res[1] + 1),\
                        combOp=lambda res_1, res_2: (res_1[0] + res_2[0], res_1[1] + res_2[1])
            )

该函数负责对RDD中的元素进行树形聚合,它在数据量很大时比reduce更高效。该函数的原型为

RDD.treeAggregate(zeroValue: U, seqOp: Callable[[U, T], U], combOp: Callable[[U, U], U], depth: int = 2) → U[source]

其中zeroValue为聚合结果的初始值,seqOp函数用于定义单分区(partition)做聚合操作的方法,该方法第一个参数为聚合结果,第二个参数为分区中的数据变量,返回更新后的聚合结果。combOp定义对分区之间做聚合的方法,该方法第一个参数为第二个参数都为聚合结果,返回累加后的聚合结果。depth为聚合树的深度。

我们这里treeAggregate想要聚合得到一个元组(g, mini_batch_size)g为所有节点样本的随机梯度和,mini_batch_size为所有节点所采的小批量样本之和,故我们将聚合结果的初始值zeroVlue初始化为(0,0, 0)。具体的聚合过程描述如下:




    1. 对每个partition:

      a. 初始化聚合结果为(0.0, 0)

      b. 对当前partition的序列元素,依次执行聚合操作seqOp

      c. 得到当前partition的聚合结果(partition_sum, partition_count)


    1. 对所有partition:

      a. 按照树行模式合并各partition的聚合结果,合并方法为combOp

      b. 得到合并结果(total_sum, total_count)



形象化地表示该聚合过程如下图所示:

3 算法收敛性及复杂度分析

3.1 收敛性和计算复杂度

该算法最终得到的ACC曲线如下图所示:

我们将其与在同样超参数设置下的同步梯度下降法(非随机)的ACC曲线对比(如下是同步梯度下降法的ACC曲线):

可以看到SSGD的收敛速率相同步并行梯度下降有所降低。正如我们在博客《数值优化:经典随机优化算法及其收敛性与复杂度分析》中所说的,虽然随机梯度是是全梯度的无偏估计,但这种估计存在一定的方差,会引入不确定性,导致最终算法的收敛速率下降。

参考



    • [1] Zinkevich M, Weimer M, Li L, et al. Parallelized stochastic gradient descent[J]. Advances in neural information processing systems, 2010, 23.

    • [4] 刘浩洋,户将等. 最优化:建模、算法与理论[M]. 高教出版社, 2020.

    • [5] 刘铁岩,陈薇等. 分布式机器学习:算法、理论与实践[M]. 机械工业出版社, 2018.




 

__EOF__

  • 本文作者: 猎户座
  • 本文链接: https://www.cnblogs.com/orion-orion/p/16413182.html
  • 关于博主: 本科CS系蒟蒻,机器学习半吊子,并行计算混子。
  • 版权声明: 欢迎您对我的文章进行转载,但请务必保留原始出处哦(_^▽^*)。
  • 声援博主: 如果您觉得文章对您有帮助,可以点击文章右下角推荐】一下。