概述

Deep Q-Learning(深度 Q 学习)是一种强化学习算法,用于解决决策问题,其中代理(agent)通过学习在不同环境中采取行动来最大化累积奖励。Lunar Lander 是一个经典的强化学习问题,其中代理的任务是控制一个着陆舱在月球表面着陆,最小化着陆过程中的燃料消耗。

以下是使用 Deep Q-Learning 解决 Lunar Lander 问题的基本步骤:

环境建模: 首先,需要对 Lunar Lander 环境进行建模。这包括定义状态空间、动作空间、奖励函数等。在 Lunar Lander 中,状态可以包括着陆舱的位置、速度、角度等信息,动作可以是推力引擎的火力等。深度 Q 网络: 创建一个深度神经网络,该网络将输入状态,并输出每个可能动作的 Q 值。Q 值表示在给定状态下采取某个动作的累积奖励的估计。网络的目标是通过学习调整 Q 值以最大化累积奖励。经验回放: 使用经验回放机制来改善学习稳定性。这涉及到存储代理先前的经验,并从中随机抽样进行训练。这有助于解决样本相关性问题,提高算法的收敛性。ε-贪婪策略: 引入 ε-贪婪策略,以平衡探索和利用。在一部分情况下,代理将以高概率选择当前认为最佳的动作(贪婪),而在其他情况下,它会以较高概率选择一个随机动作(探索)。目标 Q 值计算: 使用目标 Q 值来更新网络参数。目标 Q 值通过将当前状态的即时奖励与下一个状态的最大 Q 值相结合得到。这有助于更有效地传播奖励信号。训练: 通过与环境的交互,不断地更新深度 Q 网络的参数。代理通过学习来优化其行为,以最大化预期的累积奖励。调优: 对算法的超参数进行调优,包括学习率、折扣因子、神经网络结构等。这有助于提高算法的性能和稳定性。

使用 Deep Q-Learning 解决 Lunar Lander 问题是一个复杂的任务,需要仔细调整和实验。算法的性能可能受到许多因素的影响,包括网络结构的选择、超参数的设置以及环境的建模等。

Lunar Lander环境

这个环境是 Box2D 环境的一部分,其中包含有关环境的一般信息。

描述 Description

该环境是一个经典的火箭轨迹优化问题。根据庞特里亚金最大值原理,最佳的方法是全速点火或关闭引擎。这就是为什么这个环境具有离散动作的原因:引擎开启或关闭。

有两个环境版本:离散或连续。着陆点始终位于坐标 (0,0)。坐标是状态向量中的前两个数字。在着陆点外着陆是可能的。燃料是无限的,因此代理可以学会飞行,然后在第一次尝试时着陆。

要查看启发式着陆,请运行:

python gymnasium/envs/box2d/lunar_lander.py

动作空间 Action Space

有四个可用的离散动作:

0:什么都不做

1:点火左定向引擎

2:点火主引擎

3:点火右定向引擎

观察空间 Observation Space

状态是一个8维向量:着陆器在 x 和 y 上的坐标,它在 x 和 y 上的线速度,它的角度,它的角速度,以及两个表示每条腿是否与地面接触的布尔值。

奖励 Rewards

每一步都会获得一个奖励。一个 episode(回合)总奖励是该 episode 中所有步骤的奖励之和。

对于每一步,奖励:

着陆器离着陆点越近/远,奖励增加/减少。

着陆器移动越慢/快,奖励增加/减少。

着陆器倾斜度越大,奖励减少(角度非水平)。

每个与地面接触的腿奖励增加10分。

每帧侧引擎点火,奖励减少0.03分。

每帧主引擎点火,奖励减少0.3分。

episode 因坠毁或安全着陆而额外获得-100或+100分的奖励。

如果一个 episode 得分至少为200分,则认为它是一个解决方案。

起始状态 Starting State

着陆器位于视口的顶部中心,对其质心施加随机初始力。

回合终止条件 Episode Termination

如果满足以下条件回合结束:

着陆器坠毁(着陆器本体与月球接触);

着陆器超出视口范围(x 坐标大于1);

着陆器处于非唤醒状态。根据 Box2D 文档,处于非唤醒状态的身体是不移动且不与任何其他身体碰撞的身体:

当 Box2D 确定一个身体(或一组身体)已经停止时,该身体进入了一种几乎没有 CPU 开销的休眠状态。如果一个身体醒着并与休眠中的身体发生碰撞,那么休眠中的身体会醒来。如果连接到它们的关节或接触被销毁,身体也会醒来。

参数 Arguments

要使用连续环境,您需要指定 continuous=True 参数,如下所示:

import gymnasium as gym

env = gym.make(

"LunarLander-v2",

continuous: bool = False,

gravity: float = -10.0,

enable_wind: bool = False,

wind_power: float = 15.0,

turbulence_power: float = 1.5,

)

如果传递 continuous=True,将使用连续动作(对应于引擎的油门),并且动作空间将是 Box(-1, +1, (2,), dtype=np.float32)。动作的第一个坐标确定主引擎的油门,而第二个坐标指定侧推器的油门。给定一个动作 np.array([main, lateral]),如果 main < 0,则主引擎将完全关闭,并且油门在 0 <= main <= 1 时按比例从 50% 缩放到 100%(特别是,主引擎在功率低于 50% 时不起作用)。同样,如果 -0.5 < lateral < 0.5,则侧推器将不会点火。如果 lateral < -0.5,则左推进器将点火,如果 lateral > 0.5,则右推进器将点火。同样,油门在 -1 到 -0.5 之间(以及 0.5 到 1 之间)按比例从 50% 缩放到 100%。

gravity 确定了重力常数,它被限制在 0 和 -12 之间。

如果传递 enable_wind=True,则着陆器将受到风的影响。风是使用函数 tanh(sin(2 k (t+C)) + sin(pi k (t+C))) 生成的。k 设置为 0.01。C 在 -9999 到 9999 之间随机抽样。

wind_power 确定了施加在飞行器上的线性风的最大幅度。wind_power 的推荐值在 0.0 到 20.0 之间。turbulence_power 确定了施加在飞行器上的旋转风的最大幅度。turbulence_power 的推荐值在 0.0 到 2.0 之间。

版本历史 Version History

v2:计算能量消耗,并在 v0.24 版本中,添加了具有风力和 turbulence_power 参数的湍流

v1:在状态向量中添加了与地面接触的腿;与地面接触会奖励 +10 分,如果失去接触则减去 -10 分;奖励重新调整为 200 分;更难的初始随机推力。

v0:初始版本

备注 Notes

在环境实现中存在一些意外的错误。

着陆器身体上侧推器的位置会随着着陆器的方向而变化。这反过来导致对着陆器施加方向依赖性扭矩。

状态的单位不一致。即:

角速度以每秒 0.4 弧度为单位。为了转换为每秒弧度,需要将该值乘以 2.5 的因子。

对于 VIEWPORT_W、VIEWPORT_H、SCALE 和 FPS 的默认值,比例因子相等:'x': 10 'y': 6.666 'vx': 5 'vy': 7.5 'angle': 1 'angular velocity': 2.5

在进行更正后,状态的单位如下:'x':(单位) 'y':(单位) 'vx':(单位/秒) 'vy':(单位/秒) 'angle':(弧度) 'angular velocity':(弧度/秒)

示例代码

################ 完整代码

import gymnasium as gym

from gym.wrappers.monitoring.video_recorder import VideoRecorder

from IPython.display import HTML, display

import imageio

import base64

import io

import glob

import os

import random

import numpy as np

import torch

import torch.nn as nn

import torch.optim as optim

import torch.nn.functional as F

import torch.autograd as autograd

from torch.autograd import Variable

from collections import deque, namedtuple

######## 创建网络架构

class Network(nn.Module):

def __init__(self, state_size, action_size, seed=42):

super(Network, self).__init__()

self.seed = torch.manual_seed(seed)

self.fc1 = nn.Linear(state_size, 64)

self.fc2 = nn.Linear(64, 64)

self.fc3 = nn.Linear(64, action_size)

def forward(self, state):

x = self.fc1(state)

x = F.relu(x)

x = self.fc2(x)

x = F.relu(x)

return self.fc3(x)

######## 设置环境:使用Gymnasium创建了LunarLander环境

state_shape = env.observation_space.shape

state_size = env.observation_space.shape[0]

number_actions = env.action_space.n

print('State shape: ', state_shape)

print('State size: ', state_size)

print('Number of actions: ', number_actions)

######## 初始化超参数:定义了学习率、批处理大小、折扣因子等超参数。

learning_rate = 5e-4

minibatch_size = 100

discount_factor = 0.99

replay_buffer_size = int(1e5)

interpolation_parameter = 1e-3

####### 实现经验回放:实现了经验回放(Experience Replay)的类 ReplayMemory,用于存储和采样Agent的经验

class ReplayMemory(object):

def __init__(self, capacity):

self.device = torch.device(

"cuda:0" if torch.cuda.is_available() else "cpu")

self.capacity = capacity

self.memory = []

def push(self, event):

self.memory.append(event)

if len(self.memory) > self.capacity:

del self.memory[0]

def sample(self, batch_size):

experiences = random.sample(self.memory, k=batch_size)

states = torch.from_numpy(np.vstack(

[e[0] for e in experiences if e is not None])).float().to(self.device)

actions = torch.from_numpy(

np.vstack([e[1] for e in experiences if e is not None])).long().to(self.device)

rewards = torch.from_numpy(np.vstack(

[e[2] for e in experiences if e is not None])).float().to(self.device)

next_states = torch.from_numpy(np.vstack(

[e[3] for e in experiences if e is not None])).float().to(self.device)

dones = torch.from_numpy(np.vstack([e[4] for e in experiences if e is not None]).astype(

np.uint8)).float().to(self.device)

return states, next_states, actions, rewards, dones

########## 实现 DQN 代理:创建了一个Agent类,包含本地Q网络和目标Q网络,包含了采取动作、学习、软更新等方法。

class Agent():

# 初始化函数,参数为状态大小和动作大小

def __init__(self, state_size, action_size):

self.device = torch.device(

"cuda:0" if torch.cuda.is_available() else "cpu")

self.state_size = state_size

self.action_size = action_size

self.local_qnetwork = Network(state_size, action_size).to(self.device)

self.target_qnetwork = Network(state_size, action_size).to(self.device)

self.optimizer = optim.Adam(

self.local_qnetwork.parameters(), lr=learning_rate)

self.memory = ReplayMemory(replay_buffer_size)

self.t_step = 0

# 定义一个函数,用于存储经验并决定何时从中学习

def step(self, state, action, reward, next_state, done):

self.memory.push((state, action, reward, next_state, done))

self.t_step = (self.t_step + 1) % 4

if self.t_step == 0:

if len(self.memory.memory) > minibatch_size:

experiences = self.memory.sample(100)

self.learn(experiences, discount_factor)

# 定义一个函数,根据给定的状态和epsilon值选择一个动作(epsilon贪婪动作选择策略)0.表示浮点数

def act(self, state, epsilon=0.):

state = torch.from_numpy(state).float().unsqueeze(0).to(self.device)

self.local_qnetwork.eval()

with torch.no_grad():

action_values = self.local_qnetwork(state)

self.local_qnetwork.train()

if random.random() > epsilon:

return np.argmax(action_values.cpu().data.numpy())

else:

return random.choice(np.arange(self.action_size))

# 定义一个函数,根据样本经验更新代理的q值,参数为经验和折扣因子

def learn(self, experiences, discount_factor):

states, next_states, actions, rewards, dones = experiences

next_q_targets = self.target_qnetwork(

next_states).detach().max(1)[0].unsqueeze(1)

q_targets = rewards + discount_factor * next_q_targets * (1 - dones)

q_expected = self.local_qnetwork(states).gather(1, actions)

loss = F.mse_loss(q_expected, q_targets)

self.optimizer.zero_grad()

loss.backward()

self.optimizer.step()

self.soft_update(self.local_qnetwork,

self.target_qnetwork, interpolation_parameter)

# 定义一个函数,用于软更新目标网络的参数,参数为本地模型,目标模型和插值参数

def soft_update(self, local_model, target_model, interpolation_parameter):

for target_param, local_param in zip(target_model.parameters(), local_model.parameters()):

target_param.data.copy_(interpolation_parameter * local_param.data + (

1.0 - interpolation_parameter) * target_param.data)

####### 训练DQN代理

agent = Agent(state_size, number_actions)

number_episodes = 2000

maximum_number_timesteps_per_episode = 1000

epsilon_starting_value = 1.0

epsilon_ending_value = 0.01

epsilon_decay_value = 0.995

epsilon = epsilon_starting_value

scores_on_100_episodes = deque(maxlen=100)

for episode in range(1, number_episodes + 1):

state, _ = env.reset()

score = 0

for t in range(maximum_number_timesteps_per_episode):

action = agent.act(state, epsilon)

next_state, reward, done, _, _ = env.step(action)

agent.step(state, action, reward, next_state, done)

state = next_state

score += reward

if done:

break

scores_on_100_episodes.append(score)

epsilon = max(epsilon_ending_value, epsilon_decay_value * epsilon)

print('\rEpisode {}\tAverage Score: {:.2f}'.format(

episode, np.mean(scores_on_100_episodes)), end="")

if episode % 100 == 0:

print('\rEpisode {}\tAverage Score: {:.2f}'.format(

episode, np.mean(scores_on_100_episodes)))

if np.mean(scores_on_100_episodes) >= 200.0:

print('\nEnvironment solved in {:d} episodes!\tAverage Score: {:.2f}'.format(

episode - 100, np.mean(scores_on_100_episodes)))

torch.save(agent.local_qnetwork.state_dict(), 'checkpoint.pth')

break

####### 可视化结果

def show_video_of_model(agent, env_name):

env = gym.make(env_name, render_mode='rgb_array')

state, _ = env.reset()

done = False

frames = []

while not done:

frame = env.render()

frames.append(frame)

action = agent.act(state)

state, reward, done, _, _ = env.step(action.item())

env.close()

imageio.mimsave('video.mp4', frames, fps=30)

show_video_of_model(agent, 'LunarLander-v2')

def show_video():

mp4list = glob.glob('*.mp4')

if len(mp4list) > 0:

mp4 = mp4list[0]

video = io.open(mp4, 'r+b').read()

encoded = base64.b64encode(video)

display(HTML(data='''

loop controls style="height: 400px;">

'''.format(encoded.decode('ascii'))))

else:

print("Could not find video")

show_video()

终端输出:

State shape: (8,)

State size: 8

Number of actions: 4

Episode 100 Average Score: -174.79

Episode 200 Average Score: -102.59

Episode 300 Average Score: -68.797

Episode 400 Average Score: -38.18

Episode 500 Average Score: 24.301

Episode 600 Average Score: 149.24

Episode 700 Average Score: 134.89

Episode 800 Average Score: 185.41

Episode 826 Average Score: 200.92

Environment solved in 726 episodes! Average Score: 200.92

IMAGEIO FFMPEG_WRITER WARNING: input image is not divisible by macro_block_size=16, resizing from (600, 400) to (608, 400) to ensure video compatibility with most codecs and players. To prevent resizing, make your input image divisible by the macro_block_size or set the macro_block_size to 1 (risking incompatibility).

[swscaler @ 000001e276cb1f00] Warning: data is not aligned! This can lead to a speed loss

总结:以上代码是使用深度 Q 学习(DQN)算法训练一个智能体在月球着陆环境中控制火箭的示例。代码分为以下几个部分:

• 导入所需的库和模块,包括 gymnasium(一个开源的强化学习环境库),torch(一个开源的深度学习框架),以及一些辅助的库和模块,如 imageio(用于处理图像和视频),base64(用于编码和解码数据),deque(用于实现双端队列),namedtuple(用于创建命名元组)等。

import io # 导入io模块,用于读写文件

import glob # 导入glob模块,用于查找文件

import gymnasium as gym # 导入环境库

import os # os 是操作系统相关的库,用来处理文件和目录等。

import random # random 是随机数生成和操作的库,用来实现 epsilon 贪心策略等。

import numpy as np # numpy 是科学计算的库,用来处理多维数组和矩阵等。

import torch # torch 是 PyTorch 框架的主要库,用来实现张量和神经网络等。

import torch.nn as nn # torch.nn 是 PyTorch 框架的神经网络模块,用来定义神经网络的层和损失函数等。

import torch.optim as optim # torch.optim 是 PyTorch 框架的优化器模块,用来定义优化算法和更新参数等。

# torch.nn.functional 是 PyTorch 框架的函数式接口,用来实现激活函数和池化等。

import torch.nn.functional as F

# torch.autograd 是 PyTorch 框架的自动微分模块,用来实现反向传播和梯度计算等。

import torch.autograd as autograd

# torch.autograd.Variable 是 PyTorch 框架的变量类,用来封装张量和梯度等。

from torch.autograd import Variable

# collections 是 Python 的内置模块,用来实现特殊的容器类型,如双端队列和命名元组等。

from collections import deque, namedtuple

• 定义 Network 类,继承自 torch.nn.Module 类,用来构建神经网络模型,包括三个全连接层和一个前向传播函数,输入是状态的维度,输出是动作的个数。

class Network(nn.Module): # 继承nn模块中的Module类

def __init__(self, state_size, action_size, seed=42): # 初始化函数,参数为状态大小(8),动作大小(4),随机种子(42)

super(Network, self).__init__() # 调用父类的初始化函数

self.seed = torch.manual_seed(seed) # 设置随机种子

# 创建第一个全连接层,输入为状态大小,输出为64个神经元,这里64是为了适应月球着陆的问题

self.fc1 = nn.Linear(state_size, 64)

self.fc2 = nn.Linear(64, 64) # 创建第二个全连接层,输入为64个神经元,输出为64个神经元

self.fc3 = nn.Linear(64, action_size) # 创建第三个全连接层,输入为64个神经元,输出为动作大小

# 完成神经网络的构建

def forward(self, state): # 前向传播函数

x = self.fc1(state) # 从输入层接收状态

x = F.relu(x) # 使用激活函数

x = self.fc2(x) # 从第一个隐藏层接收输出

x = F.relu(x) # 使用激活函数

return self.fc3(x) # 返回最后一层的输出

• 创建月球着陆环境,使用 gym.make 函数,获取环境的状态空间和动作空间的属性,如状态的形状,状态的维度,动作的个数等。

# https://gymnasium.farama.org/environments/box2d/lunar_lander/

env = gym.make('LunarLander-v2')

# 导入月球着陆环境

state_shape = env.observation_space.shape # 状态的形状,这里是8维向量

state_size = env.observation_space.shape[0] # 状态的大小,这里是8个元素,包括坐标,速度等

number_actions = env.action_space.n # 动作的数量,这里是4个

print('State shape: ', state_shape)

print('State size: ', state_size)

print('Number of actions: ', number_actions)

• 定义一些超参数,如学习率,批次大小,折扣因子,回放缓冲区的容量,软更新的插值参数等。

learning_rate = 5e-4 # 学习率,这里是0.00005

minibatch_size = 100 # 批次大小,用于更新参数

discount_factor = 0.99 # 折扣因子,用于计算未来奖励,越接近1越考虑未来,越接近0越考虑当前

replay_buffer_size = int(1e5) # 重放缓冲区的大小,用于存储人工智能的经验,稳定和改善学习,这里是10万个经验

interpolation_parameter = 1e-3 # 插值参数,用于更新目标网络,这里是0.001

# 所有的参数都是通过实验得到的最优值

• 定义 ReplayMemory 类,用来实现经验回放机制,包括一个初始化函数,一个存储经验的函数,一个采样经验的函数,使用双端队列来存储经验元组,使用随机采样的方法来获取一批经验,将经验转换为 PyTorch 张量并发送到 cpu 或 gpu 上。

# 定义一个重放缓冲区的类

class ReplayMemory(object):

def __init__(self, capacity): # 初始化函数,参数为缓冲区的容量

# 判断是否有cuda可用,如果有则使用cuda,否则使用cpu

self.device = torch.device(

"cuda:0" if torch.cuda.is_available() else "cpu")

self.capacity = capacity # 设置缓冲区的容量

self.memory = [] # 创建一个列表,用于存储经验,每个经验包括状态,动作,奖励,下一个状态,是否结束等

def push(self, event): # 定义一个函数,用于向缓冲区中添加经验

self.memory.append(event) # 将经验添加到列表中

if len(self.memory) > self.capacity: # 如果列表的长度超过了容量

del self.memory[0] # 删除最旧的经验

def sample(self, batch_size): # 定义一个函数,用于从缓冲区中随机抽取一批经验

experiences = random.sample(self.memory, k=batch_size) # 从列表中随机抽取k个经验

# 从经验中提取每个元素,并将它们堆叠在一起

# 使用列表推导式,从每个经验中提取状态,使用np.vstack将它们垂直堆叠,然后转换为pytorch张量,使用.float()将它们转换为浮点数,使用.to(self.device)将它们发送到cpu或gpu

states = torch.from_numpy(np.vstack(

[e[0] for e in experiences if e is not None])).float().to(self.device)

actions = torch.from_numpy(np.vstack([e[1] for e in experiences if e is not None])).long(

).to(self.device) # 同理,从每个经验中提取动作,使用.long()将它们转换为整数

rewards = torch.from_numpy(np.vstack([e[2] for e in experiences if e is not None])).float(

).to(self.device) # 同理,从每个经验中提取奖励,使用.float()将它们转换为浮点数

next_states = torch.from_numpy(np.vstack([e[3] for e in experiences if e is not None])).float(

).to(self.device) # 同理,从每个经验中提取下一个状态,使用.float()将它们转换为浮点数

dones = torch.from_numpy(np.vstack([e[4] for e in experiences if e is not None]).astype(np.uint8)).float(

).to(self.device) # 同理,从每个经验中提取是否结束的标志,使用.astype(np.uint8)将它们转换为无符号整数,使用.float()将它们转换为浮点数

return states, next_states, actions, rewards, dones # 返回这些元素,注意顺序要一致

• 定义 Agent 类,用来实现深度 Q 学习的智能体,包括一个初始化函数,一个执行一步操作的函数,一个选择动作的函数,一个学习的函数,一个软更新的函数,使用两个神经网络模型,一个是本地 Q 网络,用来选择和评估动作,一个是目标 Q 网络,用来计算目标 Q 值,使用 Adam 优化器来更新本地 Q 网络的参数,使用 ReplayMemory 类的实例来存储和采样经验,使用时间步来控制何时学习,使用 epsilon 贪心策略来平衡探索和利用,使用均方误差损失函数来计算预期 Q 值和目标 Q 值之间的差异,使用软更新的方法来更新目标 Q 网络的参数。

# 定义一个代理类

class Agent():

def __init__(self, state_size, action_size): # 初始化函数,参数为状态大小和动作大小

# 判断是否有cuda可用,如果有则使用cuda,否则使用cpu

self.device = torch.device(

"cuda:0" if torch.cuda.is_available() else "cpu")

self.state_size = state_size # 创建对象变量,存储状态大小

self.action_size = action_size # 创建对象变量,存储动作大小

# Q学习 --

self.local_qnetwork = Network(state_size, action_size).to(

self.device) # 创建一个本地网络,用于选择动作,将其发送到设备上,随机种子已经在之前提供

self.target_qnetwork = Network(state_size, action_size).to(

self.device) # 创建一个目标网络,用于计算目标值,将其发送到设备上

# 创建一个优化器,用于更新本地网络的参数,使用Adam算法,学习率为之前定义的值

self.optimizer = optim.Adam(

self.local_qnetwork.parameters(), lr=learning_rate)

# 创建一个重放缓冲区,用于存储人工智能的经验,容量为之前定义的值

self.memory = ReplayMemory(replay_buffer_size)

self.t_step = 0 # 时间步数

def step(self, state, action, reward, next_state, done): # 定义一个函数,用于存储经验并决定何时从中学习

self.memory.push((state, action, reward, next_state, done)) # 将经验推入缓冲区

self.t_step = (self.t_step + 1) % 4 # 时间步数计数器(每4步学习一次)

if self.t_step == 0: # 如果时间步数为0

# 如果缓冲区中的经验数量大于批次大小,self.memory是重放缓冲区的实例,memory属性是__init__中定义的列表

if len(self.memory.memory) > minibatch_size:

experiences = self.memory.sample(100) # 从缓冲区中随机抽取100个经验

self.learn(experiences, discount_factor) # 从经验中学习,使用之前定义的折扣因子

def act(self, state, epsilon=0.): # 定义一个函数,根据给定的状态和epsilon值选择一个动作(epsilon贪婪动作选择策略)0.表示浮点数

# 将状态转换为pytorch张量,增加一个维度,表示批次的维度,0表示批次维度的索引,将其放在最前面,将张量发送到设备上

state = torch.from_numpy(state).float().unsqueeze(0).to(self.device)

self.local_qnetwork.eval() # 将本地网络设置为评估模式,不更新梯度,本地网络是代理类的属性,继承自nn.Module类,有eval()方法

with torch.no_grad(): # 不计算梯度,检查是否处于推理模式而不是训练模式

# 使用本地网络对状态进行前向传播,得到每个动作的价值,这些价值将被epsilon贪婪策略选择(这里我们得到的不是最终的值,而是对应于状态的q值)

action_values = self.local_qnetwork(state)

self.local_qnetwork.train() # 将本地网络设置为训练模式,更新梯度,本地网络是代理类的属性,继承自nn.Module类,有train()方法

# Eplison贪婪动作选择策略 --(用于探索和利用的平衡)

if random.random() > epsilon: # 如果随机数大于epsilon,random.random()是random库的方法,返回一个0到1之间的随机数

# 返回价值最大的动作,np.argmax是numpy库的方法,返回最大值的索引,action_values发送到cpu上,因为它是简单的,data.numpy()将格式转换为numpy的数据格式

return np.argmax(action_values.cpu().data.numpy())

else: # 否则

# 返回随机的动作,random.choice是random库的方法,从给定的列表中随机选择一个元素,np.arange是numpy库的方法,返回一个从0到动作大小的数组

return random.choice(np.arange(self.action_size))

def learn(self, experiences, discount_factor): # 定义一个函数,根据样本经验更新代理的q值,参数为经验和折扣因子

states, next_states, actions, rewards, dones = experiences # 从经验中解包元素

# 从目标网络中获取下一个状态的最大q值,self.target_qnetwork(next_states)返回每个动作的价值,.detach()将张量从计算图中分离,我们不会在反向传播中使用这些值,.max(1)沿着第一个维度取最大值,得到两个张量(最大值和索引),因此我们添加.max[1][0]只获取最大值,.unsqueeze(1)增加一个维度,表示批次的维度,但这次在第一个位置

next_q_targets = self.target_qnetwork(

next_states).detach().max(1)[0].unsqueeze(1) # 从目标Q网络的输出中,选择每个下一个状态对应的最大Q值,然后将其整理成一个列向量 next_q_targets。这个向量将用于计算Q-learning的目标值,以便更新本地Q网络

q_targets = rewards + discount_factor * next_q_targets * \

(1 - dones) # 计算当前状态的目标值,使用公式:奖励加上折扣后的未来最大值,乘以1减去结束标志

q_expected = self.local_qnetwork(states).gather(

1, actions) # 获取当前状态的预期值,.gather(1, actions)根据动作选择对应的价值

loss = F.mse_loss(q_expected, q_targets) # 计算预期值和目标值之间的均方误差

self.optimizer.zero_grad() # 清空优化器的梯度,zero_grad()是Adam的方法

loss.backward() # 反向传播误差

self.optimizer.step() # 更新本地网络的参数,step()是优化器的方法

# 使用软更新的方法更新目标网络的参数,参数为本地网络,目标网络和插值参数

self.soft_update(self.local_qnetwork,

self.target_qnetwork, interpolation_parameter)

# 定义一个函数,用于软更新目标网络的参数,参数为本地模型,目标模型和插值参数

def soft_update(self, local_model, target_model, interpolation_parameter):

# 遍历目标模型和本地模型的参数,zip()是一个函数,用于将参数打包成元组,parameters()是nn.Module的方法,返回模型的参数

for target_param, local_param in zip(target_model.parameters(), local_model.parameters()):

# 使用插值参数更新目标模型的参数,将本地模型的参数乘以插值参数,再加上目标模型的参数乘以1减去插值参数,.copy_()是一个方法,用于复制张量的数据

target_param.data.copy_(interpolation_parameter * local_param.data + (

1.0 - interpolation_parameter) * target_param.data)

• 创建 Agent 类的实例,传入状态维度和动作个数。

agent = Agent(state_size, number_actions) # 创建一个代理或人工智能,参数为状态大小和动作数量

• 定义训练过程,包括回合数,每个回合的最大时间步数,epsilon 值的初始值,终止值,衰减值,记录最近 100 个回合的分数,遍历每个回合,重置环境,初始化分数,遍历每个时间步,选择动作,执行动作,存储经验,更新状态,累加奖励,判断是否结束,更新 epsilon 值,打印当前回合和平均分数,判断是否达到目标分数,保存模型参数。

# ### 训练DQN代理

# %%

number_episodes = 2000 # 我们想要训练的次数

# 我们不想让一个回合卡住,所以设置一个最大的时间步数(在月球上着陆的尝试最多为1000个时间步)

maximum_number_timesteps_per_episode = 1000

epsilon_starting_value = 1.0 # epsilon的初始值

epsilon_ending_value = 0.01 # epsilon的结束值

epsilon_decay_value = 0.995 # epsilon的衰减率,按照(1*0.995 , 1*0.995*0.995,...)的方式递减

epsilon = epsilon_starting_value # epsilon的变量

scores_on_100_episodes = deque(maxlen=100) # 最近100个回合的分数(列表) 创建了一个双端队列,最大长度为100

for episode in range(1, number_episodes + 1): # 循环直到2000次(上限固定)

state, _ = env.reset() # 重置环境(这里返回状态和观察值)

score = 0 # 每个回合的累积分数

for t in range(maximum_number_timesteps_per_episode): # 循环每个时间步

action = agent.act(state, epsilon) # 根据当前状态和epsilon贪婪策略选择一个动作

# 根据动作执行环境的步骤,返回下一个状态,奖励,是否结束等值,_是丢弃不需要的值

next_state, reward, done, _, _ = env.step(action)

agent.step(state, action, reward, next_state, done) # 调用代理的学习方法

state = next_state # 更新状态

score += reward # 累加奖励

if done: # 如果回合结束

break # 跳出循环

scores_on_100_episodes.append(score) # 将最近的回合分数添加到列表中

epsilon = max(epsilon_ending_value, epsilon_decay_value *

epsilon) # 更新epsilon的值,使其按照衰减率递减,但不低于结束值

print('\rEpisode {}\tAverage Score: {:.2f}'.format(episode, np.mean(

scores_on_100_episodes)), end="") # \r覆盖之前的输出,\t制表符,.2f保留两位小数,打印当前的回合数和平均分数,end表示不换行

if episode % 100 == 0: # 每100个回合

print('\rEpisode {}\tAverage Score: {:.2f}'.format(

episode, np.mean(scores_on_100_episodes))) # \r覆盖之前的输出,打印当前的回合数和平均分数,换行

if np.mean(scores_on_100_episodes) >= 200.0: # 如果达到胜利的条件

print('\nEnvironment solved in {:d} episodes!\tAverage Score: {:.2f}'.format(

episode - 100, np.mean(scores_on_100_episodes))) # \n换行,:d表示整数,打印环境在多少个回合内解决,以及平均分数

torch.save(agent.local_qnetwork.state_dict(),

'checkpoint.pth') # 将当前Agent的本地Q网络(agent.local_qnetwork)的参数保存到文件中

break # 跳出循环

• 定义展示模型表现的函数,使用 imageio 库将每一帧的图像保存为视频文件,使用 HTML 标签和 base64 编码在网页中显示视频。

• 调用展示模型表现的函数,传入智能体和环境的名称。

from gym.wrappers.monitoring.video_recorder import VideoRecorder # 导入gym模块,用于录制视频

from IPython.display import HTML, display # 导入IPython模块,用于显示HTML和视频

import imageio # 导入imageio模块,用于处理图像

import base64 # 导入base64模块,用于编码和解码

import io # 导入io模块,用于读写文件

import glob # 导入glob模块,用于查找文件

import gymnasium as gym # 导入环境库

# %%

def show_video_of_model(agent, env_name): # 定义一个函数,用于展示代理在环境中的表现

env = gym.make(env_name, render_mode='rgb_array') # 创建一个环境,渲染模式为rgb数组

state, _ = env.reset() # 重置环境,获取初始状态

done = False # 设置结束标志为False

frames = [] # 创建一个列表,用于存储每一帧的图像

while not done: # 循环直到结束

frame = env.render() # 渲染环境,获取当前帧的图像

frames.append(frame) # 将图像添加到列表中

action = agent.act(state) # 根据当前状态选择一个动作

state, reward, done, _, _ = env.step(

action.item()) # 根据动作执行环境的步骤,获取下一个状态,奖励,是否结束等值

env.close() # 关闭环境

# 使用imageio模块,将列表中的图像保存为视频,帧率为30

imageio.mimsave('video.mp4', frames, fps=30)

show_video_of_model(agent, 'LunarLander-v2') # 调用函数,展示代理在月球着陆环境中的表现

def show_video(): # 定义一个函数,用于显示视频

mp4list = glob.glob('*.mp4') # 使用glob模块,查找当前目录下的所有mp4文件

if len(mp4list) > 0: # 如果找到了

mp4 = mp4list[0] # 取第一个文件

video = io.open(mp4, 'r+b').read() # 使用io模块,以二进制模式读取文件

encoded = base64.b64encode(video) # 使用base64模块,对文件进行编码

display(HTML(data='''

loop controls style="height: 400px;">

'''.format(encoded.decode('ascii')))) # 使用IPython模块,显示HTML格式的视频,使用base64编码的数据作为源

else: # 如果没有找到

print("Could not find video") # 打印提示信息

show_video() # 调用函数,显示视频

笔记

参考网址:

https://gymnasium.farama.org/environments/box2d/lunar_lander/

推荐链接

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: