深度强化学习专栏 —— 4. 使用ray做分布式计算

110
0
2021年2月18日 09时42分

目前训练强化学习智能体需要越来越多的数据和算力,分布式计算是加快训练过程的重要一环。Ray(RLlib)是由UC Berkeley’s RISE Lab在2017年发布,目前已经成为强化学习训练中使用最广泛的分布式框架以及性能最好的强化学习计算框架。下图是我在facebook发起的一个投票,虽然样本总量很少,但是ray(RLliib)得到了最高票。

 

在这里插入图片描述

 

我的计算机的CPU有12个核心,24个线程,所以在训练时我希望能完全的利用他们(与单纯的深度学习不同,训练强化学习智能体,CPU和GPU都很重要)。
下图是一个分布式计算的示例图。

在这里插入图片描述

对于off-policy的RL算法或者蒙特卡洛方法来说,如果能利用所有的核心,那么就可以24个线程同步采集训练数据,供优化算法进行优化,所以可以极大的提高训练的速度。

目前Ray仅支持Linux和MacOS,对python3.5-3.7支持较好。我们通过一个例子来看ray是怎样进行分布式计算的。在使用ray之前,需要使用

 

pip install ray[rllib]

 

命令进行安装。

 

import time
import numpy as np
import ray

def timer(x):
    time.sleep(1)
    return x

t0 = time.time()
values = [timer(x) for x in range(4)]
print('Time Elapsed:\t{:.4f}'.format(time.time() - t0))
print(values)

 

输出:

 

Time Elapsed:    4.0031
[0, 1, 2, 3]

 

这是符合预期的。
我们再来看一下使用Ray的情况。

 

import time
import numpy as np
import ray
#在使用IDE的情况下,不加ignore_reinit_error=True参数有可能导致错误
ray.init(ignore_reinit_error=True)
ray.available_resources()['CPU']

#ray的分布式计算是通过这个装饰符完成分配的
@ray.remote
def timer_ray(x):
    time.sleep(1)
    return x

t0 = time.time()
values = [timer_ray.remote(x) for x in range(4)]
print('Time Elapsed:\t{:.4f}'.format(time.time() - t0))
print(values)

 

上面的代码有两处值得注意的地方,已经在注释中写清楚。
输出:

 

Time Elapsed:    0.0036
[ObjectRef(35d7a3b94b8ab516ffffffff0100000001000000), 
ObjectRef(39c9e7f217500015ffffffff0100000001000000), 
ObjectRef(d072aa399048eb03ffffffff0100000001000000), 
ObjectRef(a1d795c28184037affffffff0100000001000000)]

 

简单想一下结果就会发现有问题。timer_ray函数至少会占用1s的时间,所以就算把range(4)的计算分配给4个线程来计算,也是会至少占用1s,为啥这里只用了0.0036s?另外values返回的为什么不是0,1,2,3,而是ObjectRef呢?

 

这是因为,返回的0.0036是代表Ray创建这几个Object所花的时间,并不是执行timer_ray这个函数的时间;上面代码这样写获取的values列表存储的是object id,而不是实际的timer_ray.remote(x)后的计算值。

 

那怎样获取实际的代码执行时间和实际的values值呢?
使用ray.get()函数。

 

import time
import numpy as np
import ray
#在使用IDE的情况下,不加ignore_reinit_error=True参数有可能导致错误
ray.init(ignore_reinit_error=True)
ray.available_resources()['CPU']

#ray的分布式计算是通过这个装饰符完成分配的
@ray.remote
def timer_ray(x):
    time.sleep(1)
    return x

t0 = time.time()
#这个地方用ray.get()获取
values = ray.get([timer_ray.remote(x) for x in range(4)])
print('Time Elapsed:\t{:.4f}'.format(time.time() - t0))
print(values)

 

输出:

 

Time Elapsed:    1.0113
[0, 1, 2, 3]

 

现在就是期望的输出了!Ray将4组运算分配到4个线程中去同时执行,所以相比不使用分布式计算的方式,时间是对方的1/4。这就是Ray的优势。

 

我们再来看一个滑动窗口求均值的例子:

 

import numpy as np
import time
import ray

data=np.random.normal(size=(1000,1000))
def calc_moving_average(data, window=10):
    ma_data = np.zeros(data.shape)
    for i, row in enumerate(data):
        ma_data[i] = np.array(
            [np.mean(row[j-window:j+1]) 
             if j > window else np.mean(row[:j+1]) 
             for j, _ in enumerate(row)])        
    return ma_data

t0 = time.time()
ma_data = calc_moving_average(data)
seq_time = time.time() - t0
print('Time Elapsed:\t{:.4f}'.format(seq_time))

from tqdm import tqdm
ray.shutdown()
ray.init(ignore_reinit_error=True)

@ray.remote
def calc_moving_average_ray(data, window=10):
    ma_data = np.zeros(data.shape)
    for i, row in enumerate(data):
        ma_data[i] = np.array(
            [np.mean(row[j-window:j+1]) 
             if j > window else np.mean(row[:j+1]) 
             for j, _ in enumerate(row)])        
    return ma_data
t0 = time.time()
ma_data_ray = ray.get(calc_moving_average_ray.remote(data))
par_time = time.time() - t0
print('Time Elapsed:\t{:.4f}'.format(par_time))
print('Speed up:\t{:.1f}X'.format(seq_time / par_time))
print("Results match:\t{}".format(np.allclose(ma_data, ma_data_ray)))

 

关于是怎样实现划窗求均值,大致读一下calc_moving_average函数的源码就能搞清楚,现在运行以下代码看一下使用和不使用ray的情况下花的时间。
输出:

 

Time Elapsed:    7.8588

Time Elapsed:    7.4783
Speed up:    1.1X
Results match:    True

 

我们看到即使使用了ray,计算速度并没有显著提升,这是为什么呢?
在说原因之前,我们看一下修改后的代码。我们将calc_moving_average_ray函数和函数调用分别做了修改。

 

@ray.remote
def calc_moving_average_ray(row, window=10):
    return np.array([np.mean(row[j-window:j+1]) 
             if j > window else np.mean(row[:j+1]) 
             for j, _ in enumerate(row)])
t0 = time.time()
ma_data_ray = np.array(ray.get(
    [calc_moving_average_ray.remote(row) 
    for row in data]
    ))

 

上面一个例子使用ray也没有起到加速的作用,这是为什么呢?
仔细看源码,是因为对一个二维数组来说,calc_moving_average_ray函数是先按行计算完成后,再逐一计算下面的列,所以这个函数的不满足分布式计算的需求。而修改后的函数做了什么呢?
划窗均值算法其实是对列独立的,也就是每个列的计算相互独立彼此间没有牵连,所以我们可以将每个列分出来单独计算,使用ray将每个列分布到不同的CPU核心上计算。
最后的输出结果为:

 

Time Elapsed:    0.7732
Speed up:    10.3X
Results match:    True

 

可以看到速度提升了10倍多。

这就是ray框架的诱人之处。

发表评论

后才能评论