深度强化学习之:DQN训练超级玛丽闯关

深度强化学习之:DQN训练超级玛丽闯关

首页休闲益智超级玛丽更新时间:2024-04-20

本文所有代码在

https://github.com/MyEncyclopedia/reinforcement-learning-2nd/tree/master/super_mario

DQN 算法回顾

以下是Deepmind 发表在Nature 的 Human-level control through deep reinforcement learning 的完整算法流程。

超级玛丽 NES OpenAI 环境

安装基于OpenAI gym的超级玛丽环境执行下面的 pip 命令即可。

pip install gym-super-mario-bros

用python代码控制马里奥执行随机动作

import gym_super_mario_bros from gym import wrappers env = wrappers.Monitor(env,"./gym-results", force=True) #执行5000个简单的向右随机操作 done = True #游戏结束标志 for step in range(5000): if done: #如果游戏结束则重置: state = env.reset() state, reward, done, info = env.step(env.action_space.sample()) # 关闭创建的游戏env env.close()

然后查看./gym-results文件夹下的mp4文件,即为上面代码执行的过程录像。

我们先来看一下游戏环境的输入和输出。下面代码采用随机的action来和游戏交互。有了 组合游戏系列3: 井字棋、五子棋的OpenAI Gym GUI环境 关于OpenAI Gym 的介绍,现在对于其基本的交互步骤已经不陌生了。

import gym_super_mario_bros from random import random, randrange from gym_super_mario_bros.actions import RIGHT_ONLY from nes_py.wrappers import JoypadSpace from gym import wrappers env = gym_super_mario_bros.make('SuperMarioBros-v0') env = JoypadSpace(env, RIGHT_ONLY) # Play randomly done = False env.reset() step = 0 while not done: action = randrange(len(RIGHT_ONLY)) state, reward, done, info = env.step(action) print(done, step, info) env.render() step = 1 env.close()

随机策略的效果如下

,时长00:06

注意我们在游戏环境初始化的时候用了参数 RIGHT_ONLY,它定义成五种动作的list,表示仅使用右键的一些组合,适用于快速训练来完成Mario第一关。

RIGHT_ONLY = [ ['NOOP'], ['right'], ['right', 'A'], ['right', 'B'], ['right', 'A', 'B'], ]

观察一些 info 输出内容,coins表示金币获得数量,flag_get 表示是否取得最后的旗子,time 剩余时间,以及 Mario 大小状态和所在的 x,y位置。

{ "coins":0, "flag_get":False, "life":2, "score":0, "stage":1, "status":"small", "time":381, "world":1, "x_pos":594, "y_pos":89 } 游戏图像处理

Deep Reinforcement Learning 一般是 end-to-end learning,意味着将游戏的 screen image,即 observed state 直接视为真实状态 state,喂给神经网络去训练。于此相反的另一种做法是,通过游戏环境拿到内部状态,例如所有相关物品的位置和属性作为模型输入。这两种方式的区别在我看来有两点。第一点,用观察到的屏幕像素代替真正的状态 state,在partially observable 的环境时可能因为 non-stationarity 导致无法很好的工作,而拿内部状态利用了额外的作弊信息,在partially observable环境中也可以工作。第二点,第一种方式屏幕像素维度比较高,输入数据量大,需要神经网络的大量训练拟合,第二种方式,内部真实状态往往维度低得多,训练起来很快,但缺点是因为除了内部状态往往还需要游戏相关规则作为输入,因此generalization能力不如前者强。

这里,我们当然采样屏幕像素的 end-to-end 方式了,自然首要任务是将游戏帧图像有效处理。超级玛丽游戏环境的屏幕输出是 (240, 256, 3) shape的 numpy array,通过下面一系列的转换,尽可能的在不影响训练效果的情况下减小采样到的数据量。

  1. MaxAndSkipFrameWrapper:每4个frame连在一起,采取同样的动作,降低Frame数量
  2. FrameDownsampleWrapper:将原始的 (240, 256, 3) down sample 到 (84, 84, 1)
  3. ImageToPytorchWrapper:转换成适合 pytorch 的 shape (1, 84, 84)
  4. FrameBufferWrapper:保存最后4次屏幕采样
  5. NormalizeFloats:Normalize 成 [0., 1.0] 的浮点值

def wrap_environment(env_name: str, action_space: list) -> Wrapper: env = make(env_name) env = JoypadSpace(env, action_space) env = MaxAndSkipFrameWrapper(env) env = FrameDownsampleWrapper(env) env = ImageToPyTorchWrapper(env) env = FrameBufferWrapper(env, 4) env = NormalizeFloats(env) return env CNN 模型

模型比较简单,三个卷积层后做 softmax输出,输出维度数为离散动作数。act() 采用了epsilon-greedy 模式,即在epsilon小概率时采取随机动作来 explore,大于epsilon时采取估计的最可能动作来 exploit。

class DQNModel(nn.Module): def __init__(self, input_shape, num_actions): super(DQNModel, self).__init__() self._input_shape = input_shape self._num_actions = num_actions self.features = nn.Sequential( nn.Conv2d(input_shape[0], 32, kernel_size=8, stride=4), nn.ReLU(), nn.Conv2d(32, 64, kernel_size=4, stride=2), nn.ReLU(), nn.Conv2d(64, 64, kernel_size=3, stride=1), nn.ReLU() ) self.fc = nn.Sequential( nn.Linear(self.feature_size, 512), nn.ReLU(), nn.Linear(512, num_actions) ) def forward(self, x): x = self.features(x).view(x.size()[0], -1) return self.fc(x) def act(self, state, epsilon, device): if random() > epsilon: state = torch.FloatTensor(np.float32(state)).unsqueeze(0).to(device) q_value = self.forward(state) action = q_value.max(1)[1].item() else: action = randrange(self._num_actions) return action Experience Replay 缓存

实现采用了 Pytorch CartPole DQN 的官方代码,本质是一个最大为 capacity 的 list 保存了采样到的 (s, a, r, s', is_done) 五元组。

Transition = namedtuple('Transition', ('state', 'action', 'reward', 'next_state', 'done')) class ReplayMemory: def __init__(self, capacity): self.capacity = capacity self.memory = [] self.position = 0 def push(self, *args): if len(self.memory) < self.capacity: self.memory.append(None) self.memory[self.position] = Transition(*args) self.position = (self.position 1) % self.capacity def sample(self, batch_size): return random.sample(self.memory, batch_size) def __len__(self): return len(self.memory) DQNAgent

我们将 DQN 的逻辑封装在 DQNAgent 类中。DQNAgent 成员变量包括两个 DQNModel,一个ReplayMemory。

train() 方法中会每隔一定时间将 Target Network 的参数同步成现行Network的参数。在td_loss_backprop()方法中采样 ReplayMemory 中的五元组,通过minimize TD error方式来改进现行 Network 参数 。Loss函数为:

class DQNAgent(): def act(self, state, episode_idx): self.update_epsilon(episode_idx) action = self.model.act(state, self.epsilon, self.device) return action def process(self, episode_idx, state, action, reward, next_state, done): self.replay_mem.push(state, action, reward, next_state, done) self.train(episode_idx) def train(self, episode_idx): if len(self.replay_mem) > self.initial_learning: if episode_idx % self.target_update_frequency == 0: self.target_model.load_state_dict(self.model.state_dict()) self.optimizer.zero_grad() self.td_loss_backprop() self.optimizer.step() def td_loss_backprop(self): transitions = self.replay_mem.sample(self.batch_size) batch = Transition(*zip(*transitions)) state = Variable(FloatTensor(np.float32(batch.state))).to(self.device) action = Variable(LongTensor(batch.action)).to(self.device) reward = Variable(FloatTensor(batch.reward)).to(self.device) next_state = Variable(FloatTensor(np.float32(batch.next_state))).to(self.device) done = Variable(FloatTensor(batch.done)).to(self.device) q_values = self.model(state) next_q_values = self.target_net(next_state) q_value = q_values.gather(1, action.unsqueeze(-1)).squeeze(-1) next_q_value = next_q_values.max(1)[0] expected_q_value = reward self.gamma * next_q_value * (1 - done) loss = (q_value - expected_q_value.detach()).pow(2) loss = loss.mean() loss.backward() 外层控制代码

最后是外层调用代码,基本和以前文章一样。

def train(env, args, agent): for episode_idx in range(args.num_episodes): episode_reward = 0.0 state = env.reset() while True: action = agent.act(state, episode_idx) if args.render: env.render() next_state, reward, done, stats = env.step(action) agent.process(episode_idx, state, action, reward, next_state, done) state = next_state episode_reward = reward if done: print(f'{episode_idx}: {episode_reward}') break

查看全文
大家还看了
也许喜欢
更多游戏

Copyright © 2024 妖气游戏网 www.17u1u.com All Rights Reserved