用PyTorch手把手实现DDPG算法,搞定OpenAI Gym连续控制任务(附完整代码)
用PyTorch手把手实现DDPG算法搞定OpenAI Gym连续控制任务深度确定性策略梯度DDPG作为强化学习领域的重要算法在机器人控制、自动驾驶等连续动作空间场景中展现出独特优势。本文将带您从零开始构建完整的DDPG实现通过PyTorch框架解决OpenAI Gym中的经典控制问题Pendulum-v0。不同于理论讲解我们聚焦工程实践中的关键细节提供可直接运行的代码方案。1. 环境配置与核心架构在开始编码前需要配置基础环境并理解DDPG的双网络架构。Pendulum-v0环境模拟倒立摆控制任务其状态空间包含摆角的正余弦值和角速度动作空间为连续扭矩值。import gym import torch import numpy as np env gym.make(Pendulum-v0) state_dim env.observation_space.shape[0] # 状态维度3 action_dim env.action_space.shape[0] # 动作维度1 action_bound env.action_space.high[0] # 动作范围[-2.0, 2.0]DDPG采用Actor-Critic架构包含四个神经网络在线Actor策略网络输入状态输出确定性动作目标Actor稳定训练的策略网络副本在线Critic价值网络评估状态-动作对的Q值目标Critic稳定训练的价值网络副本device torch.device(cuda if torch.cuda.is_available() else cpu) class Actor(nn.Module): def __init__(self, state_dim, action_dim, hidden_dim64): super().__init__() self.fc1 nn.Linear(state_dim, hidden_dim) self.fc2 nn.Linear(hidden_dim, hidden_dim) self.fc3 nn.Linear(hidden_dim, action_dim) def forward(self, x): x F.relu(self.fc1(x)) x F.relu(self.fc2(x)) return torch.tanh(self.fc3(x)) * action_bound2. 经验回放与噪声探索DDPG通过经验回放机制打破数据相关性使用OU噪声实现有效探索。我们实现一个高效的回放缓冲区class ReplayBuffer: def __init__(self, capacity): self.buffer collections.deque(maxlencapacity) def add(self, state, action, reward, next_state, done): self.buffer.append((state, action, reward, next_state, done)) def sample(self, batch_size): transitions random.sample(self.buffer, batch_size) return zip(*transitions)对于连续动作空间的探索采用Ornstein-Uhlenbeck过程噪声class OUNoise: def __init__(self, action_dim, mu0, theta0.15, sigma0.2): self.action_dim action_dim self.mu mu self.theta theta self.sigma sigma self.reset() def reset(self): self.state np.ones(self.action_dim) * self.mu def sample(self): dx self.theta * (self.mu - self.state) dx self.sigma * np.random.randn(self.action_dim) self.state dx return self.state3. 网络训练与软更新机制DDPG的核心训练流程包含Critic的TD误差最小化和Actor的策略梯度上升def update(self, batch): states, actions, rewards, next_states, dones batch # Critic损失计算 next_actions self.target_actor(next_states) target_q self.target_critic(next_states, next_actions) target_q rewards (1 - dones) * self.gamma * target_q current_q self.critic(states, actions) critic_loss F.mse_loss(current_q, target_q.detach()) # Actor策略优化 actor_loss -self.critic(states, self.actor(states)).mean() # 网络参数更新 self.critic_optimizer.zero_grad() critic_loss.backward() self.critic_optimizer.step() self.actor_optimizer.zero_grad() actor_loss.backward() self.actor_optimizer.step() # 目标网络软更新 self.soft_update(self.actor, self.target_actor) self.soft_update(self.critic, self.target_critic)软更新通过参数混合实现稳定训练def soft_update(self, local_model, target_model): for target_param, local_param in zip(target_model.parameters(), local_model.parameters()): target_param.data.copy_(self.tau*local_param.data (1.0-self.tau)*target_param.data)4. 完整训练流程与性能优化将各模块整合为完整训练流程关键参数设置如下参数推荐值作用buffer_size100000经验回放容量batch_size64训练批大小gamma0.99折扣因子tau0.005软更新系数actor_lr1e-4Actor学习率critic_lr1e-3Critic学习率训练循环实现def train_agent(env, agent, episodes1000): returns [] for episode in range(episodes): state env.reset() episode_return 0 noise.reset() while True: action agent.select_action(state) next_state, reward, done, _ env.step(action) agent.replay_buffer.add(state, action, reward, next_state, done) if len(agent.replay_buffer) batch_size: agent.update() state next_state episode_return reward if done: break returns.append(episode_return) print(fEpisode {episode}: Return {episode_return:.1f}) return returns实际训练中常见问题与解决方案训练不稳定增大回放缓冲区容量降低学习率增加目标网络更新频率探索不足调整OU噪声参数初期采用更大噪声幅度逐步衰减噪声强度收敛速度慢优化网络结构增加层宽/深度尝试不同的激活函数调整批归一化策略5. 实战效果分析与调优建议在Pendulum-v0环境中典型的训练曲线呈现三个阶段探索期0-200回合回报波动大智能体随机尝试不同动作学习期200-600回合回报快速上升策略明显改善稳定期600回合回报趋于稳定策略接近最优通过修改网络结构和训练参数可进一步提升性能# 更深的网络结构 class DeepCritic(nn.Module): def __init__(self, state_dim, action_dim, hidden_dim256): super().__init__() self.fc1 nn.Linear(state_dim action_dim, hidden_dim) self.fc2 nn.Linear(hidden_dim, hidden_dim) self.fc3 nn.Linear(hidden_dim, hidden_dim) self.fc4 nn.Linear(hidden_dim, 1) def forward(self, x, a): x torch.cat([x, a], dim1) x F.relu(self.fc1(x)) x F.relu(self.fc2(x)) x F.relu(self.fc3(x)) return self.fc4(x)最终实现的DDPG算法能够在100-200个训练回合内稳定倒立摆平均回报达到-200以下原始环境定义倒立垂直向上为0向下为-1600。相比离散动作空间的DQNDDPG在连续控制任务中展现出三大优势动作精度高可输出连续扭矩值训练效率高不需要离散化动作空间策略更平滑确定性策略避免动作抖动实际部署时建议保存训练好的模型参数torch.save({ actor: actor.state_dict(), critic: critic.state_dict(), }, ddpg_model.pth)