DDPG (Deep Deterministic Policy Gradient) was proposed by Google DeepMind. The algorithm is built on the Actor-Critic framework and borrows ideas from DQN: both the policy network and the Q network come in two copies, an online network and a target network. Compared with the basic policy-gradient (PG) method, DDPG's main improvements are:

(1) Deep neural networks are used to approximate the policy function and the Q function and are trained with standard deep-learning methods, showing that nonlinear function approximation in RL can be accurate, high-performing, and able to converge;

(2) An experience replay buffer is used. The transitions generated while the actor interacts with the environment are highly correlated in time; training directly on such sequences makes the network overfit and hinders convergence, so transitions are stored and later sampled at random to break the correlation.

(3) Separate target and online networks make the learning process more stable and convergence more reliable; the update rules are sketched below.
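
Concretely, for a minibatch of N transitions $(s_i, a_i, r_i, s_{i+1})$ drawn from the replay buffer, DDPG computes the critic label $y_i$ from the target networks, fits the online critic by mean-squared error, updates the online actor to increase Q, and then soft-updates the targets:

$$y_i = r_i + \gamma\, Q'\big(s_{i+1},\, \mu'(s_{i+1} \mid \theta^{\mu'}) \,\big|\, \theta^{Q'}\big)$$

$$L(\theta^{Q}) = \frac{1}{N}\sum_i \big(y_i - Q(s_i, a_i \mid \theta^{Q})\big)^2$$

$$J(\theta^{\mu}) = \frac{1}{N}\sum_i Q\big(s_i,\, \mu(s_i \mid \theta^{\mu}) \,\big|\, \theta^{Q}\big) \quad \text{(maximized; the code minimizes } -J\text{)}$$

$$\theta^{Q'} \leftarrow \tau\,\theta^{Q} + (1-\tau)\,\theta^{Q'}, \qquad \theta^{\mu'} \leftarrow \tau\,\theta^{\mu} + (1-\tau)\,\theta^{\mu'}$$

Here $\theta^{Q}, \theta^{\mu}$ are the online critic/actor parameters, $\theta^{Q'}, \theta^{\mu'}$ the target parameters, and $\tau$ is the soft-update coefficient (TAU in the code below).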

This post applies DDPG to the control of an inverted pendulum (gym's Pendulum-v0); the control result is shown in the figure below:

The resulting reward curve tends to converge and the pendulum gradually reaches a dynamic balance, which shows that the control is effective.

Algorithm Framework

DDPG algorithm framework

Algorithm Flow

Code (with detailed comments)
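
The script below is written against the TensorFlow 1.x graph API (tf.placeholder, tf.Session, and so on) and an older gym release that still registers Pendulum-v0. As a minimal compatibility sketch, assuming a TensorFlow 2.x installation (this shim is not part of the original script), the tensorflow import can be swapped for the v1 compatibility module:

# Run the TF1-style graph code below on TensorFlow 2.x (a sketch; not in the original script)
import tensorflow.compat.v1 as tf
tf.disable_eager_execution()  # the script builds a static graph and drives it with tf.Session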

#-*- coding: utf-8 -*-  

# A lighter-weight version of the earlier DDPG implementation
# The earlier version sometimes failed to converge, possibly because too few episodes were run or because of issues in the Actor network update

import tensorflow as tf 
import numpy as np 
import gym 
import time 
import matplotlib.pyplot as plt 

#--- Global hyper-parameters -----------------------------------------------------

MAX_EPISODES = 400 # number of training episodes
MAX_TIME_STEPS = 200 # time steps per episode
LR_A = 0.001 # Actor learning rate
LR_C = 0.001 # Critic learning rate
GAMMA = 0.9 # reward discount factor
TAU = 0.01 # soft-update coefficient, typically 0.001~0.01
MEMORY_CAPACITY = 10000 # replay buffer size; training starts once 10000 transitions (s_i, a_i, r_i, s_i+1) are stored
BATCH_SIZE = 32 # N = 32 transitions sampled at random from the replay buffer per update

RENDER = False # gym rendering flag (disabled)
ENV_NAME = 'Pendulum-v0' # inverted pendulum (pendulum swing-up) environment

#--- DDPG model -------------------------------------------------------------------

class DDPG_update(object):

	def __init__(self,a_dim,s_dim,a_bound,):

		self.memory = np.zeros((MEMORY_CAPACITY,s_dim*2+a_dim+1), dtype=np.float32) # (s_i,a_i,r_i,s_i+1)
		self.pointer = 0 
		self.sess = tf.Session()
		

		self.action_dim = a_dim # action dimension
		self.state_dim = s_dim # state dimension (available from the gym environment)
		self.action_bound = a_bound # action bound (also available from the gym environment)

		self.S = tf.placeholder(tf.float32,[None,s_dim],'s')
		self.S_ = tf.placeholder(tf.float32,[None,s_dim],'s_') # shape is [batch size, state dimension]
		self.R = tf.placeholder(tf.float32,[None,1],'r') # reward

		with tf.variable_scope('ACTOR'):
			# Actor online network: produces the action a = mu(s)
			self.a = self._build_a(s=self.S, scope='Actor_Online_Net', trainable=True)
			# Actor target network: produces a_ = mu'(s_), used in the critic's target label y_i
			a_ = self._build_a(s=self.S_, scope='Actor_Target_Net', trainable=False)

		with tf.variable_scope('CRITIC'):
			# Critic online network: produces Q(s, a)
			# self.a is the output of the actor online network
			Q = self._build_c(s=self.S, a=self.a, trainable=True, scope='Critic_Online_Net')
			# Critic target network: produces Q'(s_, a_), used in the critic's target label y_i
			Q_ = self._build_c(s=self.S_, a=a_, trainable=False, scope='Critic_Target_Net')

		# parameters of the four networks
		self.action_online_params = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, scope='ACTOR/Actor_Online_Net')
		self.action_target_params = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, scope='ACTOR/Actor_Target_Net')
		self.critic_online_params = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, scope='CRITIC/Critic_Online_Net')
		self.critic_target_params = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, scope='CRITIC/Critic_Target_Net')

		# soft update of the target networks: theta' <- tau*theta + (1-tau)*theta'
		self.soft_update = [tf.assign(t,(1-TAU)*t +TAU*e) \
					for t,e in zip(self.action_target_params+self.critic_target_params,\
								   self.action_online_params+self.critic_online_params)]

		# online critic update
		# Q_target serves as the label for training the online Q network
		# (this is the idea borrowed from DQN; Q_ comes only from the non-trainable target networks)
		Q_target = self.R + GAMMA*Q_ # Q_ = Q'(s_, a_) from the target networks
		Q_target_loss = tf.losses.mean_squared_error(labels=Q_target,predictions=Q) # MSE loss to minimize
		self.ctrain = tf.train.AdamOptimizer(LR_C).minimize(Q_target_loss,var_list=self.critic_online_params)

		# online actor update
		# chain rule: gradient of Q w.r.t. the action times gradient of mu w.r.t. the actor parameters
		# the actor adjusts its parameters in the direction that increases Q
		a_loss = -tf.reduce_mean(Q) # maximizing Q == minimizing -Q
		self.atrain = tf.train.AdamOptimizer(LR_A).minimize(a_loss,var_list=self.action_online_params)

		self.sess.run(tf.global_variables_initializer())

	def choose_action(self,s):
		# run the actor online network to choose an action
		return self.sess.run(self.a,{self.S: s[np.newaxis,:]})[0]
	
	def _build_a(self,s,scope,trainable):
		# build the actor network: input s, output a = mu(s), where the network approximates the policy mu
		# single hidden layer, followed by the action output layer

		with tf.variable_scope(scope):
			net = tf.layers.dense(s,30,\
							activation=tf.nn.relu,\
							name='L1',
							trainable=trainable)
			action = tf.layers.dense(net,self.action_dim,\
							activation=tf.nn.tanh, # tanh squashes the output to (-1, 1)
							name = 'Action',
							trainable=trainable)

			# scale the (-1, 1) action to the range [-action_bound, action_bound]
			scaled_action = tf.multiply(x=action, y=self.action_bound, name='scaled_a')
			return scaled_action

	def _build_c(self,s,a,trainable,scope):
		# build the critic network: inputs s and a, output Q(s, a), the value of taking action a in state s
		with tf.variable_scope(scope):
			Num_Layer1 = 30

			# first hidden layer
			# weight variables of the single hidden layer
			w1_s = tf.get_variable('W1_s',[self.state_dim,Num_Layer1],trainable=trainable)
			w1_a = tf.get_variable('W1_a',[self.action_dim,Num_Layer1],trainable=trainable)
			b1 = tf.get_variable('b1',[1,Num_Layer1],trainable=trainable)
			# nonlinearity
			net = tf.nn.relu(tf.matmul(s,w1_s)+tf.matmul(a,w1_a)+b1)

			# output layer: Q(s, a)
			Q = tf.layers.dense(net,1,trainable=trainable)
			return Q 

	def store_transition(self,s,a,r,s_): 
		# store one transition in the replay buffer
		transition = np.hstack((s,a,[r],s_))
		index = self.pointer%MEMORY_CAPACITY # wrap-around index: once the buffer is full, the oldest transitions are overwritten
		self.memory[index,:] = transition 
		self.pointer += 1 

	def learn(self):
		# one learning step
		# ------ soft update of the target networks ----------
		self.sess.run(self.soft_update)

		# ------ sample a minibatch from the replay buffer ----------
		indices = np.random.choice(MEMORY_CAPACITY,size=BATCH_SIZE) # random indices
		bt = self.memory[indices,:] # sampled transitions

		# split the batch into the inputs of the online actor and critic networks
		# column layout of each row: [s | a | r | s_]
		bs = bt[:,:self.state_dim]
		ba = bt[:,self.state_dim:self.state_dim + self.action_dim]
		br = bt[:,-self.state_dim-1:-self.state_dim]
		bs_ = bt[:,-self.state_dim:]

		self.sess.run(self.atrain,feed_dict={self.S:bs})
		self.sess.run(self.ctrain,feed_dict={self.S:bs,self.a:ba,self.R:br,self.S_:bs_}) 

#--- Training loop ----------------------------------------------------------------

if __name__ == '__main__':

	env = gym.make(ENV_NAME) # create the pendulum environment
	env = env.unwrapped 
	env.seed(1)

	s_dim = env.observation_space.shape[0] # state dimension
	a_dim = env.action_space.shape[0] # action dimension
	a_bound = env.action_space.high # upper bound of the action range

	ddpg = DDPG_update(a_dim=a_dim, 
					   s_dim=s_dim, 
					   a_bound=a_bound)

	exploration_var = 3 # exploration noise scale: explore more during early updates, less later as the policy improves
	t1 = time.time() # start time
	reward_l = [] # episode rewards, for the plot below
	for i in range(MAX_EPISODES):
		s = env.reset()
		ep_reward = 0
		
		for j in range(MAX_TIME_STEPS): # time steps within one episode

			# choose an action and add exploration noise; random exploration may discover actions with higher Q
			a = ddpg.choose_action(s)
			a = np.clip(np.random.normal(a, exploration_var), a_min=-2, a_max=2) # Pendulum's action range is [-2, 2]

			s_,r,done,info = env.step(a) 

			# store the transition with a scaled-down reward r/10
			ddpg.store_transition(s=s, a=a, r=r/10, s_=s_)

			if ddpg.pointer > MEMORY_CAPACITY:
				exploration_var *= .9995 # decay the exploration noise; less exploration is needed late in training
				ddpg.learn()

			s = s_ 
			ep_reward += r 

			if j == MAX_TIME_STEPS-1:
				print('EPISODE: num ',i, 'REWARD: %i' %int(ep_reward),'exploration_var: %.2f' %exploration_var)
				# if ep_reward > -300: # rendering is only worth watching once the reward exceeds about -300
					# env.render()
				break 

		reward_l.append(ep_reward)

	print('Running Time:', time.time() - t1)

	plt_episode = np.arange(0, MAX_EPISODES, 1)
	plt.plot(plt_episode, reward_l, '>--', color='red', label='reward')
	plt.xlabel('Episode')
	plt.ylabel('Reward')
	plt.legend()
	plt.grid(True)
	plt.show()
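
After training, the learned policy can be checked qualitatively by rolling it out without exploration noise. A minimal sketch, assuming the trained ddpg and env objects from the script above are still in scope (the rollout length reuses MAX_TIME_STEPS; rendering is optional):

# Greedy evaluation rollout (a sketch; assumes `ddpg` and `env` from the training script are in scope)
s = env.reset()
eval_reward = 0
for _ in range(MAX_TIME_STEPS):
	env.render() # visualize the pendulum
	a = ddpg.choose_action(s) # deterministic action from the online actor, no exploration noise
	s, r, done, info = env.step(a)
	eval_reward += r
print('Evaluation reward: %i' % int(eval_reward))
env.close()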