ノート

強化学習関連のこと

MENU

【強化学習】【入門】Q学習

Q学習の説明のために、 簡単な迷路の例を用いて、Q学習を説明したいと思います。

最後に例で用いた迷路を実装してQ学習で解かせてみました。

Q学習

Q学習とは、現在の行動価値を更新する際に遷移先の状態の 最大行動価値を用いるような学習手法です。

Q学習では 以下のような手順を繰り返し行い学習していきます。

  1. 現在の位置(状態 $s$)における行動価値関数を用いて進行方向(行動$a$)の選択(ε-greedyなど)
  2. 実際に行動と状態の遷移
  3. エージェントは行動した後の位置(遷移先の状態 $s'$) と 報酬 $r$ (ゴールであれば、1、それ以外は、0) を観測。
  4. 得られた報酬を元に以下の式を用いて前回の位置での進行方向の価値(行動価値)を更新(学習)

$$ Q(s,a) \leftarrow Q(s,a) + \alpha(r + \max_{a' \in A}Q(s',a')-Q(s,a)) $$ この式は、状態sにおいて行動aを行った時に得られる報酬の期待値を表しています。

rは報酬、$\max_{a' \in A}Q(s',a')$ は遷移先状態s'においての最大行動価値、

$\alpha$は学習率を表しています。

具体例(迷路問題)

迷路問題をQ学習エージェントに解かせてみようと思います。

Q学習エージェントの目的は最短経路を見つけることです。

もちろん人が上からこのマップを見れば、簡単にわかりますが、

エージェント自身は上から見えませんし、ゴールの座標もわかりません。

エージェントが観測できるのは、現在の自分の位置(状態)、その状態でもらえる報酬のみです。

エージェントは何回もスタート地点からゴールまで行ってみて、

少しづつ学習していきます。

図を用いて、どのように学習していくかを説明

f:id:ttt242242:20180820150948j:plain

f:id:ttt242242:20180824194705j:plain

f:id:ttt242242:20180820151003j:plain

f:id:ttt242242:20180820151007j:plain

f:id:ttt242242:20180820151013j:plain

f:id:ttt242242:20180820151018j:plain

f:id:ttt242242:20180824194803j:plain

ゴールにたどり着いたら、同じスタート地点にエージェントを戻し、

繰り返し迷路を解かせます。

実験結果

上の図と同じマップで実験。

使用したプログラムは下にあります。

<省略>
Episode97
action:0, state:(2, 0), reward:0
action:0, state:(1, 0), reward:0
action:3, state:(0, 0), reward:0
action:3, state:(0, 1), reward:100
Episode98
action:0, state:(2, 0), reward:0
action:0, state:(1, 0), reward:0
action:3, state:(0, 0), reward:0
action:3, state:(0, 1), reward:100
Episode99
action:0, state:(2, 0), reward:0
action:0, state:(1, 0), reward:0
action:3, state:(0, 0), reward:0
action:3, state:(0, 1), reward:100
エピソード毎の平均報酬の推移

最短で4ステップで着くので、平均報酬の最大値は100/4で25になります

f:id:ttt242242:20180820152157p:plain

プログラム

import random
import numpy as np
import matplotlib.pyplot as plt


FILED_TYPE = {
    "N": 0,
    "S": 1,
    "G": 2,
    "X": 3, }

ACTIONS = {
    "UP": 0,
    "DOWN": 1,
    "LEFT": 2,
    "RIGHT": 3}


class GridEnv:
    """
        環境
    """
    def __init__(self):
        self.map = [[0, 0, 2],
                [0, 3, 0],
                [1, 0, 0]]

        self.start_pos = 2, 0
        self.agent_pos = 2, 0

    def step(self, action):
        """
            return pos, reward
        """
        to_y, to_x = self.agent_pos

        if self._is_possible_action(to_x, to_y, action) == False:
            return self.agent_pos, -1, False

        if action == ACTIONS["UP"]:
            to_y += -1
        elif action == ACTIONS["DOWN"]:
            to_y += 1
        elif action == ACTIONS["LEFT"]:
            to_x += -1
        elif action == ACTIONS["RIGHT"]:
            to_x += 1

        is_goal = self._is_goal(to_x, to_y)
        reward = self._compute_reward(to_x, to_y)
        self.agent_pos = to_y, to_x
        return self.agent_pos, reward, is_goal

    def _is_goal(self, x, y):
        if self.map[y][x] == FILED_TYPE["G"]:
            return True
        else:
            return False

    def _is_possible_action(self, x, y, action):
        """
            実行可能な行動かどうかの判定
        """
        to_x = x
        to_y = y

        if action == ACTIONS["UP"]:
            to_y += -1
        elif action == ACTIONS["DOWN"]:
            to_y += 1
        elif action == ACTIONS["LEFT"]:
            to_x += -1
        elif action == ACTIONS["RIGHT"]:
            to_x += 1
        else:
            raize("Action Error")

        if len(self.map) <= to_y or 0 > to_y:
            return False
        elif len(self.map[0]) <= to_x or 0 > to_x:
            return False

        return True

    def _compute_reward(self, x, y):
        if self.map[y][x] == FILED_TYPE["N"] or self.map[y][x] == FILED_TYPE["S"]:
            return 0
        elif self.map[y][x] == FILED_TYPE["X"]:
            return -100
        elif self.map[y][x] == FILED_TYPE["G"]:
            return 100

    def reset(self):
        self.agent_pos = self.start_pos
        return self.start_pos


MinEps = 0.001  # 最小のε
class EpsGreedyQPolicy:
    """
        ε-greedy選択
    """
    def __init__(self, epsilon=.1, decay_rate=1):
        self.epsilon = epsilon
        self.decay_rate = decay_rate
        self.name = "EPS"

    def select_action(self, q_values):
        assert q_values.ndim == 1
        nb_actions = q_values.shape[0]

        if np.random.uniform() < self.epsilon:  # random行動
            action = np.random.random_integers(0, nb_actions-1)
        else:   # greedy 行動
            action = np.argmax(q_values)

        self.decay_epsilon()
        return action

    def decay_epsilon(self):    # 探索率を減少
        self.epsilon = self.epsilon*self.decay_rate
        if self.epsilon <= MinEps:
            self.epsilon = MinEps


INIQ = 0.0    # 初期のQ値
MinAlpha = 0.01 # 最小のα
class QLearningAgent:
    """
        シンプルなq学習エージェント
    """
    def __init__(self, alpha=0.2, policy=None, gamma=0.99, actions=None, observation=None, alpha_decay_rate=None, epsilon_decay_rate=None):
        self.alpha = alpha
        self.policy = policy
        self.reward_history = []
        self.actions = actions
        self.gamma = gamma
        self.alpha_decay_rate = alpha_decay_rate
        self.epsilon_decay_rate = epsilon_decay_rate
        self.state = str(observation)
        self.ini_state = str(observation)
        self.last_state = str(observation)
        self.last_action_id = None
        self.q_values = self._init_q_values()

    def _init_q_values(self):
        q_values = {}
        q_values[self.state] = np.repeat(INIQ, len(self.actions))
        return q_values

    def init_state(self):
        self.last_state = copy.deepcopy(self.ini_state)
        self.state = copy.deepcopy(self.ini_state)
        return self.state

    def act(self):
        action_id = self.policy.select_action(self.q_values[self.state])
        self.last_action_id = action_id
        action = self.actions[action_id]
        return action

    def get_reward(self, reward):
        """
            報酬の取得
        """
        self.reward_history.append(reward)
        self.q_values[self.last_state][self.last_action_id], pre_q, max_q2 = self._update_q_value(reward)
        self.decay_alpha()

    def observe(self, next_state):
        """
            遷移先の状態の観測
        """
        next_state = str(next_state)
        if next_state not in self.q_values:
            self.q_values[next_state] = np.repeat(INIQ, len(self.actions))

        self.last_state = self.state
        self.state = next_state

    def _update_q_value(self, reward):
        pre_q = self.q_values[self.last_state][self.last_action_id]
        max_q2 = max(self.q_values[self.state])
        updated_q = ((1.0 - self.alpha) * pre_q) + (self.alpha * (reward + (self.gamma * max_q2)))

        return updated_q, pre_q, max_q2

    def decay_alpha(self):
        if self.alpha_decay_rate is not None:
            if self.alpha >= MinAlpha:
                self.alpha *= self.alpha_decay_rate


if __name__ == '__main__':
    grid_env = GridEnv()
    ini_state = grid_env.start_pos  # 初期状態(エージェントのスタート地点の位置)
    is_goal = False # エージェントがゴールしてるかどうか?
    policy = EpsGreedyQPolicy(epsilon=1.0, decay_rate=0.99)
    agent = QLearningAgent(actions=np.arange(4), observation=ini_state, policy=policy)
    nb_episode = 100   # エピソード数
    rewards = []
    for episode in range(nb_episode):
        episode_reward = []
        print("Episode{}".format(episode))
        while not is_goal:
            action = agent.act()
            state, reward, is_goal = grid_env.step(action)
            print("action:{}, state:{}, reward:{}".format(action, agent.state, reward))
            agent.observe(state)
            agent.get_reward(reward)
            episode_reward.append(reward)
        rewards.append(np.mean(episode_reward))
        state = grid_env.reset()
        is_goal = False
        agent.observe(state)

    plt.plot(np.arange(nb_episode), rewards)
    plt.show()
    # エピソードごとの平均報酬をプロット(ゴールに辿り着くのが早ければ早いほど高くなる)
    plt.savefig("result.png")