Foreword:

While learning reinforcement learning for autonomous driving, I decided to build my simulation environment on the highway-env library. After going through many tutorials (both Chinese and international), I found that they were all out of date: highway-env was updated as of 2023, and the tutorials from the previous couple of years all use the old function signatures and return values.

What is highway-env?

Installation (installs the latest version by default): pip install highway-env

First, here are the changes I found in the new version of the library:

Previously, env.step() returned four values:

        observation, reward, done, info = env.step(action)

Now it returns five:

        observation, reward, terminated, truncated, info = env.step(action)

As far as I can tell, the old API's env.reset() returned only the observation, a float32 ndarray:

        data = env.reset()

        # data == array([[...], [...], ..., [...]], dtype=float32)

Now env.reset() returns a tuple (observation, info):

        data = env.reset()

        # data == (array([[...], [...], ..., [...]], dtype=float32),
        #          {"speed": ..., "crashed": ..., "action": ..., "rewards": {...}, ...})

Because of these changes, the data-handling parts of the code have to change accordingly. Be especially careful about version compatibility when several libraries (gym, highway-env, etc.) are used together.
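
If one script has to run against both the old and the new API, a small pair of helpers can hide the difference. A minimal sketch (the names reset_env / step_env are my own, not part of gym or highway-env):

def reset_env(env):
    out = env.reset()
    # new API returns (observation, info); the old one returned only the observation
    return out[0] if isinstance(out, tuple) else out

def step_env(env, action):
    out = env.step(action)
    if len(out) == 5:    # new API: (obs, reward, terminated, truncated, info)
        obs, reward, terminated, truncated, info = out
        return obs, reward, terminated or truncated, info
    return out           # old API: (obs, reward, done, info)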

Some code I referenced

My virtual environment configuration (GPU):

What is a virtual environment, you ask? Someone, bring out the nine-twist braised intestines (yes, that meme).

The main packages you will need are the following:

Based on Python 3.8.0

pytorch

gym


tqdm

matplotlib

pygame

numpy

highway-env

Training uses the Double DQN algorithm; further variants built on top of it follow later.

Create a Python file named double_dqn.py; the code below goes into that file, and the snippets concatenated in order form the complete script.

The comments are in English because my project is in English; translate them if you prefer.

Libraries used

import os

import copy

import random

import time

import numpy as np

import matplotlib.pyplot as plt

from tqdm import tqdm

import torch

import torch.nn as nn

import torch.nn.functional as F

import gym

import highway_env

Detect the device and initialize the default intersection environment

# set device

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Author: Da Xuanzi 2023-2-17

# Define the environment

env = gym.make("intersection-v0")

# details

env.config["duration"] = 13

env.config["vehicles_count"] = 20

env.config["vehicles_density"] = 1.3

env.config["reward_speed_range"] = [7.0, 10.0]

env.config["initial_vehicle_count"] = 10

env.config["simulation_frequency"] = 15

env.config["arrived_reward"] = 2

env.reset()

Structure of the intersection environment:

env.config

{

"observation": {

"type": "Kinematics",

"vehicles_count": 15,

"features": ["presence", "x", "y", "vx", "vy", "cos_h", "sin_h"],

"features_range": {

"x": [-100, 100],

"y": [-100, 100],

"vx": [-20, 20],

"vy": [-20, 20],

},

"absolute": True,

"flatten": False,

"observe_intentions": False

},

"action": {

"type": "DiscreteMetaAction",

"longitudinal": False,

"lateral": True

},

"duration": 13, # [s]

"destination": "o1",

"initial_vehicle_count": 10,

"spawn_probability": 0.6,

"screen_width": 600,

"screen_height": 600,

"centering_position": [0.5, 0.6],

"scaling": 5.5 * 1.3,

"collision_reward": IntersectionEnv.COLLISION_REWARD,

"normalize_reward": False

}
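
With this configuration the Kinematics observation is a 15 x 7 array (vehicles_count x number of features), so flattening it gives the 105-dimensional vector used as state_dim below; with lateral-only DiscreteMetaAction there are 3 meta-actions, hence action_dim = 3. A quick check (a sketch; the shapes change if you edit the observation or action config):

obs, info = env.reset()            # new API: (observation, info)
print(obs.shape)                   # expected: (15, 7)
print(int(np.prod(obs.shape)))     # expected: 105 -> state_dim
print(env.action_space)            # expected: Discrete(3) with lateral-only meta-actions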

Building the network

The number of hidden-layer nodes can be customized.

class Net(nn.Module):

def __init__(self, state_dim, action_dim):

# super class

super(Net, self).__init__()

# hidden nodes define

hidden_nodes1 = 1024

hidden_nodes2 = 512

self.fc1 = nn.Linear(state_dim, hidden_nodes1)

self.fc2 = nn.Linear(hidden_nodes1, hidden_nodes2)

self.fc3 = nn.Linear(hidden_nodes2, action_dim)

def forward(self, state):

# define forward pass of the actor

x = state # state

# Relu function double

x = F.relu(self.fc1(x))

x = F.relu(self.fc2(x))

out = self.fc3(x)

return out
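
A quick sanity check that the layer sizes line up before wiring the network into the agent (a sketch; 105 and 3 match the intersection setup used later):

test_net = Net(state_dim=105, action_dim=3).to(device)
dummy = torch.rand(4, 105).to(device)   # a batch of 4 flattened observations
print(test_net(dummy).shape)            # expected: torch.Size([4, 3]), one Q value per action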

Building the experience replay buffer

class Replay: # learning

def __init__(self,

buffer_size, init_length, state_dim, action_dim, env):

self.buffer_size = buffer_size

self.init_length = init_length

self.state_dim = state_dim

self.action_dim = action_dim

self.env = env

self._storage = []

self._init_buffer(init_length)

def _init_buffer(self, n):

# fill the buffer with n transitions generated by random actions

state = self.env.reset()

for i in range(n):

action = self.env.action_space.sample()

observation, reward, done, truncated, info = self.env.step(action)

# gym env.step(action) returns a tuple (observation, reward, terminated, truncated, info)

# observation: numpy array describing the ego and surrounding vehicles

# reward: reward for the action just taken

# terminated: bool, whether the episode ended (e.g. crash or arrival)

# truncated: bool, whether the episode was cut short (e.g. time limit); split out of the old done flag

# info: auxiliary logging information

if type(state) == type((1,)):

state = state[0]

# if state is a tuple (ndarray[[],[],...,[]], {"speed": Float, "crashed": Bool, "action": Int, "rewards": dict, ...}), take its first item

# env.reset() now returns (observation, info); we only need the observation ndarray here

exp = {

"state": state,

"action": action,

"reward": reward,

"state_next": observation,

"done": done,

}

self._storage.append(exp)

state = observation

if done:

state = self.env.reset()

done = False

def buffer_add(self, exp):

# exp buffer: {exp}=={

# "state": state,

# "action": action,

# "reward": reward,

# "state_next": observation,

# "done": terminated,}

self._storage.append(exp)

if len(self._storage) > self.buffer_size:

self._storage.pop(0) # drop the oldest experience when the buffer is full

def buffer_sample(self, n):

# random n samples from exp buffer

return random.sample(self._storage, n)

Building the learning agent

PATH = "your/output/folder"  # absolute or relative path where results will be saved

class DOUBLEDQN(nn.Module):

def __init__(

self,

env, # gym environment

state_dim, # state size

action_dim, # action size

lr = 0.001, # learning rate

gamma = 0.99, # discount factor

batch_size = 5, # batch size for each training

timestamp = "",):

# super class

super(DOUBLEDQN, self).__init__()

self.env = env

self.env.reset()

self.timestamp = timestamp

# for evaluation purpose

self.test_env = copy.deepcopy(env)

self.state_dim = state_dim

self.action_dim = action_dim

self.gamma = gamma

self.batch_size = batch_size

self.learn_step_counter = 0

self.is_rend = False

self.target_net = Net(self.state_dim, self.action_dim).to(device)#TODO

self.estimate_net = Net(self.state_dim, self.action_dim).to(device)#TODO

self.ReplayBuffer = Replay(1000, 100, self.state_dim, self.action_dim, env)#TODO

self.optimizer = torch.optim.Adam(self.estimate_net.parameters(), lr=lr)

def choose_our_action(self, state, epsilon = 0.9):

# greedy strategy for choosing action

# state: ndarray environment state

# epsilon: float in [0,1]

# return: action we chosen

# flatten the observation into a 1D float tensor [a1, a2, ..., an]

# converting the ndarray to a tensor efficiently matters here; a slow conversion noticeably drags down training

# ndarray[[ndarray],...,[ndarray]] => list of rows => 1D ndarray => tensor

if type(state) == type((1,)):

state = state[0]

temp = [exp for exp in state]

target = []

target = np.array(target)

# n dimension to 1 dimension ndarray

for i in temp:

target = np.append(target,i)

state = torch.FloatTensor(target).to(device)

# rand() returns a uniform random number in [0, 1)

# with probability epsilon we exploit (greedy action), otherwise we explore

if np.random.rand() <= epsilon:

# when random number smaller than epsilon: do these things

# put a state array into estimate net to obtain their value array

# choose max values in value array -> obtain action

action_value = self.estimate_net(state)

action = torch.argmax(action_value).item()

else:

# otherwise: explore by choosing a random action

action = np.random.randint(0, self.action_dim)

return action

def train(self, num_episode):

# num_episode: total number of training episodes

loss_list = [] # loss set

avg_reward_list = [] # reward set

episode_reward = 0

rend = 0

# tqdm: shows a progress bar over the episode loop

for episode in tqdm(range(1,int(num_episode)+1)):

done = False

state = self.env.reset()

each_loss = 0

step = 0

if type(state) == type((1,)):

state = state[0]

while not done:

if self.is_rend:

self.env.render()

step +=1

action = self.choose_our_action(state)

observation, reward, done, truncated, info = self.env.step(action)

exp = {

"state": state,

"action": action,

"reward": reward,

"state_next": observation,

"done": done,

}

self.ReplayBuffer.buffer_add(exp)

state = observation

# sample random batch in replay memory

exp_batch = self.ReplayBuffer.buffer_sample(self.batch_size)

# extract batch data

action_batch = torch.LongTensor(

[exp["action"] for exp in exp_batch]

).to(device)

reward_batch = torch.FloatTensor(

[exp["reward"] for exp in exp_batch]

).to(device)

done_batch = torch.FloatTensor(

[1 - exp["done"] for exp in exp_batch]

).to(device)

# build a single ndarray first, then convert to a tensor (much faster than converting a list of ndarrays one by one)

state_next_temp = [exp["state_next"] for exp in exp_batch]

state_temp = [exp["state"] for exp in exp_batch]

state_temp_list = np.array(state_temp)

state_next_temp_list = np.array(state_next_temp)

state_next_batch = torch.FloatTensor(state_next_temp_list).to(device)

state_batch = torch.FloatTensor(state_temp_list).to(device)

# reshape

state_batch = state_batch.reshape(self.batch_size, -1)

action_batch = action_batch.reshape(self.batch_size, -1)

reward_batch = reward_batch.reshape(self.batch_size, -1)

state_next_batch = state_next_batch.reshape(self.batch_size, -1)

done_batch = done_batch.reshape(self.batch_size, -1)

# obtain estimate Q value gather(dim, index) dim==1:column index

estimate_Q_value = self.estimate_net(state_batch).gather(1, action_batch)

# obtain target Q value; detach() keeps gradients from flowing through the action-selection pass

max_action_index = self.estimate_net(state_next_batch).detach().argmax(1)

target_Q_value = reward_batch + done_batch * self.gamma * self.target_net(

state_next_batch

).gather(1, max_action_index.unsqueeze(1))  # unsqueeze(1): (batch,) -> (batch, 1) so it can index gather along dim 1
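
# Double DQN in one formula (this is what the two statements above implement):
#     y = r + gamma * Q_target(s', argmax_a Q_estimate(s', a))
# The estimate net chooses the next action and the target net evaluates it,
# which reduces the over-estimation bias of vanilla DQN
# (vanilla DQN would use max_a Q_target(s', a) for both steps).
# Since done_batch stores (1 - done), terminal transitions keep only the reward.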

# mse_loss: mean squared error between estimated and target Q values

loss = F.mse_loss(estimate_Q_value, target_Q_value)

each_loss += loss.item()

# update network

self.optimizer.zero_grad()

loss.backward()

self.optimizer.step()

# update target network

# load parameters into model

if self.learn_step_counter % 10 == 0:

self.target_net.load_state_dict(self.estimate_net.state_dict())

self.learn_step_counter +=1

reward, count = self.eval()

episode_reward += reward

# you can update these variables

if episode_reward % 100 == 0:

rend += 1

if rend % 5 == 0:

self.is_rend = True

else:

self.is_rend = False

# save

period = 1

if episode % period == 0:

each_loss /= step

episode_reward /= period

avg_reward_list.append(episode_reward)

loss_list.append(each_loss)

print("\nepisode:[{}/{}], \t each_loss: {:.4f}, \t eposide_reward: {:.3f}, \t step: {}".format(

episode, num_episode, each_loss, episode_reward, count

))

# episode_reward = 0

# create a new directory for saving

path = PATH + "/" + self.timestamp

try:

os.makedirs(path)

except OSError:

pass

# saving as timestamp file

np.save(path + "/DOUBLE_DQN_LOSS.npy", loss_list)

np.save(path + "/DOUBLE_DQN_EACH_REWARD.npy", avg_reward_list)

torch.save(self.estimate_net.state_dict(), path + "/DOUBLE_DQN_params.pkl")

self.env.close()

return loss_list, avg_reward_list

def eval(self):

# evaluate the policy

count = 0

total_reward = 0

done = False

state = self.test_env.reset()

if type(state) == type((1,)):

state = state[0]

while not done:

action = self.choose_our_action(state, epsilon = 1)

observation, reward, done, truncated, info = self.test_env.step(action)

total_reward += reward

count += 1

state = observation

return total_reward, count

Building the main routine

Hyperparameters such as lr and gamma can be tuned as you like.

if __name__ == "__main__":

# timestamp

named_tuple = time.localtime()

time_string = time.strftime("%Y-%m-%d-%H-%M", named_tuple)

print(time_string)

# create a doubledqn object

double_dqn_object = DOUBLEDQN(

env,

state_dim=105,

action_dim=3,

lr=0.001,

gamma=0.99,

batch_size=64,

timestamp=time_string,

)

# your chosen train times

iteration = 20

# start training

avg_loss, avg_reward_list = double_dqn_object.train(iteration)

path = PATH + "/" + time_string

np.save(path + "/DOUBLE_DQN_LOSS.npy", avg_loss)

np.save(path + "/DOUBLE_DQN_EACH_REWARD.npy", avg_reward_list)

torch.save(double_dqn_object.estimate_net.state_dict(), path + "/DOUBLE_DQN_params.pkl")

torch.save(double_dqn_object.state_dict(), path + "/DOUBLE_DQN_MODEL.pt")
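
If you want to reuse the saved weights later (for evaluation or to continue training), loading them back into a fresh network looks roughly like this (a sketch; the path must point at the timestamp folder created above):

eval_net = Net(state_dim=105, action_dim=3).to(device)
eval_net.load_state_dict(torch.load(path + "/DOUBLE_DQN_params.pkl", map_location=device))
eval_net.eval()   # switch to evaluation mode before using it for greedy rollouts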

Plotting figures from the saved data

Create a new file draw_figures.py.

Replace the ? placeholders with your own paths.

import matplotlib.pyplot as plt

import numpy as np

Loss = r"?\?\DOUBLE_DQN_LOSS.npy"

Reward = r"?\?\DOUBLE_DQN_EACH_REWARD.npy"

avg_loss = np.load(Loss)

avg_reward_list = np.load(Reward)

# print("loss", avg_loss)

# print("reward", avg_reward_list)

plt.figure(figsize=(10, 6))

plt.plot(avg_loss)

plt.grid()

plt.title("Double DQN Loss")

plt.xlabel("epochs")

plt.ylabel("loss")

plt.savefig(r"?\figures\double_dqn_loss.png", dpi=150)

plt.show()

plt.figure(figsize=(10, 6))

plt.plot(avg_reward_list)

plt.grid()

plt.title("Double DQN Training Reward")

plt.xlabel("epochs")

plt.ylabel("reward")

plt.savefig(r"?\figures\double_dqn_train_reward.png", dpi=150)

plt.show()
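
The per-episode curves can be quite noisy. If you prefer a smoother picture, a simple moving average before plotting helps; a sketch (the window size of 10 is arbitrary):

def moving_average(x, window=10):
    # smooth a 1D sequence with a sliding-window mean
    x = np.asarray(x, dtype=float)
    if len(x) < window:
        return x
    return np.convolve(x, np.ones(window) / window, mode="valid")

plt.figure(figsize=(10, 6))
plt.plot(moving_average(avg_reward_list))
plt.grid()
plt.title("Double DQN Training Reward (moving average)")
plt.xlabel("epochs")
plt.ylabel("reward")
plt.show()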

---- Tighnari's manual divider ----

Dueling_DQN

The code above only needs minor modifications, as follows.

class Net(nn.Module):

def __init__(self, state_dim, action_dim):

"""

Initialize the network

: param state_dim: int, size of state space

: param action_dim: int, size of action space

"""

super(Net, self).__init__()

hidden_nodes1 = 1024

hidden_nodes2 = 512

self.fc1 = nn.Linear(state_dim, hidden_nodes1)

self.fc2 = nn.Linear(hidden_nodes1, hidden_nodes2)

self.fc3 = nn.Linear(hidden_nodes2, action_dim + 1)

def forward(self, state):

"""

Define the forward pass of the actor

: param state: ndarray, the state of the environment

"""

x = state

# print(x.shape)

x = F.relu(self.fc1(x))

x = F.relu(self.fc2(x))

out = self.fc3(x)

return out

class Replay: # learning

def __init__(self,

buffer_size, init_length, state_dim, action_dim, env):

self.buffer_size = buffer_size

self.init_length = init_length

self.state_dim = state_dim

self.action_dim = action_dim

self.env = env

self._storage = []

self._init_buffer(init_length)

def _init_buffer(self, n):

# fill the buffer with n transitions generated by random actions

state = self.env.reset()

for i in range(n):

action = self.env.action_space.sample()

observation, reward, done, truncated, info = self.env.step(action)

# gym env.step(action) returns a tuple (observation, reward, terminated, truncated, info)

# observation: numpy array describing the ego and surrounding vehicles

# reward: reward for the action just taken

# terminated: bool, whether the episode ended (e.g. crash or arrival)

# truncated: bool, whether the episode was cut short (e.g. time limit); split out of the old done flag

# info: auxiliary logging information

if type(state) == type((1,)):

state = state[0]

# if state is a tuple (ndarray[[],[],...,[]], {"speed": Float, "crashed": Bool, "action": Int, "rewards": dict, ...}), take its first item

# env.reset() now returns (observation, info); we only need the observation ndarray here

exp = {

"state": state,

"action": action,

"reward": reward,

"state_next": observation,

"done": done,

}

self._storage.append(exp)

state = observation

if done:

state = self.env.reset()

done = False

def buffer_add(self, exp):

# exp buffer: {exp}=={

# "state": state,

# "action": action,

# "reward": reward,

# "state_next": observation,

# "done": terminated,}

self._storage.append(exp)

if len(self._storage) > self.buffer_size:

self._storage.pop(0) # drop the oldest experience when the buffer is full

def buffer_sample(self, n):

# random n samples from exp buffer

return random.sample(self._storage, n)

class DUELDQN(nn.Module):

def __init__(

self,

env,

state_dim,

action_dim,

lr=0.001,

gamma=0.99,

batch_size=5,

timestamp="",

):

"""

: param env: object, a gym environment

: param state_dim: int, size of state space

: param action_dim: int, size of action space

: param lr: float, learning rate

: param gamma: float, discount factor

: param batch_size: int, batch size for training

"""

super(DUELDQN, self).__init__()

self.env = env

self.env.reset()

self.timestamp = timestamp

self.test_env = copy.deepcopy(env) # for evaluation purpose

self.state_dim = state_dim

self.action_dim = action_dim

self.gamma = gamma

self.batch_size = batch_size

self.learn_step_counter = 0

self.is_rend =False

self.target_net = Net(self.state_dim, self.action_dim).to(device)

self.estimate_net = Net(self.state_dim, self.action_dim).to(device)

self.ReplayBuffer = Replay(1000, 100, self.state_dim, self.action_dim, env)

self.optimizer = torch.optim.Adam(self.estimate_net.parameters(), lr=lr)

def choose_action(self, state, epsilon=0.9):

# greedy strategy for choosing action

# state: ndarray environment state

# epsilon: float in [0,1]

# return: action we chosen

# flatten the observation into a 1D float tensor [a1, a2, ..., an]

# converting the ndarray to a tensor efficiently matters here; a slow conversion noticeably drags down training

# ndarray[[ndarray],...,[ndarray]] => list of rows => 1D ndarray => tensor

if type(state) == type((1,)):

state = state[0]

temp = [exp for exp in state]

target = []

target = np.array(target)

# n dimension to 1 dimension ndarray

for i in temp:

target = np.append(target, i)

state = torch.FloatTensor(target).to(device)

# rand() returns a uniform random number in [0, 1)

# with probability epsilon we exploit (greedy action), otherwise we explore

if np.random.rand() <= epsilon:

# when random number smaller than epsilon: do these things

# put a state array into estimate net to obtain their value array

# choose max values in value array -> obtain action

action_value = self.estimate_net(state)

action_value = action_value[:-1]  # drop the last output (the state value); argmax only over the advantage entries

action = torch.argmax(action_value).item()

else:

# otherwise: explore by choosing a random action

action = np.random.randint(0, self.action_dim)

return action

def calculate_duelling_q_values(self, duelling_q_network_output):

"""

Calculate the Q values using the duelling network architecture. This is equation (9) in the paper.

:param duelling_q_network_output: tensor, output of duelling q network

:return: Q values

"""

state_value = duelling_q_network_output[:, -1]

avg_advantage = torch.mean(duelling_q_network_output[:, :-1], dim=1)

q_values = state_value.unsqueeze(1) + (

duelling_q_network_output[:, :-1] - avg_advantage.unsqueeze(1)

)

return q_values
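
# Written out, the aggregation above is
#     Q(s, a) = V(s) + A(s, a) - mean_a' A(s, a')
# where the last column of the network output is the state value V(s) and the
# first action_dim columns are the advantages A(s, .). Subtracting the mean
# advantage keeps V and A identifiable, as in the Dueling DQN paper.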

def train(self, num_episode):

# num_episode: total number of training episodes

loss_list = [] # loss set

avg_reward_list = [] # reward set

episode_reward = 0

# tqdm: shows a progress bar over the episode loop

for episode in tqdm(range(1,int(num_episode)+1)):

done = False

state = self.env.reset()

each_loss = 0

step = 0

if type(state) == type((1,)):

state = state[0]

while not done:

if self.is_rend:

self.env.render()

step += 1

action = self.choose_action(state)

observation, reward, done, truncated, info = self.env.step(action)

exp = {

"state": state,

"action": action,

"reward": reward,

"state_next": observation,

"done": done,

}

self.ReplayBuffer.buffer_add(exp)

state = observation

# sample random batch in replay memory

exp_batch = self.ReplayBuffer.buffer_sample(self.batch_size)

# extract batch data

action_batch = torch.LongTensor(

[exp["action"] for exp in exp_batch]

).to(device)

reward_batch = torch.FloatTensor(

[exp["reward"] for exp in exp_batch]

).to(device)

done_batch = torch.FloatTensor(

[1 - exp["done"] for exp in exp_batch]

).to(device)

# build a single ndarray first, then convert to a tensor (much faster than converting a list of ndarrays one by one)

state_next_temp = [exp["state_next"] for exp in exp_batch]

state_temp = [exp["state"] for exp in exp_batch]

state_temp_list = np.array(state_temp)

state_next_temp_list = np.array(state_next_temp)

state_next_batch = torch.FloatTensor(state_next_temp_list).to(device)

state_batch = torch.FloatTensor(state_temp_list).to(device)

# reshape

state_batch = state_batch.reshape(self.batch_size, -1)

action_batch = action_batch.reshape(self.batch_size, -1)

reward_batch = reward_batch.reshape(self.batch_size, -1)

state_next_batch = state_next_batch.reshape(self.batch_size, -1)

done_batch = done_batch.reshape(self.batch_size, -1)

# get estimate Q value

estimate_net_output = self.estimate_net(state_batch)

estimate_Q = self.calculate_duelling_q_values(estimate_net_output)

estimate_Q = estimate_Q.gather(1, action_batch)

# get target Q value

max_action_idx = (

self.estimate_net(state_next_batch)[:, :-1].detach().argmax(1)

)

target_net_output = self.target_net(state_next_batch)

target_Q = self.calculate_duelling_q_values(target_net_output).gather(

1, max_action_idx.unsqueeze(1)

)

target_Q = reward_batch + done_batch * self.gamma * target_Q

# compute mse loss

loss = F.mse_loss(estimate_Q, target_Q)

each_loss += loss.item()

# update network

self.optimizer.zero_grad()

loss.backward()

self.optimizer.step()

# update target network

if self.learn_step_counter % 10 == 0:

self.target_net.load_state_dict(self.estimate_net.state_dict())

self.learn_step_counter += 1

reward, count = self.eval()

episode_reward += reward

# save

period = 1

if episode % period == 0:

each_loss /= step

episode_reward /= period

avg_reward_list.append(episode_reward)

loss_list.append(each_loss)

print("\nepisode:[{}/{}], \t each_loss: {:.4f}, \t eposide_reward: {:.3f}, \t step: {}".format(

episode, num_episode, each_loss, episode_reward, count

))

# epoch_reward = 0

path = PATH + "/" + self.timestamp

# create a new directory for saving

try:

os.makedirs(path)

except OSError:

pass

np.save(path + "/DUELING_DQN_LOSS.npy", loss_list)

np.save(path + "/DUELING_DQN_EACH_REWARD.npy", avg_reward_list)

torch.save(self.estimate_net.state_dict(), path + "/DUELING_DQN_params.pkl")

self.env.close()

return loss_list, avg_reward_list

def eval(self):

# evaluate the policy

count = 0

total_reward = 0

done = False

state = self.test_env.reset()

if type(state) == type((1,)):

state = state[0]

while not done:

action = self.choose_action(state, epsilon=1)

state_next, reward, done, _, info = self.test_env.step(action)

total_reward += reward

count += 1

state = state_next

return total_reward, count

if __name__ == "__main__":

# timestamp for saving

named_tuple = time.localtime() # get struct_time

time_string = time.strftime(

"%Y-%m-%d-%H-%M", named_tuple

) # folder named after date+time, e.g. 2023-02-17-20-36

duel_dqn_object = DUELDQN(

env,

state_dim=105,

action_dim=3,

lr=0.001,

gamma=0.99,

batch_size=64,

timestamp=time_string,

)

path = PATH + "/" + time_string

# Train the policy

iterations = 10

avg_loss, avg_reward_list = duel_dqn_object.train(iterations)

np.save(path + "/DUELING_DQN_LOSS.npy", avg_loss)

np.save(path + "/DUELING_DQN_EACH_REWARD.npy", avg_reward_list)

torch.save(duel_dqn_object.estimate_net.state_dict(), path + "/DUELING_DQN_params.pkl")

torch.save(duel_dqn_object.state_dict(), path + "/DUELING_DQN_MODEL.pt")

DDQN+OtherChanges

Three 2D convolutional layers

# add CNN structure

class Net(nn.Module):

def __init__(self, state_dim, action_dim):

# initialize the network

# state_dim: state space

# action_dim: action space

super(Net, self).__init__()

# nn.Conv2d(in_channels, out_channels, kernel_size, stride, padding)

# in_channel : input size = in_channels * in_N * in_N

# out_channel : define

# kernel_size : rules or define

# stride: step length

# padding: padding size

# out_N = (in_N - Kernel_size + 2 * Padding)/ Stride +1

self.cnn = nn.Sequential(

# the first 2D convolutional layer

nn.Conv2d(1, 4, kernel_size=3, padding=1),

nn.BatchNorm2d(4),

nn.ReLU(inplace=True),

nn.MaxPool2d(kernel_size=3, stride=1),

# the second 2D convolutional layer

nn.Conv2d(4, 8, kernel_size=3, padding=1),

nn.BatchNorm2d(8),

nn.ReLU(inplace=True),

nn.MaxPool2d(kernel_size=3, stride=1),

# the third 2D convolutional layer -- my own experiment; feel free to try more layers

nn.Conv2d(8, 4, kernel_size=3, padding=1),

nn.BatchNorm2d(4),

nn.ReLU(inplace=True),

nn.MaxPool2d(kernel_size=3, stride=1),

)

hidden_nodes1 = 1024

hidden_nodes2 = 512

self.fc1 = nn.Linear(4 * 1 * 9, hidden_nodes1)

self.fc2 = nn.Linear(hidden_nodes1, hidden_nodes2)

self.fc3 = nn.Linear(hidden_nodes2, action_dim)

def forward(self, state):

# define forward pass of the actor

x = state # state

x = self.cnn(x)

x = x.view(x.size(0), -1)

# Relu function double

x = F.relu(self.fc1(x))

x = F.relu(self.fc2(x))

out = self.fc3(x)

return out
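
Before training, it is worth checking that the flattened CNN feature size really is 4 * 1 * 9 = 36, which is what fc1 expects: for a (batch, 1, 7, 15) input, each 3x3 convolution with padding 1 keeps the spatial size and each MaxPool2d(kernel_size=3, stride=1) shrinks it by 2, so 7x15 -> 5x13 -> 3x11 -> 1x9 with 4 output channels. A quick sketch to confirm:

probe_net = Net(state_dim=105, action_dim=3).to(device)
dummy = torch.rand(2, 1, 7, 15).to(device)   # two reshaped Kinematics observations
print(probe_net(dummy).shape)                # expected: torch.Size([2, 3])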

class Replay:

def __init__(self, buffer_size, init_length, state_dim, action_dim, env):

self.buffer_size = buffer_size

self.init_length = init_length

self.state_dim = state_dim

self.action_dim = action_dim

self.env = env

self._storage = []

self._init_buffer(init_length)

def _init_buffer(self, n):

# fill the buffer with n transitions generated by random actions

state = self.env.reset()

for i in range(n):

action = self.env.action_space.sample()

observation, reward, done, truncated, info = self.env.step(action)

# gym env.step(action) returns a tuple (observation, reward, terminated, truncated, info)

# observation: numpy array describing the ego and surrounding vehicles

# reward: reward for the action just taken

# terminated: bool, whether the episode ended (e.g. crash or arrival)

# truncated: bool, whether the episode was cut short (e.g. time limit); split out of the old done flag

# info: auxiliary logging information

if type(state) == type((1,)):

state = state[0]

# if state is a tuple (ndarray[[],[],...,[]], {"speed": Float, "crashed": Bool, "action": Int, "rewards": dict, ...}), take its first item

# env.reset() now returns (observation, info); we only need the observation ndarray here

exp = {

"state": state,

"action": action,

"reward": reward,

"state_next": observation,

"done": done,

}

self._storage.append(exp)

state = observation

if done:

state = self.env.reset()

done = False

def buffer_add(self, exp):

# exp buffer: {exp}=={

# "state": state,

# "action": action,

# "reward": reward,

# "state_next": observation,

# "done": terminated,}

self._storage.append(exp)

if len(self._storage) > self.buffer_size:

self._storage.pop(0) # drop the oldest experience when the buffer is full

def buffer_sample(self, N):

# random n samples from exp buffer

return random.sample(self._storage, N)

class DOUBLEDQN_CNN(nn.Module):

def __init__(

self,

env, # gym environment

state_dim, # state size

action_dim, # action size

lr=0.001, # learning rate

gamma=0.99, # discount factor

batch_size=5, # batch size for each training

timestamp="", ):

# super class

super(DOUBLEDQN_CNN, self).__init__()

self.env = env

self.env.reset()

self.timestamp = timestamp

# for evaluation purpose

self.test_env = copy.deepcopy(env)

self.state_dim = state_dim

self.action_dim = action_dim

self.gamma = gamma

self.batch_size = batch_size

self.learn_step_counter = 0

self.is_rend = False

self.target_net = Net(self.state_dim, self.action_dim).to(device)

self.estimate_net = Net(self.state_dim, self.action_dim).to(device)

self.ReplayBuffer = Replay(1000, 100, self.state_dim, self.action_dim, env)

self.optimizer = torch.optim.Adam(self.estimate_net.parameters(), lr=lr)

def choose_action(self, state, epsilon=0.9):

# greedy strategy for choosing action

# state: ndarray environment state

# epsilon: float in [0,1]

# return: action we chosen

# flatten the observation into a float tensor of shape (1, 1, 7, state_dim // 7) for the CNN

# converting the ndarray to a tensor efficiently matters here; a slow conversion noticeably drags down training

# ndarray[[ndarray],...,[ndarray]] => single ndarray => tensor

if type(state) == type((1,)):

state = state[0]

#TODO

state = (

torch.FloatTensor(state).to(device).reshape(-1, 1, 7, self.state_dim // 7)

)

if np.random.rand() <= epsilon:

action_value = self.estimate_net(state)

action = torch.argmax(action_value).item()

else:

action = np.random.randint(0, self.action_dim)

return action

def train(self, num_episode):

# num_episode: total number of training episodes

count_list = []

loss_list = []

total_reward_list = []

avg_reward_list = []

episode_reward = 0

rend = 0

for episode in tqdm(range(1,int(num_episode)+1)):

done = False

state = self.env.reset()

each_loss = 0

step = 0

if type(state) == type((1,)):

state = state[0]

while not done:

if self.is_rend:

self.env.render()

step += 1

action = self.choose_action(state)

observation, reward, done, truncated, info = self.env.step(action)

exp = {

"state": state,

"action": action,

"reward": reward,

"state_next": observation,

"done": done,

}

self.ReplayBuffer.buffer_add(exp)

state = observation

# sample random batch from replay memory

exp_batch = self.ReplayBuffer.buffer_sample(self.batch_size)

# extract batch data

action_batch = torch.LongTensor([exp["action"] for exp in exp_batch])

reward_batch = torch.FloatTensor([exp["reward"] for exp in exp_batch])

done_batch = torch.FloatTensor([1 - exp["done"] for exp in exp_batch])

# build a single ndarray first, then convert to a tensor (much faster than converting a list of ndarrays one by one)

state_next_temp = [exp["state_next"] for exp in exp_batch]

state_temp = [exp["state"] for exp in exp_batch]

state_temp_list = np.array(state_temp)

state_next_temp_list = np.array(state_next_temp)

state_next_batch = torch.FloatTensor(state_next_temp_list)

state_batch = torch.FloatTensor(state_temp_list)

# reshape

state_batch = state_batch.to(device).reshape(

self.batch_size, 1, 7, self.state_dim // 7

)

action_batch = action_batch.to(device).reshape(self.batch_size, -1)

reward_batch = reward_batch.to(device).reshape(self.batch_size, -1)

state_next_batch = state_next_batch.to(device).reshape(

self.batch_size, 1, 7, self.state_dim // 7

)

done_batch = done_batch.to(device).reshape(self.batch_size, -1)

# get estimate Q value

estimate_Q = self.estimate_net(state_batch).gather(1, action_batch)

# get target Q value

max_action_idx = self.estimate_net(state_next_batch).detach().argmax(1)

target_Q = reward_batch + done_batch * self.gamma * self.target_net(

state_next_batch

).gather(1, max_action_idx.unsqueeze(1))

# compute mse loss

loss = F.mse_loss(estimate_Q, target_Q)

each_loss += loss.item()

# update network

self.optimizer.zero_grad()

loss.backward()

self.optimizer.step()

# update target network

if self.learn_step_counter % 10 == 0:

self.target_net.load_state_dict(self.estimate_net.state_dict())

self.learn_step_counter += 1

reward, count = self.eval()

episode_reward += reward

# you can update these variables

if episode_reward % 100 == 0:

rend += 1

if rend % 5 == 0:

self.is_rend = True

else:

self.is_rend = False

# save

period = 1

if episode % period == 0:

each_loss /= step

episode_reward /= period

avg_reward_list.append(episode_reward)

loss_list.append(each_loss)

print("\nepisode:[{}/{}], \t each_loss: {:.4f}, \t eposide_reward: {:.3f}, \t step: {}".format(

episode, num_episode, each_loss, episode_reward, count

))

# epoch_reward = 0

# create a new directory for saving

path = PATH + "/" + self.timestamp

try:

os.makedirs(path)

except OSError:

pass

# saving as timestamp file

np.save(path + "/DOUBLE_DQN_CNN_LOSS.npy", loss_list)

np.save(path + "/DOUBLE_DQN_CNN_EACH_REWARD.npy", avg_reward_list)

torch.save(self.estimate_net.state_dict(), path + "/DOUBLE_DQN_CNN_params.pkl")

self.env.close()

return loss_list, avg_reward_list

def eval(self):

# evaluate the policy

count = 0

total_reward = 0

done = False

state = self.test_env.reset()

if type(state) == type((1,)):

state = state[0]

while not done:

action = self.choose_action(state, epsilon=1)

observation, reward, done, truncated, info = self.test_env.step(action)

total_reward += reward

count += 1

state = observation

return total_reward, count

if __name__ == "__main__":

# timestamp

named_tuple = time.localtime()

time_string = time.strftime("%Y-%m-%d-%H-%M", named_tuple)

print(time_string)

# create a doubledqn object

double_dqn_cnn_object = DOUBLEDQN_CNN(

env,

state_dim=105,

action_dim=3,

lr=0.001,

gamma=0.99,

batch_size=64,

timestamp=time_string,

)

# your chosen train times

iteration = 20

# start training

avg_loss, avg_reward_list = double_dqn_cnn_object.train(iteration)

path = PATH + "/" + time_string

np.save(path + "/DOUBLE_DQN_CNN_LOSS.npy", avg_loss)

np.save(path + "/DOUBLE_DQN_CNN_EACH_REWARD.npy", avg_reward_list)

torch.save(double_dqn_cnn_object.estimate_net.state_dict(), path + "/DOUBLE_DQN_CNN_params.pkl")

torch.save(double_dqn_cnn_object.state_dict(), path + "/DOUBLE_DQN_CNN_MODEL.pt")

Prioritized experience replay

class Net(nn.Module):

def __init__(self, state_dim, action_dim):

# state_dim: state space

# action_dim: action space

super(Net, self).__init__()

hidden_nodes1 = 1024

hidden_nodes2 = 512

self.fc1 = nn.Linear(state_dim, hidden_nodes1)

self.fc2 = nn.Linear(hidden_nodes1, hidden_nodes2)

self.fc3 = nn.Linear(hidden_nodes2, action_dim)

def forward(self, state):

# state: ndarray

x = state

x = F.relu(self.fc1(x))

x = F.relu(self.fc2(x))

out = self.fc3(x)

return out

from collections import deque  # needed for the priority deque used below

# Prioritized_Replay

class Prioritized_Replay:

def __init__(

self,

buffer_size,

init_length,

state_dim,

action_dim,

est_Net,

tar_Net,

gamma,

):

# state_dim: state space

# action_dim: action space

# note: this class reads the global env created earlier (no env argument is passed in)

self.buffer_size = buffer_size

self.init_length = init_length

self.state_dim = state_dim

self.action_dim = action_dim

self.gamma = gamma

self.is_rend = False

self.priority = deque(maxlen=buffer_size)

self._storage = []

self._init_buffer(init_length, est_Net, tar_Net)

def _init_buffer(self, n, est_Net, tar_Net):

# n: sample number

state = env.reset()

for i in range(n):

action = env.action_space.sample()

observation, reward, done, truncated, info = env.step(action)

# gym env.step(action) returns a tuple (observation, reward, terminated, truncated, info)

# observation: numpy array describing the ego and surrounding vehicles

# reward: reward for the action just taken

# terminated: bool, whether the episode ended (e.g. crash or arrival)

# truncated: bool, whether the episode was cut short (e.g. time limit); split out of the old done flag

# info: auxiliary logging information

if type(state) == type((1,)):

state = state[0]

# if state is a tuple (ndarray[[],[],...,[]], {"speed": Float, "crashed": Bool, "action": Int, "rewards": dict, ...}), take its first item

# env.reset() now returns (observation, info); we only need the observation ndarray here

exp = {

"state": state,

"action": action,

"reward": reward,

"state_next": observation,

"done": done,

}

self.prioritize(est_Net, tar_Net, exp, alpha=0.6)

self._storage.append(exp)

state = observation

if done:

state = env.reset()

done = False

def buffer_add(self, exp):

# exp buffer: {exp}=={

# "state": state,

# "action": action,

# "reward": reward,

# "state_next": observation,

# "done": terminated,}

self._storage.append(exp)

if len(self._storage) > self.buffer_size:

self._storage.pop(0)

# add prioritize

def prioritize(self, est_Net, tar_Net, exp, alpha=0.6):

state = torch.FloatTensor(exp["state"]).to(device).reshape(-1)

q = est_Net(state)[exp["action"]].detach().cpu().numpy()

q_next = exp["reward"] + self.gamma * torch.max(est_Net(state).detach())

# TD error

p = (np.abs(q_next.cpu().numpy() - q) + (np.e ** -10)) ** alpha

self.priority.append(p.item())

def get_prioritized_batch(self, N):

prob = np.array(self.priority) / np.sum(self.priority)

# random.choices(list,weights=None,*,cum_weights=None,k=1)

# weight: set the chosen item rate

# k: times for choice

# cum_weight: sum of weight

sample_idxes = random.choices(range(len(prob)), k=N, weights=prob)

importance = (1 / prob) * (1 / len(self.priority))

sampled_importance = np.array(importance)[sample_idxes]

sampled_batch = np.array(self._storage)[sample_idxes]

return sampled_batch.tolist(), sampled_importance

def buffer_sample(self, N):

# random n samples from exp buffer

return random.sample(self._storage, N)
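
The sampling logic above boils down to: each stored transition gets a priority p_i = (|TD error| + eps)^alpha, the sampling probabilities are P(i) = p_i / sum_j p_j, and each sampled transition carries an importance weight that later corrects for the non-uniform sampling (the exponent (1 - beta) is applied in train()). A tiny standalone illustration of that arithmetic, with made-up numbers:

toy_priority = np.array([0.5, 0.1, 0.4])
toy_prob = toy_priority / toy_priority.sum()               # P(i) = p_i / sum(p)
toy_idx = random.choices(range(len(toy_prob)), k=4, weights=toy_prob)
toy_importance = (1 / toy_prob) * (1 / len(toy_priority))  # same weighting as get_prioritized_batch
print(toy_prob, toy_idx, toy_importance[toy_idx])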

class DDQNPB(nn.Module):

def __init__(

self,

env,

state_dim,

action_dim,

lr=0.001,

gamma=0.99,

buffer_size=1000,

batch_size=50,

beta=1,

beta_decay=0.995,

beta_min=0.01,

timestamp="",

):

# env: environment

# state_dim: state space

# action_dim: action space

# lr: learning rate

# gamma: loss/discount factor

# batch_size: training batch size

super(DDQNPB, self).__init__()

self.timestamp = timestamp

self.test_env = copy.deepcopy(env) # for evaluation purpose

self.state_dim = state_dim

self.action_dim = action_dim

self.gamma = gamma

self.batch_size = batch_size

self.learn_step_counter = 0

self.target_net = Net(self.state_dim, self.action_dim).to(device)

self.estimate_net = Net(self.state_dim, self.action_dim).to(device)

self.optimizer = torch.optim.Adam(self.estimate_net.parameters(), lr=lr)

self.ReplayBuffer = Prioritized_Replay(

buffer_size,

100,

self.state_dim,

self.action_dim,

self.estimate_net,

self.target_net,

gamma,

)

self.priority = self.ReplayBuffer.priority

# NOTE: the beta here plays the role of (1 - beta) in most write-ups; it is only a notation difference

# start from 1 and decay

self.beta = beta

self.beta_decay = beta_decay

self.beta_min = beta_min

def choose_action(self, state, epsilon=0.9):

# state: env state

# epsilon: [0,1]

# return action you choose

# get a 1D array

if type(state) == type((1,)):

state = state[0]

temp = [exp for exp in state]

target = []

target = np.array(target)

# n dimension to 1 dimension ndarray

for i in temp:

target = np.append(target, i)

state = torch.FloatTensor(target).to(device)

if np.random.rand() <= epsilon:

action_value = self.estimate_net(state)

action = torch.argmax(action_value).item()

else:

action = np.random.randint(0, self.action_dim)

return action

def train(self, num_episode):

# num_episode: number of training episodes

loss_list = []

avg_reward_list = []

episode_reward = 0

for episode in tqdm(range(1,int(num_episode)+1)):

done = False

state = env.reset()

each_loss = 0

step = 0

rend = 0

if type(state) == type((1,)):

state = state[0]

while not done:

action = self.choose_action(state)

observation, reward, done, _, info = env.step(action)

# self.env.render()

# store experience to replay memory

exp = {

"state": state,

"action": action,

"reward": reward,

"state_next": observation,

"done": done,

}

self.ReplayBuffer.buffer_add(exp)

state = observation

# importance weighting

if self.beta > self.beta_min:

self.beta *= self.beta_decay

# sample random batch from replay memory

exp_batch, importance = self.ReplayBuffer.get_prioritized_batch(

self.batch_size

)

importance = torch.FloatTensor(importance ** (1 - self.beta)).to(device)

# extract batch data

action_batch = torch.LongTensor(

[exp["action"] for exp in exp_batch]

).to(device)

reward_batch = torch.FloatTensor(

[exp["reward"] for exp in exp_batch]

).to(device)

done_batch = torch.FloatTensor(

[1 - exp["done"] for exp in exp_batch]

).to(device)

# build a single ndarray first, then convert to a tensor (much faster than converting a list of ndarrays one by one)

state_next_temp = [exp["state_next"] for exp in exp_batch]

state_temp = [exp["state"] for exp in exp_batch]

state_temp_list = np.array(state_temp)

state_next_temp_list = np.array(state_next_temp)

state_next_batch = torch.FloatTensor(state_next_temp_list).to(device)

state_batch = torch.FloatTensor(state_temp_list).to(device)

# reshape

state_batch = state_batch.reshape(self.batch_size, -1)

action_batch = action_batch.reshape(self.batch_size, -1)

reward_batch = reward_batch.reshape(self.batch_size, -1)

state_next_batch = state_next_batch.reshape(self.batch_size, -1)

done_batch = done_batch.reshape(self.batch_size, -1)

# get estimate Q value

estimate_Q = self.estimate_net(state_batch).gather(1, action_batch)

# get target Q value

max_action_idx = self.estimate_net(state_next_batch).detach().argmax(1)

target_Q = reward_batch + done_batch * self.gamma * self.target_net(

state_next_batch

).gather(1, max_action_idx.unsqueeze(1))

# compute mse loss

# loss = F.mse_loss(estimate_Q, target_Q)

loss = torch.mean(

torch.multiply(torch.square(estimate_Q - target_Q), importance)

)

each_loss += loss.item()

# update network

self.optimizer.zero_grad()

loss.backward()

self.optimizer.step()

#TODO

# update target network

if self.learn_step_counter % 10 == 0:

# self.update_target_networks()

self.target_net.load_state_dict(self.estimate_net.state_dict())

self.learn_step_counter += 1

step += 1

env.render()

# you can update these variables

# if episode_reward % 100 == 0:

# rend += 1

# if rend % 5 == 0:

# self.is_rend = True

# else:

# self.is_rend = False

reward, count = self.eval()

episode_reward += reward

# save

period = 1

if episode % period == 0:

# log

each_loss /= period

episode_reward /= period

avg_reward_list.append(episode_reward)

loss_list.append(each_loss)

print(

"\nepoch: [{}/{}], \tavg loss: {:.4f}, \tavg reward: {:.3f}, \tsteps: {}".format(

episode, num_episode, each_loss, episode_reward, count

)

)

# episode_reward = 0

# create a new directory for saving

path = PATH + "/" + self.timestamp

try:

os.makedirs(path)

except OSError:

pass

np.save(path + "/DOUBLE_DQN_PRIORITIZED_LOSS.npy", loss_list)

np.save(path + "/DOUBLE_DQN_PRIORITIZED_REWARD.npy", avg_reward_list)

torch.save(self.estimate_net.state_dict(),path + "/DOUBLE_DQN_PRIORITIZED_params.pkl")

env.close()

return loss_list, avg_reward_list

def eval(self):

"""

Evaluate the policy

"""

count = 0

total_reward = 0

done = False

state = self.test_env.reset()

if type(state) == type((1,)):

state = state[0]

while not done:

action = self.choose_action(state, epsilon=1)

observation, reward, done, truncated, info = self.test_env.step(action)

total_reward += reward

count += 1

state = observation

return total_reward, count

if __name__ == "__main__":

# timestamp for saving

named_tuple = time.localtime() # get struct_time

time_string = time.strftime("%Y-%m-%d-%H-%M", named_tuple)

double_dqn_prioritized_object = DDQNPB(

env,

state_dim=105,

action_dim=3,

lr=0.001,

gamma=0.99,

buffer_size=1000,

batch_size=64,

timestamp=time_string,

)

# Train the policy

iterations = 10000

avg_loss, avg_reward_list = double_dqn_prioritized_object.train(iterations)

path = PATH + "/" + time_string

np.save(path + "/DOUBLE_DQN_PRIORITIZED_LOSS.npy", avg_loss)

np.save(path + "/DOUBLE_DQN_PRIORITIZED_REWARD.npy", avg_reward_list)

torch.save(double_dqn_prioritized_object.estimate_net.state_dict(), path + "/DOUBLE_DQN_PRIORITIZED_params.pkl")

torch.save(double_dqn_prioritized_object.state_dict(), path + "/DOUBLE_DQN_PRIORITIZED_MODEL.pt")

Feel free to change things yourself; the bugs you debug on your own are the best bugs! (kidding)

Afterword:

About custom environments: I just spent thirty minutes digging into this, and the official tutorial on it is pretty rough (no offense), so here is the recipe I worked out:

Find where your highway-env package is installed. Mine is at E:\formalFiles\Anaconda3-2020.07\envs\autodrive_38\Lib\site-packages\highway_env. Inside highway_env/envs you can see the definition file for each scenario; I will take intersection_env.py as the example here, the others work the same way. Create a new file test_env.py and copy the entire contents of intersection_env.py into it. Then, in test_env.py, rename the class as follows:

class test(AbstractEnv):

#

# ACTIONS: Dict[int, str] = {

# 0: 'SLOWER',

# 1: 'IDLE',

# 2: 'FASTER'

# }

ACTIONS: Dict[int, str] = {

0: 'LANE_LEFT',

1: 'IDLE',

2: 'LANE_RIGHT',

3: 'FASTER',

4: 'SLOWER'

}

Delete every class definition except this first one. The change above expands the action set to 5 actions. At the end of envs/__init__.py, add: from highway_env.envs.test_env import *. Then find the separate __init__.py that sits directly under the highway_env folder (not the file from the previous step!) and modify it as follows:

def register_highway_envs():

"""Import the envs module so that envs register themselves."""

# my test environment

register(

id='test-v0',  # the id passed to gym.make

entry_point='highway_env.envs:test'  # module path : environment class name

)

To modify the reward, go to your environment file highway_env/envs/test_env.py, look at the _reward function and the related _agent_reward / _agent_rewards functions, and change the terms however you like. The helper function lmap() lives in utils.py.

def _reward(self, action: int) -> float:

"""Aggregated reward, for cooperative agents."""

return sum(self._agent_reward(action, vehicle) for vehicle in self.controlled_vehicles

) / len(self.controlled_vehicles)

def _agent_reward(self, action: int, vehicle: Vehicle) -> float:

"""Per-agent reward signal."""

rewards = self._agent_rewards(action, vehicle)

reward = sum(self.config.get(name, 0) * reward for name, reward in rewards.items())

reward = self.config["arrived_reward"] if rewards["arrived_reward"] else reward

reward *= rewards["on_road_reward"]

if self.config["normalize_reward"]:

reward = utils.lmap(reward, [self.config["collision_reward"], self.config["arrived_reward"]], [0, 1])

return reward

def _agent_rewards(self, action: int, vehicle: Vehicle) -> Dict[Text, float]:

"""Per-agent per-objective reward signal."""

scaled_speed = utils.lmap(vehicle.speed, self.config["reward_speed_range"], [0, 1])

return {

"collision_reward": vehicle.crashed,

"high_speed_reward": np.clip(scaled_speed, 0, 1),

"arrived_reward": self.has_arrived(vehicle),

"on_road_reward": vehicle.on_road

}

To use the custom environment:

import highway_env

import gym

env = gym.make("test-v0")

env.reset()

My own customized environment file (personal settings, not claimed to be optimal):

from typing import Dict, Tuple, Text

import numpy as np

from highway_env import utils

from highway_env.envs.common.abstract import AbstractEnv, MultiAgentWrapper

from highway_env.road.lane import LineType, StraightLane, CircularLane, AbstractLane

from highway_env.road.regulation import RegulatedRoad

from highway_env.road.road import RoadNetwork

from highway_env.vehicle.kinematics import Vehicle

from highway_env.vehicle.controller import ControlledVehicle

class test(AbstractEnv):

#

# ACTIONS: Dict[int, str] = {

# 0: 'SLOWER',

# 1: 'IDLE',

# 2: 'FASTER'

# }

ACTIONS: Dict[int, str] = {

0: 'LANE_LEFT',

1: 'IDLE',

2: 'LANE_RIGHT',

3: 'FASTER',

4: 'SLOWER'

}

ACTIONS_INDEXES = {v: k for k, v in ACTIONS.items()}

@classmethod

def default_config(cls) -> dict:

config = super().default_config()

config.update({

"observation": {

"type": "Kinematics",

"vehicles_count": 15,

"features": ["presence", "x", "y", "vx", "vy", "cos_h", "sin_h"],

"features_range": {

"x": [-100, 100],

"y": [-100, 100],

"vx": [-20, 20],

"vy": [-20, 20],

},

"absolute": True,

"flatten": False,

"observe_intentions": False

},

"action": {

"type": "DiscreteMetaAction",

"longitudinal": True,

"lateral": True,

"target_speeds": [0, 4.5, 9]

},

"duration": 13, # [s]

"destination": "o1",

"controlled_vehicles": 1,

"initial_vehicle_count": 10,

"spawn_probability": 0.6,

"screen_width": 600,

"screen_height": 600,

"centering_position": [0.5, 0.6],

"scaling": 5.5 * 1.3,

"collision_reward": -10,

"high_speed_reward": 2,

"arrived_reward": 5,

"reward_speed_range": [7.0, 9.0],# change

"normalize_reward": False,

"offroad_terminal": False

})

return config

def _reward(self, action: int) -> float:

"""Aggregated reward, for cooperative agents."""

return sum(self._agent_reward(action, vehicle) for vehicle in self.controlled_vehicles

) / len(self.controlled_vehicles)

def _rewards(self, action: int) -> Dict[Text, float]:

"""Multi-objective rewards, for cooperative agents."""

agents_rewards = [self._agent_rewards(action, vehicle) for vehicle in self.controlled_vehicles]

return {

name: sum(agent_rewards[name] for agent_rewards in agents_rewards) / len(agents_rewards)

for name in agents_rewards[0].keys()

}

# edit your reward

def _agent_reward(self, action: int, vehicle: Vehicle) -> float:

"""Per-agent reward signal."""

rewards = self._agent_rewards(action, vehicle)

reward = sum(self.config.get(name, 0) * reward for name, reward in rewards.items())

reward = self.config["arrived_reward"] if rewards["arrived_reward"] else reward

reward *= rewards["on_road_reward"]

if self.config["normalize_reward"]:

reward = utils.lmap(reward, [self.config["collision_reward"], self.config["arrived_reward"]], [0, 1])

return reward

def _agent_rewards(self, action: int, vehicle: Vehicle) -> Dict[Text, float]:

"""Per-agent per-objective reward signal."""

scaled_speed = utils.lmap(vehicle.speed, self.config["reward_speed_range"], [0, 1])

return {

"collision_reward": vehicle.crashed,

"high_speed_reward": np.clip(scaled_speed, 0, 1),

"arrived_reward": self.has_arrived(vehicle),

"on_road_reward": vehicle.on_road

}

def _is_terminated(self) -> bool:

return any(vehicle.crashed for vehicle in self.controlled_vehicles) \

or all(self.has_arrived(vehicle) for vehicle in self.controlled_vehicles) \

or (self.config["offroad_terminal"] and not self.vehicle.on_road)

def _agent_is_terminal(self, vehicle: Vehicle) -> bool:

"""The episode is over when a collision occurs or when the access ramp has been passed."""

return (vehicle.crashed or

self.has_arrived(vehicle) or

self.time >= self.config["duration"])

def _is_truncated(self) -> bool:

return self.time >= self.config["duration"]  # end the episode when the configured duration runs out

def _info(self, obs: np.ndarray, action: int) -> dict:

info = super()._info(obs, action)

info["agents_rewards"] = tuple(self._agent_reward(action, vehicle) for vehicle in self.controlled_vehicles)

info["agents_dones"] = tuple(self._agent_is_terminal(vehicle) for vehicle in self.controlled_vehicles)

return info

def _reset(self) -> None:

self._make_road()

self._make_vehicles(self.config["initial_vehicle_count"])

def step(self, action: int) -> Tuple[np.ndarray, float, bool, bool, dict]:

obs, reward, terminated, truncated, info = super().step(action)

self._clear_vehicles()

self._spawn_vehicle(spawn_probability=self.config["spawn_probability"])

return obs, reward, terminated, truncated, info

def _make_road(self) -> None:

"""

Make an 4-way intersection.

The horizontal road has the right of way. More precisely, the levels of priority are:

- 3 for horizontal straight lanes and right-turns

- 1 for vertical straight lanes and right-turns

- 2 for horizontal left-turns

- 0 for vertical left-turns

The code for nodes in the road network is:

(o:outer | i:inner + [r:right, l:left]) + (0:south | 1:west | 2:north | 3:east)

:return: the intersection road

"""

lane_width = AbstractLane.DEFAULT_WIDTH

right_turn_radius = lane_width + 5 # [m]

left_turn_radius = right_turn_radius + lane_width # [m]

outer_distance = right_turn_radius + lane_width / 2

access_length = 50 + 50 # [m]

net = RoadNetwork()

n, c, s = LineType.NONE, LineType.CONTINUOUS, LineType.STRIPED

for corner in range(4):

angle = np.radians(90 * corner)

is_horizontal = corner % 2

priority = 3 if is_horizontal else 1

rotation = np.array([[np.cos(angle), -np.sin(angle)], [np.sin(angle), np.cos(angle)]])

# Incoming

start = rotation @ np.array([lane_width / 2, access_length + outer_distance])

end = rotation @ np.array([lane_width / 2, outer_distance])

net.add_lane("o" + str(corner), "ir" + str(corner),

StraightLane(start, end, line_types=[s, c], priority=priority, speed_limit=10))

# Right turn

r_center = rotation @ (np.array([outer_distance, outer_distance]))

net.add_lane("ir" + str(corner), "il" + str((corner - 1) % 4),

CircularLane(r_center, right_turn_radius, angle + np.radians(180), angle + np.radians(270),

line_types=[n, c], priority=priority, speed_limit=10))

# Left turn

l_center = rotation @ (np.array([-left_turn_radius + lane_width / 2, left_turn_radius - lane_width / 2]))

net.add_lane("ir" + str(corner), "il" + str((corner + 1) % 4),

CircularLane(l_center, left_turn_radius, angle + np.radians(0), angle + np.radians(-90),

clockwise=False, line_types=[n, n], priority=priority - 1, speed_limit=10))

# Straight

start = rotation @ np.array([lane_width / 2, outer_distance])

end = rotation @ np.array([lane_width / 2, -outer_distance])

net.add_lane("ir" + str(corner), "il" + str((corner + 2) % 4),

StraightLane(start, end, line_types=[s, n], priority=priority, speed_limit=10))

# Exit

start = rotation @ np.flip([lane_width / 2, access_length + outer_distance], axis=0)

end = rotation @ np.flip([lane_width / 2, outer_distance], axis=0)

net.add_lane("il" + str((corner - 1) % 4), "o" + str((corner - 1) % 4),

StraightLane(end, start, line_types=[n, c], priority=priority, speed_limit=10))

road = RegulatedRoad(network=net, np_random=self.np_random, record_history=self.config["show_trajectories"])

self.road = road

def _make_vehicles(self, n_vehicles: int = 10) -> None:

"""

Populate a road with several vehicles on the highway and on the merging lane

:return: the ego-vehicle

"""

# Configure vehicles

vehicle_type = utils.class_from_path(self.config["other_vehicles_type"])

vehicle_type.DISTANCE_WANTED = 5 # Low jam distance

vehicle_type.COMFORT_ACC_MAX = 6

vehicle_type.COMFORT_ACC_MIN = -3

# Random vehicles

simulation_steps = 3

for t in range(n_vehicles - 1):

self._spawn_vehicle(np.linspace(0, 80, n_vehicles)[t])

for _ in range(simulation_steps):

[(self.road.act(), self.road.step(1 / self.config["simulation_frequency"])) for _ in range(self.config["simulation_frequency"])]

# Challenger vehicle

self._spawn_vehicle(60, spawn_probability=1, go_straight=True, position_deviation=0.1, speed_deviation=0)

# Controlled vehicles

self.controlled_vehicles = []

for ego_id in range(0, self.config["controlled_vehicles"]):

ego_lane = self.road.network.get_lane(("o{}".format(ego_id % 4), "ir{}".format(ego_id % 4), 0))

destination = self.config["destination"] or "o" + str(self.np_random.randint(1, 4))

ego_vehicle = self.action_type.vehicle_class(

self.road,

ego_lane.position(60 + 5*self.np_random.normal(1), 0),

speed=ego_lane.speed_limit,

heading=ego_lane.heading_at(60))

try:

ego_vehicle.plan_route_to(destination)

ego_vehicle.speed_index = ego_vehicle.speed_to_index(ego_lane.speed_limit)

ego_vehicle.target_speed = ego_vehicle.index_to_speed(ego_vehicle.speed_index)

except AttributeError:

pass

self.road.vehicles.append(ego_vehicle)

self.controlled_vehicles.append(ego_vehicle)

for v in self.road.vehicles: # Prevent early collisions

if v is not ego_vehicle and np.linalg.norm(v.position - ego_vehicle.position) < 20:

self.road.vehicles.remove(v)

def _spawn_vehicle(self,

longitudinal: float = 0,

position_deviation: float = 1.,

speed_deviation: float = 1.,

spawn_probability: float = 0.6,

go_straight: bool = False) -> None:

if self.np_random.uniform() > spawn_probability:

return

route = self.np_random.choice(range(4), size=2, replace=False)

route[1] = (route[0] + 2) % 4 if go_straight else route[1]

vehicle_type = utils.class_from_path(self.config["other_vehicles_type"])

vehicle = vehicle_type.make_on_lane(self.road, ("o" + str(route[0]), "ir" + str(route[0]), 0),

longitudinal=(longitudinal + 5

+ self.np_random.normal() * position_deviation),

speed=8 + self.np_random.normal() * speed_deviation)

for v in self.road.vehicles:

if np.linalg.norm(v.position - vehicle.position) < 15:

return

vehicle.plan_route_to("o" + str(route[1]))

vehicle.randomize_behavior()

self.road.vehicles.append(vehicle)

return vehicle

def _clear_vehicles(self) -> None:

is_leaving = lambda vehicle: "il" in vehicle.lane_index[0] and "o" in vehicle.lane_index[1] \

and vehicle.lane.local_coordinates(vehicle.position)[0] \

>= vehicle.lane.length - 4 * vehicle.LENGTH

self.road.vehicles = [vehicle for vehicle in self.road.vehicles if

vehicle in self.controlled_vehicles or not (is_leaving(vehicle) or vehicle.route is None)]

def has_arrived(self, vehicle: Vehicle, exit_distance: float = 25) -> bool:

return "il" in vehicle.lane_index[0] \

and "o" in vehicle.lane_index[1] \

and vehicle.lane.local_coordinates(vehicle.position)[0] >= exit_distance
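
Once the file is saved and the environment registered, a quick check that test-v0 is picked up and really exposes five actions looks roughly like this (a sketch; run it in the same Python environment whose site-packages you just edited):

import gym
import highway_env

env = gym.make("test-v0")
obs, info = env.reset()
print(obs.shape)           # Kinematics observation, expected (15, 7) with the config above
print(env.action_space)    # expected Discrete(5) now that longitudinal and lateral are both enabled
env.close()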

Oh, everyone wants a visualization, right? Here you go.

Training curves from double_dqn.py on the custom test-v0 environment (action_dim == 5):

This is single-agent for now. A multi-agent version would need changes to the input data and actions, as well as the number of controlled vehicles; that is left as a future improvement.

Anything else? Wait until I finish writing the multi-agent version 0-0!

Additions from kind souls are welcome... after all, this is pretty much uncharted territory (sigh).

One day, I will become the godlike Mr. Tighnari!
