【Deep Reinforcement Learning】【TensorFlow】Implementing a Deep Q-Network

Deep Q Network

Overview

I finally got around to implementing Deep Q Network (DQN) and tested it on the CartPole problem.

About DQN

Deep Q Network (DQN) is Q-learning in which the Q-table is replaced by a neural network function approximator. It might be more accurate to simply call this deep reinforcement learning; strictly speaking, the name DQN seems to refer to Q-learning combined with the following three techniques.

1. Experience Replay

Store the agent's experiences in a replay memory and periodically train on batches sampled from it.

2. Fixed Target Q-Network

Use two networks: a target Q-network used to compute the TD targets, and the main Q-network used for everything else (learning and action selection).

3. Reward Clipping

Clip reward values to the range [-1, 1] so that the network does not overreact to outliers and the like. Recently, using the Huber loss seems to have become more common than clipping the rewards themselves.
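
For reference, the Huber loss is quadratic for small TD errors and linear for large ones, which is what limits the influence of outliers. Here is a minimal NumPy sketch (the function name huber_loss and the threshold delta=1.0 are only illustrative, not part of the implementation below):

import numpy as np

def huber_loss(td_error, delta=1.0):
    # quadratic inside [-delta, delta], linear outside
    abs_err = np.abs(td_error)
    quadratic = 0.5 * np.square(td_error)
    linear = delta * (abs_err - 0.5 * delta)
    return np.where(abs_err <= delta, quadratic, linear)

print(huber_loss(np.array([0.5, 2.0, -10.0])))  # [0.125, 1.5, 9.5]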

Program

I implemented it in TensorFlow, drawing quite heavily on the keras-rl code, and ran experiments on the CartPole problem.

For keras-rl and CartPole, see:

www.tcom242242.net

Source code

import tensorflow as tf
import gym
import numpy as np
import matplotlib.pyplot as plt
from copy import deepcopy
from abc import ABCMeta, abstractmethod
from collections import deque, namedtuple
import random


class Policy(metaclass=ABCMeta):

    @abstractmethod
    def select_action(self, **kwargs):
        pass

    def get_config(self):
        return {}


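# ε-greedy action selection: with probability eps choose a random action,
# otherwise the action with the highest Q-value; eps decays toward a floor of 0.01.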
class EpsGreedyQPolicy(Policy):
    def __init__(self, eps=.1, eps_decay_rate=0.99):
        super(EpsGreedyQPolicy, self).__init__()
        self.eps = eps
        self.eps_decay_rate = eps_decay_rate

    def select_action(self, q_values):
        assert q_values.ndim == 1
        nb_actions = q_values.shape[0]
        if np.random.uniform() < self.eps:
            # explore: pick a random action
            action = np.random.randint(0, nb_actions)
        else:
            # exploit: pick the greedy action
            action = np.argmax(q_values)

        return action

    def decay_eps_rate(self):
        self.eps = self.eps*self.eps_decay_rate
        if self.eps < 0.01:
            self.eps = 0.01


    def select_greedy_action(self, q_values):
        assert q_values.ndim == 1
        action = np.argmax(q_values)

        return action

    def get_config(self):
        config = super(EpsGreedyQPolicy, self).get_config()
        config['eps'] = self.eps
        return config

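# Small helper for building a fully connected network with tf.layers.
# DQNAgent instantiates it below, but its build_model is not actually used;
# the agent builds its own tf.keras model instead.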
class Network():
    def __init__(self, sess, name="default"):
        self.sess = sess
        self.copy_op = None
        self.name = name
        self.vars = {}

    def build_model(self, input_shape, output_shape, name):

        with tf.variable_scope(name):
            self.inputs = tf.placeholder(dtype=tf.float32, shape = [None,]+input_shape, name="input")

            x = tf.layers.dense(self.inputs, 16, activation=tf.nn.relu)
            x = tf.layers.dense(x, 16, activation=tf.nn.relu)
            x = tf.layers.dense(x, 16, activation=tf.nn.relu)
            self.outputs = tf.layers.dense(x , output_shape)

            for v in tf.trainable_variables(scope=name):
                self.vars[v.name] = v

        return self.inputs, self.outputs


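# A single stored transition: the state, the action taken, the reward received,
# the next state, and whether the episode terminated at that step.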
Experience = namedtuple('Experience', 'state0, action, reward, state1, terminal')

def sample_batch_indexes(low, high, size):
    # Sample `size` distinct indexes if enough are available, otherwise sample with replacement.
    if high - low >= size:
        batch_idxs = random.sample(range(low, high), size)
    else:
        batch_idxs = np.random.randint(low, high, size=size)
    assert len(batch_idxs) == size
    return batch_idxs


class Memory():
    """
        Memory for experience replay
    """
    def __init__(self, limit, maxlen):
        self.limit = limit
        # self.actions = RingBuffer(limit)
        self.actions = deque(maxlen=limit)
        self.rewards = deque(maxlen=limit)
        self.terminals = deque(maxlen=limit)
        self.observations = deque(maxlen=limit)
        self.maxlen = maxlen
        self.recent_observations = deque(maxlen=maxlen)

    def sample(self, batch_size, batch_idxs=None):
        if batch_idxs is None:
            batch_idxs = sample_batch_indexes(0, len(self.observations) - 1, size=batch_size)
        # Simply sample experiences of the form (s, a) -> (r, s1);
        # resample any index whose previous step was terminal
        for (i, idx) in enumerate(batch_idxs):
            terminal = self.terminals[idx-1]
            while terminal:
                idx = sample_batch_indexes(0, len(self.observations)-1, size=1)[0]
                batch_idxs[i] = idx
                terminal = self.terminals[idx-1]

        experiences = []
        for idx in batch_idxs:
            state0 = self.observations[idx]
            action = self.actions[idx]
            reward = self.rewards[idx]
            terminal = self.terminals[idx]
            state1 = self.observations[idx+1]
            experiences.append(Experience(state0=state0, action=action, reward=reward,state1=state1, terminal=terminal))

        return experiences

    def append(self, observation, action, reward, terminal=False, training=True):
        if training:
            self.observations.append(observation)
            self.actions.append(action)
            self.rewards.append(reward)
            self.terminals.append(terminal)
            self.recent_observations.append(observation)

class Agent(metaclass=ABCMeta):
    """Abstract Agent Class"""

    def __init__(self, id=None, name=None, training=None, policy=None):
        self.id = id
        self.name = name
        self.training = training
        self.policy = policy
        self.reward_history = []

    @abstractmethod
    def act(self):
        pass

    @abstractmethod
    def get_reward(self, reward):
        pass

    @abstractmethod
    def observe(self, next_state):
        pass

class DQNAgent(Agent):
    """
        DQN agent modeled on the keras-rl implementation
    """
    def __init__(self, gamma=0.99, alpha_decay_rate=0.999, actions=None, memory=None, memory_interval=1,train_interval=1, 
                 batch_size=32, update_interval=10, nb_steps_warmup=100, observation=None,
                 input_shape=None, 
                 **kwargs):

        super().__init__(**kwargs)
        self.actions = actions
        self.gamma = gamma
        self.state = observation
        self.alpha_decay_rate = alpha_decay_rate
        self.recent_observation = observation
        self.update_interval = update_interval
        self.memory = memory
        self.memory_interval = memory_interval
        self.batch_size = batch_size
        self.recent_action_id = 0
        self.nb_steps_warmup = nb_steps_warmup
        self.sess = tf.InteractiveSession()
        self.net = Network(self.sess)
        self.model_inputs, self.model_outputs, self.model_max_outputs, self.model = self.build_model(input_shape, len(self.actions))
        self.target_model_inputs, self.target_model_outputs, self.target_model_max_outputs, self.target_model= self.build_model(input_shape, len(self.actions))
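        # ops that overwrite the target network weights with the current Q-network weights (hard update)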
        target_model_weights = self.target_model.trainable_weights
        model_weights = self.model.trainable_weights
        self.update_target_model = [target_model_weights[i].assign(model_weights[i]) for i in range(len(target_model_weights))]
        self.train_interval = train_interval
        self.step = 0

    def build_model(self, input_shape, nb_output):
        # Q-network: three fully connected hidden layers of 16 units
        model = tf.keras.models.Sequential()
        inputs = tf.placeholder(dtype=tf.float32, shape=[None,]+input_shape, name="input")
        model.add(tf.keras.layers.Dense(16, activation="relu", input_shape=input_shape))
        model.add(tf.keras.layers.Dense(16, activation="relu"))
        model.add(tf.keras.layers.Dense(16, activation="relu"))
        model.add(tf.keras.layers.Dense(nb_output))
        outputs = model(inputs)
        max_outputs = tf.reduce_max(outputs, axis=1)   # max_a Q(s, a), used for the TD target

        return inputs, outputs, max_outputs, model

    def compile(self, optimizer=None):
        self.targets = tf.placeholder(dtype=tf.float32, shape=[None, len(self.actions)], name="target_q")
        self.inputs = tf.placeholder(dtype=tf.int32, shape=[None], name="action")
        # only the Q-value of the action that was actually taken contributes to the loss
        mask = tf.one_hot(indices=self.inputs, depth=len(self.actions), on_value=1.0, off_value=0.0, name="action_one_hot")
        self.pred_q = tf.multiply(self.model_outputs, mask)
        self.delta = self.targets - self.pred_q

        # huber loss: quadratic for small TD errors, linear for large ones
        self.clipped_error = tf.where(tf.abs(self.delta) < 1.0,
                                      0.5 * tf.square(self.delta),
                                      tf.abs(self.delta) - 0.5, name="clipped_error")
        self.loss = tf.reduce_mean(self.clipped_error, name="loss")

        if optimizer is None:
            optimizer = tf.train.AdamOptimizer(learning_rate=1e-3)
        self.train = optimizer.minimize(self.loss)
        self.sess.run(tf.global_variables_initializer())

    def update_target_model_hard(self):
        """ copy q-network to target network """
        self.sess.run(self.update_target_model)

    def train_on_batch(self, state_batch, action_batch, targets):
        self.sess.run(self.train, feed_dict={self.model_inputs:state_batch, self.inputs:action_batch, self.targets:targets})

    def predict_on_batch(self, state1_batch):
        q_values = self.sess.run(self.target_model_max_outputs, feed_dict={self.target_model_inputs:state1_batch})
        return q_values

    def compute_q_values(self, state):
        q_values = self.sess.run(self.model_outputs, feed_dict={self.model_inputs:[state]})
        return q_values[0]

    def get_reward(self, reward, terminal):
        self.reward_history.append(reward)
        if self.training:
            self._update_q_value(reward, terminal)

        self.policy.decay_eps_rate()
        self.step += 1

    def _update_q_value(self, reward, terminal):
        self.backward(reward, terminal)

    def backward(self, reward, terminal):
        if self.step % self.memory_interval == 0:
            """ store experience """
            self.memory.append(self.recent_observation, self.recent_action_id, reward, terminal=terminal, training=self.training)

        if (self.step > self.nb_steps_warmup) and (self.step % self.train_interval == 0):
            experiences = self.memory.sample(self.batch_size)

            state0_batch = []
            reward_batch = []
            action_batch = []
            state1_batch = []
            terminal_batch = []
            for e in experiences:
                state0_batch.append(e.state0)
                state1_batch.append(e.state1)
                reward_batch.append(e.reward)
                action_batch.append(e.action)
                terminal_batch.append(0. if e.terminal else 1.)  # 0 blocks bootstrapping from a terminal next state

            reward_batch = np.array(reward_batch)
            target_q_values = np.array(self.predict_on_batch(state1_batch))   # compute maxQ'(s')

            targets = np.zeros((self.batch_size, len(self.actions)))

            discounted_reward_batch = (self.gamma * target_q_values)
            discounted_reward_batch *= terminal_batch
            Rs = reward_batch + discounted_reward_batch    # target = r + γ maxQ'(s')

            for idx, (target, R, action) in enumerate(zip(targets, Rs, action_batch)):
                target[action] = R  

            self.train_on_batch(state0_batch, action_batch, targets)

        if self.step % self.update_interval == 0:
            """ update target network """
            self.update_target_model_hard()

    def act(self):
        action_id = self.forward()
        action = self.actions[action_id]
        return action

    def forward(self):
        state = self.recent_observation
        q_values = self.compute_q_values(state)
        if self.training:
            action_id = self.policy.select_action(q_values=q_values)
        else:
            action_id = self.policy.select_greedy_action(q_values=q_values)

        self.recent_action_id = action_id
        return action_id

    def observe(self, next_state):
        self.recent_observation = next_state

    def reset(self):
        self.recent_observation = None
        self.recent_action_id = None


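# Training loop: after every training episode on CartPole-v0, run one greedy
# evaluation episode and record how many steps the pole stayed upright.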
if __name__ == '__main__':
    env = gym.make('CartPole-v0')  # create the CartPole environment
    np.random.seed(123)
    env.seed(123)
    nb_actions = env.action_space.n
    actions = np.arange(nb_actions)
    policy = EpsGreedyQPolicy(0.1)
    memory = Memory(limit=50000, maxlen=1)
    obs = env.reset()
    agent = DQNAgent(actions=actions, memory=memory, update_interval=200, train_interval=1, batch_size=32,
                     memory_interval=1, observation=obs, input_shape=[len(obs)], id=1, name=None, training=True, policy=policy)
    agent.compile()

    result = []
    nb_episodes = 800
    for episode in range(nb_episodes):  # run 800 training episodes
        agent.reset()
        observation = env.reset()  # reset the environment
        observation = deepcopy(observation)
        agent.observe(observation)
        done = False
        while not done:
            # env.render()  # render the environment
            action = deepcopy(agent.act())
            observation, reward, done, info = env.step(action)  # take the action; returns next state, reward, done flag, and debug info
            observation = deepcopy(observation)
            agent.get_reward(reward, done)
            agent.observe(observation)
            if done:
                break

        # evaluation: run one greedy episode without learning
        agent.training = False
        observation = env.reset()  # reset the environment
        agent.observe(observation)
        done = False
        step = 0
        while not done:
            # env.render()  # render the environment
            step+=1
            action = agent.act()
            observation, reward, done, info = env.step(action)
            agent.observe(observation)
            if done:
                print("Episode {}: {} steps".format(episode, step))
                result.append(step)
                break

        agent.act()
        agent.get_reward(0, False)
        agent.training = True

    x = np.arange(len(result))
    plt.ylabel("time")
    plt.xlabel("episode")
    plt.plot(x, result)
    plt.savefig("result.png")

Results

(Figure: result.png — number of steps the agent kept the pole balanced in each evaluation episode)