【深層強化学習】Double Deep Q Network(DDQN)

今回はQ学習(or Deep Q Network)の課題と、
その課題を解決するDouble Deep Q Networkについて説明していきます。

基礎知識

今回は基礎知識として以下の項目をある程度理解しているとします。

Q学習の問題点

まずQ学習の問題点についてお話します。

Q学習は遷移先の最大行動価値を用いているので、過大評価(overestimate)という問題があります。

まずはQ学習の更新式を見てみましょう。

$$
\newcommand{\argmax}{\mathop{\rm argmax}\limits}
\begin{aligned}
Q(s, a) \leftarrow Q(s, a) + \alpha (r + \gamma \color{red}{\max _{a}Q(s’, a)}-Q(s, a))
\end{aligned}$$

上記式の赤の部分がQ学習の特徴である最大行動価値を利用している箇所であり、過大評価の原因となるところです。
たまたまある状態で高い報酬を得てしまった場合、
そこの状態を良い状態と判断してしまい、その結果がその他の状態価値にまで伝搬してしまいます。
そして、間違った学習をしてしまう
という問題です。

では、簡単な例を見てみましょう。
この例は参考文献2を参考としています。

状態Aがスタート地点、四角がゴール地点(終点)となります。
矢印の上の数字が報酬です。

状態Bから左への選択肢は複数あり、
各選択肢を選択したときに得られる報酬の期待値は
\(N(-0.1, 1)\)となります。
\(N(-0.1, 1)\)は平均-0.1、標準偏差1の正規分布から得られる報酬です。

状態Aからエージェントは右、左のどちらが良いかを学習します。
見てわかる通り、右に行けば報酬0を受け取ってゴールとなります。

左を選択し、左の終点にたどり着いた時の累積報酬は\(0+\gamma N(-0.1, 1)\)になります。
\(N(-0.1, 1)\)は平均としては-0.1なので、
累積報酬の最終的な期待値は\((0 -0.1\gamma) = -0.1\gamma\)です。
なので、左を選択した時の\(-0.1\gamma < 0)\)となり、エージェントは
状態Aでは右を選んだほうが良いとわかります。

しかしながら、Q学習は遷移先状態の最大行動価値だけを使い学習します。
このことによって間違った学習を最初の段階でしてしまいます。

なぜ、最大値を使うと間違えるのか、具体的な数値を使って説明していきます。
状態Bから左の終点に遷移した時に得られた報酬は以下のようになったとします。

\(a_0\) \(a_1\) \(a_2\)
-1 1 -0.1
0.9 -0.5 -0.2

このときQ(A, Left)は、

$$\begin{aligned}
Q(A, Left) &\leftarrow Q(A, Left) + \gamma (0 + \gamma \max(-0.05, 0.25, -0.15) – Q(A, Left)) \\
&= Q(A, Left) + \gamma (0 + \gamma \times 0.25 – Q(A, Left))
\end{aligned}$$

ちなみに各数値は\(-0.05=(-1+0.9)/2\)、\(0.25=(1-0.5)/2\), \(-0.15=(-0.1-0.2)/2\)。

すると、\(Q(A, Left)>0\)となってしまうことがわかります。
つまり、\(Q(A, Left)>Q(A, Right)\)となってしまい、左のほうが良いと判断してしまうことがわかります。
これが過大評価です。

Double Deep Q Network

Double Deep Q Network(DDQN)では2つのQ Network(Q Network, Target Network)を用いて、過大評価を軽減させます。
DQNでも2つのQ Networkを用いますが、使い方に少し工夫を加えます。

まず、通常のDQNのTDターゲット\( T^{DQN}\)を確認してみます。

ここで、\(\theta ^{-}\)はTarget Networkのパラメータ、
\(\theta\)はQ Networkのパラメータとします。

この式からわかるとおり、DQNでは単純にTarget Networkの最大行動価値を用いています。

一方でDDQNでは以下のようになります。

2種類のパラメータ(\(\theta , \theta ^{-}\))を用いていることがわかります。

単純にTarget Networkの遷移先状態\(S_{t+1}\)の最大行動価値\(\max _{a}Q(S_{t+1}, a;{\bf \theta ^{-}})\)を用いるのではなく、
Q Networkの\(S_{t+1}\)における行動価値を最大化する行動\(a = \argmax _{a}Q(S_{t+1}, a;{\bf \theta})\)を用いてTD ターゲットの値を決めます。

このように、Target Networkの最大行動価値を用いるのではなく、Q Networkを組み合わせることでQ学習の特徴(最大行動価値を使う)を維持しながら、過剰評価を軽減させます。

実装:TensorFlowとKerasを使って

では、実装していきます。
TensorFlowとKerasを用いています。
(もうKerasはTensorFlowの一部となっているのでTensorFlowだけとも言えますが・・)

DDQNエージェントのソースコード全体像

エージェント全体のソースコードの全体像を示します。

class DDQNAgent():
    """
        Double Deep Q Network Agent
    """

    def __init__(self, training=None, policy=None, gamma=0.99, actions=None,
                 memory=None, memory_interval=1, train_interval=1,
                 batch_size=32, nb_steps_warmup=200,
                 observation=None, input_shape=None, sess=None):

        self.training = training
        self.policy = policy
        self.actions = actions
        self.gamma = gamma
        self.recent_observation = observation
        self.previous_observation = observation
        self.memory = memory
        self.memory_interval = memory_interval
        self.batch_size = batch_size
        self.recent_action_id = None
        self.nb_steps_warmup = nb_steps_warmup
        self.sess = sess

        self.model_inputs, self.model_outputs, self.model = build_model(input_shape, len(self.actions))
        self.target_model_inputs, self.target_model_outputs, self.target_model = build_model(input_shape, len(self.actions))
        target_model_weights = self.target_model.trainable_weights
        model_weights = self.model.trainable_weights

        # hard update
        # self.update_target_model = [target_model_weights[i].assign(model_weights[i]) for i in range(len(target_model_weights))]
        # soft update
        self.update_target_model = [target_model_weights[i].assign(
            .999*target_model_weights[i]+.001*model_weights[i]) for i in range(len(target_model_weights))]
        self.train_interval = train_interval
        self.step = 0

    def compile(self):
        self.targets = tf.placeholder(dtype=tf.float32, shape=[
                                      None, 2], name="target_q")
        self.inputs = tf.placeholder(
            dtype=tf.int32, shape=[None], name="action")
        actions_one_hot = tf.one_hot(indices=self.inputs, depth=len(
            self.actions), on_value=1.0, off_value=0.0, name="action_one_hot")

        pred_q = tf.multiply(self.model_outputs, actions_one_hot)

        error = self.targets - pred_q
        square_error = .5 * tf.square(error)
        loss = tf.reduce_mean(square_error, axis=0, name="loss")

        optimizer = tf.train.AdamOptimizer(learning_rate=1e-3)
        self.train = optimizer.minimize(loss)
        self.sess.run(tf.initialize_all_variables())

    def act(self):
        action_id = self.forward()
        action = self.actions[action_id]
        return action

    def forward(self):
        q_values = self.compute_q_values(self.recent_observation)
        action_id = self.policy.select_action(
            q_values=q_values, is_training=self.training)
        self.recent_action_id = action_id

        return action_id

    def observe(self, observation, reward=None, is_terminal=None):
        self.previous_observation = copy.deepcopy(self.recent_observation)
        self.recent_observation = observation

        if self.training and reward is not None:
            if self.step % self.memory_interval == 0:
                self.memory.append(
                    self.previous_observation, self.recent_action_id, reward, terminal=is_terminal)
            self.experience_replay()
            self.policy.decay_eps_rate()

        self.step += 1

    def experience_replay(self):
        if (self.step > self.nb_steps_warmup) and (self.step % self.train_interval == 0):
            experiences = self.memory.sample(self.batch_size)

            state0_batch = []
            reward_batch = []
            action_batch = []
            state1_batch = []
            terminal_batch = []

            for e in experiences:
                state0_batch.append(e.state0)
                state1_batch.append(e.state1)
                reward_batch.append(e.reward)
                action_batch.append(e.action)
                terminal_batch.append(0. if e.terminal else 1.)

            target_batch = np.zeros((self.batch_size, len(self.actions)))
            reward_batch = np.array(reward_batch)

            # q values of q network
            q_values = self.predict_on_batch_by_model(state1_batch)
            # argmax actions of q network
            argmax_actions = np.argmax(q_values, axis=1)
            target_q_values = np.array(self.predict_on_batch_by_target(state1_batch))  # compute maxQ'(s')
            # Q(s', argmax_a Q(s, a;theta_q); theta_target), 
            double_q_values = []
            for a, t in zip(argmax_actions, target_q_values):
                double_q_values.append(t[a])
            double_q_values = np.array(double_q_values)

            discounted_reward_batch = (self.gamma * double_q_values)
            discounted_reward_batch *= terminal_batch
            # target = r + γ maxQ'(s')
            targets = reward_batch + discounted_reward_batch

            for idx, (action, target) in enumerate(zip(action_batch, targets)):
                target_batch[idx][action] = target

            self.train_on_batch(state0_batch, action_batch, target_batch)

        # soft update
        self.sess.run(self.update_target_model)

    def train_on_batch(self, state_batch, action_batch, targets):
        self.sess.run(self.train, feed_dict={
                      self.model_inputs: state_batch, self.inputs: action_batch, self.targets: targets})

    def compute_target_q_value(self, state1_batch):
        q_values = self.sess.run(self.target_model_outputs, feed_dict={
                                 self.target_model_inputs: state1_batch})
        q_values = np.max(q_values, axis=1)

        return q_values

    def predict_on_batch_by_model(self, state1_batch):
        q_values = self.sess.run(self.model_outputs, feed_dict={
                                 self.model_inputs: state1_batch})

        return q_values

    def predict_on_batch_by_target(self, state1_batch):
        q_values = self.sess.run(self.target_model_outputs, feed_dict={
                                 self.target_model_inputs: state1_batch})
        return q_values

    def compute_q_values(self, state):
        q_values = self.sess.run(self.target_model_outputs, feed_dict={
                                 self.target_model_inputs: [state]})

        return q_values[0]

    def update_target_model_hard(self):
        """ for hard update """
        self.sess.run(self.update_target_model)

    def reset(self):
        self.recent_observation = None
        self.previous_observation = None
        self.recent_action_id = None

ソースコード一部解説(DDQNの部分)

基本的にはDQNとソースコードは変わらないので、
DDQN特有の部分だけを述べます。
変化するのはexperience repalyメソッド(82行から124行)のTDターゲットを計算する部分です。

まず、遷移先状態\(s’\)のQ Networkの出力を取得します。
ソースコードの102行目、

# q values of q network
q_values = self.predict_on_batch_by_model(state1_batch)

このQ Networkからの出力を最大化する行動\(\hat{a} = \argmax_{a’}Q(s’,a’;\theta)\)を取り出します。
ソースコードの104行目、

# argmax actions of q network
argmax__actions = np.argmax(q_values, axis=1)

Target Networkの出力を計算します。
ソースコードの105行目、

target_q_values = np.array(self.predict_on_batch_by_target(state1_batch))  # compute maxQ'(s')

\(Q(s, \hat{a};\theta ^{-})\)を計算します。
ソースコードの107行目から、

# Q(s', argmax_a Q(s, a;theta_q); theta_target), 
double_q_values = []
for a, t in zip(argmax_actions, target_q_values):
    double_q_values.append(t[a])
double_q_values = np.array(double_q_values)

これまでの計算で得た値によってDDQNのTDターゲットを計算できます。

実験

単純なDQNと比較してみました。
前回のDQNの記事と同様にCartPole問題を使って評価しました。

横軸はepisode(\(\times 10\))です。今回は10回に一回評価してみました。
縦軸がPoleが立っているstep数を表しています。

簡単な問題ですが、多少DDQNの性能のほうが良くなっている感じがします。
マシンパワーが足りないので実験回数が1回というのが少しさびしいですが。

終わりに

今回はQ学習の課題とDouble Deep Q Networkについて解説しました。
簡単な実験ですが、Double Deep Q Networkが性能を改善させていることを確認出来て良かったです。

参考文献

  1. https://arxiv.org/pdf/1509.06461.pdf
  2. https://medium.com/@ameetsd97/deep-double-q-learning-why-you-should-use-it-bedf660d5295
タイトルとURLをコピーしました