Deep Q Network
Value-based Sequential Decision
Principle
- 抛弃Q表这种Q值记录方式,使用神经网络生成Q值,在状态较多的情况下格外有效率

2.
- Q估计:通过NN预测出的
的最大值
- Q现实:Q 估计中最大值的动作来换取环境中的奖励 reward+
下一步
中通过NN预测出的
的最大值
3. DQN两大利器:
- Experience replay: 作为一种离线学习,每次 DQN 更新的时候,我们都可以随机抽取一些之前的经历进行学习。随机抽取这种做法打乱了经历之间的相关性,也使得神经网络更新更有效率。
- Fixed Q-target: 在 DQN 中使用到两个结构相同但参数不同的神经网络, 预测 Q 估计的神经网络具备最新的参数, 而预测 Q 现实的神经网络使用的参数则是很久以前的.
4. 算法:

- 记忆库 (用于重复学习)
- 神经网络计算 Q 值
- 暂时冻结 q_target 参数 (切断相关性)
Implement
class Network(nn.Module):
def __init__(self, obs_space, act_space):
super(Network, self).__init__()
self.fc1 = nn.Linear(obs_space, 12)
self.relu = nn.ReLU()
self.out = nn.Linear(12, act_space)
def forward(self, s):
x = self.fc1(s)
x = self.relu(x)
Q_value = self.out(x)
return Q_value
class Skylark_DQN():
def __init__(self, env, alpha = 0.1, gamma = 0.6, epsilon=0.1, update_freq = 200):
self.obs_space = env.observation_space.n
self.act_space = env.action_space.n
self.eval_net = Network(self.obs_space, self.act_space)
self.target_net = Network(self.obs_space, self.act_space)
self.env = env
self.alpha = alpha # learning rate
self.gamma = gamma # discount rate
self.epsilon = epsilon # epsilon-greedy
self.buffer_size = 1000 # total size of replay/memory buffer
self.traj_count = 0 # count of trajectories in buffer
self.replay_buffer = np.zeros((self.buffer_size, self.obs_space*2+2))
self.total_step = 0
self.update_freq = update_freq # Freq of target update
def choose_action(self, state):
state = torch.unsqueeze(torch.FloatTensor(state), 0)
if np.random.uniform(0, 1) < self.epsilon:
action = self.env.action_space.sample() # Explore action space
else:
action = torch.argmax(self.eval_net(state)).numpy() # Exploit learned values
return action
def store_trajectory(self, s, a, r, s_):
traj = np.hstack((s,[a,r],s_))
index = self.traj_count % self.buffer_size # 记忆多了就覆盖旧数据
self.replay_buffer[index, :] = traj
self.traj_count += 1
def learn(self, batch_size=128):
if self.total_step % self.update_freq == 0:
self.target_net.load_state_dict(self.eval_net.state_dict())
# 抽取记忆库中的批数据
sample_index = np.random.choice(self.buffer_size, batch_size)
b_memory = self.replay_buffer[sample_index, :]
b_s = torch.FloatTensor(b_memory[:, :self.obs_space])
b_a = torch.LongTensor(b_memory[:, self.obs_space:self.obs_space+1].astype(int))
b_r = torch.FloatTensor(b_memory[:, self.obs_space+1:self.obs_space+2])
b_s_ = torch.FloatTensor(b_memory[:, -self.obs_space:])
# 针对做过的动作b_a, 来选 q_eval 的值, (q_eval 原本有所有动作的值)
q_eval = self.eval_net(b_s).gather(1, b_a) # shape (batch, 1)
q_next = self.target_net(b_s_).detach() # q_next 不进行反向传递误差, 所以 detach
q_target = b_r + self.gamma * q_next.max(1)[0] # shape (batch, 1)
loss = nn.MSELoss()(q_eval, q_target)
# 计算, 更新 eval net
optimizer = torch.optim.Adam(self.eval_net.parameters(), lr=self.alpha) # torch 的优化器
optimizer.zero_grad()
loss.backward()
optimizer.step()
def train(self, num_episodes, batch_size = 128, num_steps = 100):
for i in range(num_episodes):
state = self.env.reset()
steps, penalties, reward, sum_rew = 0, 0, 0, 0
done = False
while not done and steps < num_steps:
action = self.choose_action(state)
# Interaction with Env
next_state, reward, done, info = self.env.step(action)
self.store_trajectory(state, action, reward, next_state)
if self.traj_count > self.buffer_size:
self.learn(batch_size)
if reward == -10:
penalties += 1
sum_rew += reward
state = next_state
steps += 1
self.total_step += 1
print('Episode: {} | Avg_reward: {} | Length: {}'.format(i, sum_rew/steps, steps))
print("Training finished.")
评论(0)
您还未登录,请登录后发表或查看评论