ノート

強化学習関連のこと

MENU

【強化学習】【入門】Q学習

Q学習の説明のために、 簡単な迷路の例を用いて、Q学習を説明したいと思います。

最後に例で用いた迷路を実装してQ学習で解かせてみました。

Q学習

Q学習とは、現在の行動価値を更新する際に遷移先の状態の 最大行動価値を用いるような学習手法です。

Q学習では 以下のような手順を繰り返し行い学習していきます。

  1. 現在の位置(状態 $s$)における行動価値関数を用いて進行方向(行動$a$)の選択(ε-greedyなど)
  2. 実際に行動と状態の遷移
  3. エージェントは行動した後の位置(遷移先の状態 $s'$) と 報酬 $r$ (ゴールであれば、1、それ以外は、0) を観測。
  4. 得られた報酬を元に以下の式を用いて前回の位置での進行方向の価値(行動価値)を更新(学習)

$$ Q(s,a) \leftarrow Q(s,a) + \alpha(r + \max_{a' \in A}Q(s',a')-Q(s,a)) $$ この式は、状態sにおいて行動aを行った時に得られる報酬の期待値を表しています。

rは報酬、$\max_{a' \in A}Q(s',a')$ は遷移先状態s'においての最大行動価値、

$\alpha$は学習率を表しています。

具体例(迷路問題)

迷路問題をQ学習エージェントに解かせてみようと思います。

Q学習エージェントの目的は最短経路を見つけることです。

もちろん人が上からこのマップを見れば、簡単にわかりますが、

エージェント自身は上から見えませんし、ゴールの座標もわかりません。

エージェントが観測できるのは、現在の自分の位置(状態)、その状態でもらえる報酬のみです。

エージェントは何回もスタート地点からゴールまで行ってみて、

少しづつ学習していきます。

図を用いて、どのように学習していくかを説明

f:id:ttt242242:20180820150948j:plain

f:id:ttt242242:20180824194705j:plain

f:id:ttt242242:20180820151003j:plain

f:id:ttt242242:20180820151007j:plain

f:id:ttt242242:20180820151013j:plain

f:id:ttt242242:20180820151018j:plain

f:id:ttt242242:20180824194803j:plain

ゴールにたどり着いたら、同じスタート地点にエージェントを戻し、

繰り返し迷路を解かせます。

実験結果

上の図と同じマップで実験。

使用したプログラムは下にあります。

<省略>
Episode97
action:0, state:(2, 0), reward:0
action:0, state:(1, 0), reward:0
action:3, state:(0, 0), reward:0
action:3, state:(0, 1), reward:100
Episode98
action:0, state:(2, 0), reward:0
action:0, state:(1, 0), reward:0
action:3, state:(0, 0), reward:0
action:3, state:(0, 1), reward:100
Episode99
action:0, state:(2, 0), reward:0
action:0, state:(1, 0), reward:0
action:3, state:(0, 0), reward:0
action:3, state:(0, 1), reward:100
エピソード毎の平均報酬の推移

最短で4ステップで着くので、平均報酬の最大値は100/4で25になります

f:id:ttt242242:20180820152157p:plain

プログラム

import random
import numpy as np
import matplotlib.pyplot as plt


FILED_TYPE = {
    "N": 0,
    "S": 1,
    "G": 2,
    "X": 3, }

ACTIONS = {
    "UP": 0,
    "DOWN": 1,
    "LEFT": 2,
    "RIGHT": 3}


class GridEnv:
    """
        環境
    """
    def __init__(self):
        self.map = [[0, 0, 2],
                [0, 3, 0],
                [1, 0, 0]]

        self.start_pos = 2, 0
        self.agent_pos = 2, 0

    def step(self, action):
        """
            return pos, reward
        """
        to_y, to_x = self.agent_pos

        if self._is_possible_action(to_x, to_y, action) == False:
            return self.agent_pos, -1, False

        if action == ACTIONS["UP"]:
            to_y += -1
        elif action == ACTIONS["DOWN"]:
            to_y += 1
        elif action == ACTIONS["LEFT"]:
            to_x += -1
        elif action == ACTIONS["RIGHT"]:
            to_x += 1

        is_goal = self._is_goal(to_x, to_y)
        reward = self._compute_reward(to_x, to_y)
        self.agent_pos = to_y, to_x
        return self.agent_pos, reward, is_goal

    def _is_goal(self, x, y):
        if self.map[y][x] == FILED_TYPE["G"]:
            return True
        else:
            return False

    def _is_possible_action(self, x, y, action):
        """
            実行可能な行動かどうかの判定
        """
        to_x = x
        to_y = y

        if action == ACTIONS["UP"]:
            to_y += -1
        elif action == ACTIONS["DOWN"]:
            to_y += 1
        elif action == ACTIONS["LEFT"]:
            to_x += -1
        elif action == ACTIONS["RIGHT"]:
            to_x += 1
        else:
            raize("Action Error")

        if len(self.map) <= to_y or 0 > to_y:
            return False
        elif len(self.map[0]) <= to_x or 0 > to_x:
            return False

        return True

    def _compute_reward(self, x, y):
        if self.map[y][x] == FILED_TYPE["N"] or self.map[y][x] == FILED_TYPE["S"]:
            return 0
        elif self.map[y][x] == FILED_TYPE["X"]:
            return -100
        elif self.map[y][x] == FILED_TYPE["G"]:
            return 100

    def reset(self):
        self.agent_pos = self.start_pos
        return self.start_pos


MinEps = 0.001  # 最小のε
class EpsGreedyQPolicy:
    """
        ε-greedy選択
    """
    def __init__(self, epsilon=.1, decay_rate=1):
        self.epsilon = epsilon
        self.decay_rate = decay_rate
        self.name = "EPS"

    def select_action(self, q_values):
        assert q_values.ndim == 1
        nb_actions = q_values.shape[0]

        if np.random.uniform() < self.epsilon:  # random行動
            action = np.random.random_integers(0, nb_actions-1)
        else:   # greedy 行動
            action = np.argmax(q_values)

        self.decay_epsilon()
        return action

    def decay_epsilon(self):    # 探索率を減少
        self.epsilon = self.epsilon*self.decay_rate
        if self.epsilon <= MinEps:
            self.epsilon = MinEps


INIQ = 0.0    # 初期のQ値
MinAlpha = 0.01 # 最小のα
class QLearningAgent:
    """
        シンプルなq学習エージェント
    """
    def __init__(self, alpha=0.2, policy=None, gamma=0.99, actions=None, observation=None, alpha_decay_rate=None, epsilon_decay_rate=None):
        self.alpha = alpha
        self.policy = policy
        self.reward_history = []
        self.actions = actions
        self.gamma = gamma
        self.alpha_decay_rate = alpha_decay_rate
        self.epsilon_decay_rate = epsilon_decay_rate
        self.state = str(observation)
        self.ini_state = str(observation)
        self.last_state = str(observation)
        self.last_action_id = None
        self.q_values = self._init_q_values()

    def _init_q_values(self):
        q_values = {}
        q_values[self.state] = np.repeat(INIQ, len(self.actions))
        return q_values

    def init_state(self):
        self.last_state = copy.deepcopy(self.ini_state)
        self.state = copy.deepcopy(self.ini_state)
        return self.state

    def act(self):
        action_id = self.policy.select_action(self.q_values[self.state])
        self.last_action_id = action_id
        action = self.actions[action_id]
        return action

    def get_reward(self, reward):
        """
            報酬の取得
        """
        self.reward_history.append(reward)
        self.q_values[self.last_state][self.last_action_id], pre_q, max_q2 = self._update_q_value(reward)
        self.decay_alpha()

    def observe(self, next_state):
        """
            遷移先の状態の観測
        """
        next_state = str(next_state)
        if next_state not in self.q_values:
            self.q_values[next_state] = np.repeat(INIQ, len(self.actions))

        self.last_state = self.state
        self.state = next_state

    def _update_q_value(self, reward):
        pre_q = self.q_values[self.last_state][self.last_action_id]
        max_q2 = max(self.q_values[self.state])
        updated_q = ((1.0 - self.alpha) * pre_q) + (self.alpha * (reward + (self.gamma * max_q2)))

        return updated_q, pre_q, max_q2

    def decay_alpha(self):
        if self.alpha_decay_rate is not None:
            if self.alpha >= MinAlpha:
                self.alpha *= self.alpha_decay_rate


if __name__ == '__main__':
    grid_env = GridEnv()
    ini_state = grid_env.start_pos  # 初期状態(エージェントのスタート地点の位置)
    is_goal = False # エージェントがゴールしてるかどうか?
    policy = EpsGreedyQPolicy(epsilon=1.0, decay_rate=0.99)
    agent = QLearningAgent(actions=np.arange(4), observation=ini_state, policy=policy)
    nb_episode = 100   # エピソード数
    rewards = []
    for episode in range(nb_episode):
        episode_reward = []
        print("Episode{}".format(episode))
        while not is_goal:
            action = agent.act()
            state, reward, is_goal = grid_env.step(action)
            print("action:{}, state:{}, reward:{}".format(action, agent.state, reward))
            agent.observe(state)
            agent.get_reward(reward)
            episode_reward.append(reward)
        rewards.append(np.mean(episode_reward))
        state = grid_env.reset()
        is_goal = False
        agent.observe(state)

    plt.plot(np.arange(nb_episode), rewards)
    plt.show()
    # エピソードごとの平均報酬をプロット(ゴールに辿り着くのが早ければ早いほど高くなる)
    plt.savefig("result.png")

【強化学習】UCBアルゴリズムを使って、マルチアームバンディットプログラムを解く

やったこと

UCBアルゴリズムを実装してマルチアームバンディットプログラムで試してみました。

UCBとは、

UCBアルゴリズムは、行動選択手法の一つになります。

UCBアルゴリズム知らないものに楽観的にを ポリシーにした行動選択戦略です。

UCBではUCB値という値を用いて、 行動選択を行います。

$$ UCB_i = Q_{t} + C \sqrt{\frac{\ln n}{2 n_i}} $$

$Q_i$ は選択肢iの平均報酬。nは全試行回数、$n_i$は アームiの全試行回数。Cは定数。

アルゴリズム的には以下のようになります。

  1. UCB値を初期化
  2. 最も高いUCB値を持つ選択肢$i$を選択
  3. 選択によって報酬$r$を得る
  4. 選択肢$i$ の期待値$Q_i$、総プレス数n、選択肢$n_i$ を1ずつ増やす
  5. UCB値を更新
  6. 2,3,4,5を繰り返す

問題設定

またまたマルチアームバンディットプログラムを用います。

今回は、真ん中の腕が一番期待値は高いとします。

結果

バンディット問題を用いて評価しました。

ある程度高い報酬を得ることができていると確認できます。

f:id:ttt242242:20180812054514p:plain
ステップ毎のUCBエージェントの利得の推移

プログラム

├── agents
│   ├── policy.py
│   └── simple_rl_agent.py
├── envs
│   └── multi_arm_bandit.py
└── run-multi-arm-bandit-ucb.py

実験用プログラム(run-multi-arm-bandit-ucb.py)

import os,sys
import random
import numpy as np
import matplotlib.pyplot as plt
from agents.simple_rl_agent import SimpleRLAgent
from agents.policy import UCB
from envs.multi_arm_bandit import MultiArmBandit

if __name__ == '__main__':
    conf_arm = [{"id":0, "mu":0.1, "sd":0.1}, # 平均 0.1、分散0.1の正規分布に従う乱数によって報酬を設定
                {"id":1, "mu":0.5, "sd":0.1},
                {"id":2, "mu":2[f:id:ttt242242:20180812054514p:plain], "sd":0.1},
                {"id":3, "mu":0.2, "sd":0.1},
                {"id":4, "mu":0.4, "sd":0.1},
                ]

    game = MultiArmBandit(conf_arm=conf_arm) # 5本のアームを設定
    policy = UCB(c=0.2, actions=np.arange(len(conf_arm)))    # UCBアルゴリズム
    agent = SimpleRLAgent(alpha=0.1, policy=policy, action_list=np.arange(len(conf_arm)))  # agentの設定
    nb_step = 10000   # ステップ数
    rewards = []
    for step in range(nb_step):
        action = agent.act()    # レバーの選択
        reward = game.step(action) # レバーを引く
        rewards.append(reward)
        agent.get_reward(reward) # エージェントは報酬を受け取り学習

    plt.plot(np.arange(nb_step), rewards)
    plt.ylabel("reward")
    plt.xlabel("steps")
    plt.savefig("fig/result_ucb.png")
    plt.show()

マルチアームバンディットプログラム(multi_arm_bandit.py)

import random
import numpy as np
import matplotlib.pyplot as plt

class Arm():
    def __init__(self, idx):
        self.value = random.random()    # ランダムでこのアームを引いた時の報酬を設定

    def pull(self):
        return self.value

class MultiArmBandit():
    def __init__(self, nb_arm):
        self.arms = self._init_arms(nb_arm)

    def _init_arms(self, nb_arm):
        arms = []
        for i in range(nb_arm):
            arms.append(Arm(i))

        return arms

    def step(self, arm_id):
        """
            pull lever
        """
        return self.arms[arm_id].pull()

if __name__ == '__main__':
    game = MultiArmBandit(nb_arm=5) # 5本のアームを設定
    nb_step = 100   #ステップ数
    rewards = []
    for step in range(nb_step):
        reward = game.step(random.randint(0, 4))
        rewards.append(reward)

    plt.plot(np.arange(nb_step), rewards)
    plt.ylim(0, 1)
    plt.savefig("result1.png")

エージェントプログラム(simple_rl_agent.py)

from abc import ABCMeta, abstractmethod
import numpy as np
import ipdb

class Agent(metaclass=ABCMeta):
    """Abstract Agent Class"""

    def __init__(self, alpha=None, policy=None):
        """
        :param alpha:
        :param policy:
        """
        self.alpha = alpha
        self.policy = policy
        self.reward_history = []

    @abstractmethod
    def act(self):
        pass

    @abstractmethod
    def get_reward(self, reward):
        pass


class SimpleRLAgent(Agent):
    """
        シンプルな強化学習エージェント
    """
    def __init__(self, action_list=None, **kwargs):
        """
        :param action_list:
        :param q_values:
        :param kwargs:
        """
        super().__init__(**kwargs)
        self.action_list = action_list  # 選択肢
        self.last_action_id = None
        self.q_values = self._init_q_values()   # 期待報酬値の初期化

    def _init_q_values(self):
        q_values = {}
        q_values = np.repeat(0.0, len(self.action_list))
        return q_values

    def act(self, q_values=None):
        action_id = self.policy.select_action(self.q_values)    # 行動選択
        self.last_action_id = action_id
        action = self.action_list[action_id]
        return action

    def get_reward(self, reward):
        self.reward_history.append(reward)
        self.q_values[self.last_action_id] = self._update_q_value(reward)   # 期待報酬値の更新

    def _update_q_value(self, reward):
        return ((1.0 - self.alpha) * self.q_values[self.last_action_id]) + (self.alpha * reward) # 通常の指数移動平均で更新

行動選択方策プログラム(policy.py)

import copy
import numpy as np
import math
import ipdb
from abc import ABCMeta, abstractmethod

class Policy(metaclass=ABCMeta):

    @abstractmethod
    def select_action(self, **kwargs):
        pass

class UCB(Policy):
    def __init__(self, c, actions):
        super(UCB, self).__init__()
        self.average_rewards = np.repeat(0.0, len(actions)) # 各腕の平均報酬
        self.ucbs = np.repeat(10.0, len(actions))   # UCB値
        self.counters = np.repeat(0, len(actions))  # 各腕の試行回数
        self.c = c
        self.all_conter = 0 # 全試行回数
        self.name = "UCB"

    def select_action(self):
        action_id = np.argmax(self.ucbs)
        self.counters[action_id] += 1
        self.all_conter += 1
        return action_id

    def update_usbs(self, action_id, reward):
        self.ucbs[action_id] = self.average_rewards[action_id] + self.c * math.log(self.all_conter)/np.sqrt(self.counters[action_id])

    def update_average_rewards(self, action_id, reward):
        self.average_rewards[action_id] = (self.counters[action_id]*self.average_rewards[action_id]+reward)/(self.counters[action_id]+1)

参考

【ディープラーニング】【TensorFlow】TensorFlowで多クラス分類

tensorflowで多クラス分類

背景

前回、Kerasを使ってirisデータを分類しました。

www.tcom242242.net

今回はそれをTensorFlowでやってみました。

やりたいこと

前回と同じですが

irisデータセットをsklearnから取得して分類する。

irisデータセット

sklearnのirisデータセットには3種類の花の萼の長さと幅、花弁の長さと幅のデータがある

iris = datasets.load_iris()
x = iris.data
y = iris.target 

print(x[0]) # => array([ 5.1,  3.5,  1.4,  0.2]) 0番目の花の情報
print(y[0]) # => 0 # 0番目の花は0クラス


コード

import tensorflow as tf
from sklearn import datasets
import numpy as np
from sklearn import preprocessing
from sklearn.model_selection import train_test_split
from keras.utils import np_utils

# データの用意
iris = datasets.load_iris() # データを取得
x = iris.data   # 花の特徴量、長さなど
y = iris.target # 0, 1, 2のラベル
x = preprocessing.scale(x)  # 標準化

y = np_utils.to_categorical(y)  # one-hotエンコード.例) 1 => [0, 1, 0]
x_train, x_test, y_train, y_test= train_test_split(x, y, train_size=0.8)    # 教師データとテストデータに分割

# モデルの作成
inputs = tf.placeholder(dtype=tf.float32, shape=[None,]+[len(x_train[0])], name="input")    # 入力
layer = tf.layers.dense(inputs, 16, activation=tf.nn.relu)  # 中間層
layer = tf.layers.dense(layer, 16, activation=tf.nn.relu)   # 中間層

outputs = tf.layers.dense(x , len(y_train[0]))  # 出力

teacher = tf.placeholder(dtype=tf.float32, shape=(None, 3)) # 正解データ用

softmax = tf.losses.softmax_cross_entropy(onehot_labels=y, logits=outputs)  # クロスエントロピー
loss = tf.reduce_mean(softmax)
optimizer = tf.train.AdamOptimizer(learning_rate=1e-3)  # 最適化アルゴリズムの設定
train = optimizer.minimize(loss)

sess = tf.Session()
sess.run(tf.initialize_all_variables())

# 学習
nb_epoch = 1000
for epoch in range(nb_epoch):
    sess.run(train, feed_dict={inputs:x_train,  teacher:y_train})

print("==============TEST====================")
for x, y in zip(x_test, y_test):
    answer = np.argmax(y)
    prediction = np.argmax(sess.run(outputs, feed_dict={inputs:[x]}))
    print("正解:{}, 予測値:{}".format(answer,prediction))

【ゲーム理論】【マルチエージェント学習】Two Player Two Action ゲームの具体的な利得テーブルまとめ

Common interest game
1,2 a b
a 1.0,1.0 0.0,0.0
b 0.0,0.0 0.5,0.5
Coordination game
1,2 a b
a 1.0,0.5 0.0,0.0
b 0.0,0.0 0.5,1.0
Stag hunt game
1,2 a b
a 1.0,1.0 0.0,0.75
b 0.75,0.0 0.5,0.5
Tricky game
1,2 a b
a 0.0,1.0 1.0,0.67
b 0.33,0.0 0.67,0.33
Prisoners' dilemma
1,2 a b
a 0.6,0.6 1.0,0.67
b 0.33,0.0 0.67,0.33
Battle of the sexes
1,2 a b
a 0.0,0.0 0.67,1.0
b 1.0,0.67 0.33,0.33
Chicken game
1,2 a b
a 0.84,0.84 0.33,1.0
b 1.0,0.33 0.0,0.0

参考文献

https://link.springer.com/article/10.1007/s10994-010-5192-9

【強化学習】UCBアルゴリズム

UCBアルゴリズム

UCBアルゴリズム知らないものに楽観的にを ポリシーにした行動選択戦略です。

UCBではUCB値という値を用いて、 行動選択を行います。

UCB値は選択肢毎に設定され、

以下の式を用いて、更新していきます。

\begin{align} UCB_i = Q_{t} + C \sqrt{\frac{\ln n}{2 n_i}} \end{align}

$Q_i$ は選択肢iの平均報酬。nは全試行回数、$n_i$は アームiの全試行回数。Cは定数。

上記の式は、右の項によって、あまり選択していなければ、 その価値が高くなるように調整されます。

逆に、多く選択されれば、分母が大きくなるので0に近づきます。

つまり、純粋な期待値でだけで評価するようになります。

アルゴリズム的には以下のようになります。

  1. UCB値を初期化
  2. 最も高いUCB値を持つ選択肢$i$を選択
  3. 選択によって報酬$r$を得る
  4. 選択肢$i$ の期待値$Q_i$、総プレス数n、選択肢$n_i$ を1ずつ増やす
  5. UCB値を更新
  6. 2,3,4,5を繰り返す

参考文献

https://www.jstage.jst.go.jp/article/fss/30/0/30_174/_pdf

Win or Learn Fast PHC をじゃんけんゲームで実験

同じく、前回の記事でも用いた論文で、

紹介されているWin or Learn Fast PHC(WoLF-PHC)を実装して実験してみました。

http://www.cs.cmu.edu/~mmv/papers/01ijcai-mike.pdf

Win or Learn Fast PHC

前回の記事参照

www.tcom242242.net

問題設定

今回はじゃんけんゲームで実験を行います。

じゃんけんなので、以下のような利得表になります。

1, 2 パー チョキ グー
パー 0, 0 -1,1 1,-1
チョキ -1,1 0, 0 1,-1
グー -1,1 1,-1 0,0

この問題においてのナッシュ均衡戦略はお互いに各選択肢を等確率で選択する時です。

つまり、グー、チョキ、パーを各々約33%で選択するような選択する時です。

実験結果

単純に何回か学習行動を行い、

ナッシュ均衡戦略に収束するかを見てみます。

まず、2人のエージェントがパーを出す確率の変化を見てみます。 f:id:ttt242242:20180717072644p:plain

x軸がエージェント1、y軸がエージェント2のパーを出す確率です。

ぐちゃぐちゃしていますが、

2体のエージェント共に、0.33ぐらいになっているのがわかります。

現在は学習率を減少させていないため、ぶれていますが、

少しずつ学習率を減少させることで、0.33に収束するはずです。

さらに平均報酬も見てみます。

agent1s average reward:0.00541
agent2s average reward:-0.00541

お互いに0ぐらいの報酬になっていることがわかります。

うまく均衡戦略を得ることができていると思います。

ソースコード

以下の3つのプログラムから構成されています - 実行用のプログラム(run_wolf.py)、 - gameプログラム(games/simple_game.py)、 - phcエージェント(agents/wolf_agent.py) - 方策(agents/policy.py)

ファイル構成

.
├── agents
│   ├── wolf_agent.py
│   └── policy.py
├── games
│   ├── game.py
│   └── simple_game.py
└── run_wolf.py

run_wolf.py

import os, sys
sys.path.append(os.getcwd())
from games.game import Game
import time
from tqdm import tqdm
import numpy as np
import matplotlib.pyplot as plt
import ipdb
from agents.wolf_agent import WoLFAgent
from agents.policy import NormalPolicy
from games.simple_game import SimpleGame

if __name__ == '__main__':

    nb_agents = 2 
    agents = []
    for idx in range(nb_agents):
        policy = NormalPolicy()
        agent = WoLFAgent(alpha=0.1, policy=policy, action_list=np.arange(3))  # agentの設定
        agents.append(agent)

    game = SimpleGame(nb_steps=100000, agents=agents)
    game.run()
    for idx, agent in enumerate(agents):
        print("agent{}s average reward:{}".format(idx, np.mean(agent.rewards)))
    plt.plot(agents[0].pi_history, agents[1].pi_history)
    plt.ylabel("Agent1's Probability of selecting Paper")
    plt.xlabel("Agent2's Probability of selecting Paper")
    plt.xlim(0, 1)
    plt.ylim(0, 1)
    plt.show()

games/game.py

from abc import ABCMeta, abstractmethod


class Game(metaclass=ABCMeta):

    @abstractmethod
    def run(self):
        pass

games/simple_game.py

import os, sys
sys.path.append(os.getcwd())
from games.game import Game
from tqdm import tqdm
import numpy as np

class SimpleGame(Game):
    """
        シンプルなmatrix gameを強化学習でやってみる
    """
    def __init__(self, nb_eps=1, nb_steps=10000,agents=None):
        self.agents = agents
        self.nb_steps = nb_steps
        self.nb_eps = nb_eps
        self.reward_matrix = self._create_reward_table()

    def _reset_agents(self):
        for agent in self.agents:
            from_s = agent.state
            to_s = agent.init_state()
            self.env.force_move(int(from_s), int(to_s))

    def run(self):
        all_log = []
        for eps in range(self.nb_eps):
            social_rewards = []
            for step in tqdm(range(self.nb_steps)):
                a0, a1 = self.agents[0].act(), self.agents[1].act()
                r0, r1 = self.reward_matrix[a0][a1]
                social_rewards.append(r0+r1)
                self.agents[0].get_reward(r0)
                self.agents[1].get_reward(r1)

        social_rewards = np.array(social_rewards)
        return {"social_rewards":social_rewards}


    def _create_reward_table(self):
        """
            囚人のジレンマやチキン・ゲームなど、各ゲームに合わせて報酬行列を定義
        """
        reward_matrix = [
                            # Rock-Paper-Scissors
                            [[0, 0], [-1, 1], [1, -1]], 
                            [[1, -1], [0, 0], [-1, 1]], 
                            [[-1, 1], [1, -1], [0, 0]]
                        ]

        return reward_matrix

    def game_log(self):
        pass

    def get_conf(self):
        pass

agents/wolf_agent.py

from abc import ABCMeta, abstractmethod
import numpy as np
import ipdb

class Agent(metaclass=ABCMeta):
    """Abstract Agent Class"""

    def __init__(self, alpha=None, policy=None):
        """
        :param alpha:
        :param policy:
        """
        self.alpha = alpha
        self.policy = policy
        self.rewards = []

    @abstractmethod
    def act(self):
        pass

    @abstractmethod
    def get_reward(self, reward):
        pass


class WoLFAgent(Agent):
    """
        http://www.cs.cmu.edu/~mmv/papers/01ijcai-mike.pdf
    """
    def __init__(self, delta=0.0001, action_list=None, **kwargs):
        super().__init__(**kwargs)
        self.action_list = action_list  # 選択肢
        self.last_action_id = None
        self.q_values = self._init_q_values()   # 期待報酬値の初期化
        self.conter = 0
        self.pi = [(1.0/len(action_list)) for idx in range(len(action_list))]
        self.pi_a = [(1.0/len(action_list)) for idx in range(len(action_list))]
        self.pi_history = [self.pi[0]]
        self.high_delta = 0.0004
        self.row_delta = 0.0002

    def _init_q_values(self):
        q_values = np.repeat(0.0, len(self.action_list))
        return q_values

    def act(self, q_values=None):
        action_id = self.policy.select_action(self.pi)    # 行動選択
        self.last_action_id = action_id
        action = self.action_list[action_id]
        return action

    def get_reward(self, reward):
        self.rewards.append(reward)
        self.q_values[self.last_action_id] = self._update_q_value(reward)   # 期待報酬値の更新
        self._update_pi_a()
        self._update_pi()

    def _update_q_value(self, reward):
        return ((1.0 - self.alpha) * self.q_values[self.last_action_id]) + (self.alpha * reward) # 通常の指数移動平均で更新

    def _update_pi_a(self):
       self.conter += 1
       for aidx, _ in enumerate(self.pi):
           self.pi_a[aidx] = self.pi_a[aidx] + (1/self.conter)*(self.pi[aidx]-self.pi_a[aidx])
           if self.pi_a[aidx] > 1: self.pi_a[aidx] = 1
           if self.pi_a[aidx] < 0: self.pi_a[aidx] = 0

    def _update_pi(self):
       delta = self.decide_delta()
       max_action_id = np.argmax(self.q_values)
       for aidx, _ in enumerate(self.pi):
           if aidx == max_action_id:
               update_amount = delta
           else:
               update_amount = ((-delta)/(len(self.action_list)-1))
           self.pi[aidx] = self.pi[aidx] + update_amount
           if self.pi[aidx] > 1: self.pi[aidx] = 1
           if self.pi[aidx] < 0: self.pi[aidx] = 0
       self.pi_history.append(self.pi[0])

    def decide_delta(self):
        expected_value = 0
        expected_value_average = 0
        for aidx, _ in enumerate(self.pi):
            expected_value += self.pi[aidx]*self.q_values[aidx]
            expected_value_average += self.pi_a[aidx]*self.q_values[aidx]

        if expected_value > expected_value_average: # win
            return self.row_delta
        else:   # row
            return self.high_delta

agents/policy.py

import copy
import numpy as np
import ipdb
from abc import ABCMeta, abstractmethod

class Policy(metaclass=ABCMeta):

    @abstractmethod
    def select_action(self, **kwargs):
        pass

class NormalPolicy(Policy):
    """
        与えられた確率分布に従って選択
    """
    def __init__(self):
        super(NormalPolicy, self).__init__()

    def select_action(self, pi):
        randm = np.random.rand()
        sum_p = 0.0
        for idx, p in enumerate(pi):
            sum_p += p
            if randm <= sum_p:
                action = idx
                break
        return action

参考文献

Rational and Convergent Learning in Stochastic Games

Win or Learn Fast-PHC

Win or Learn Fast

Win or Learn Fast(WoLF)は マルチエージェント強化学習における重要な学習原理の1つです。

この手法は2人2行動ゲームにおいて、ナッシュ均衡に収束することが

証明されていることから、マルチエージェント強化学習では、重要な原理の1つになっています。

この原理自体は、

名前にある通り、勝つかもしくは早く学習 するといった原理(principle)?です。

もう少し具体的にいうと、勝っている時は、ゆっくり学習。負けている時は早く学習するということです。

ここで、言う勝つというのは、

証明が行われた2人2行動ゲームでは、ナッシュ均衡方策をとった時の期待値に、

現在の方策による期待報酬値が上回っていることをいいます。

不思議なことに、この方策を用いることで、

ナッシュ均衡戦略に収束することが証明されています。*1

WoLF-PHC

WoLF-PHCとは、前回紹介したPHCアルゴリズムにWoLFの原理を適用したものになります。*2

WoLF-PHCでは、PHCアルゴリズムの方策を更新する速度を

WoLFの原理を用いて、調整します。

この時に勝つをどのように定義するかがポイントです。

通常のマルチエージェント強化学習では、

各エージェントが独立で学習している場合には、

WoLF-IGAを証明したときのように、

各エージェントがナッシュ均衡方策を知る(or 計算する)ことは基本的には不可能なので、

別の方策を用いる必要があります。

WoLF-PHCでは、平均方策ナッシュ均衡方策の代わりに用いています。

平均方策 は現在の方策に追従していくように、更新していくのですが、

更新していく毎に、更新幅が小さくなっていきます。

つまり最終的には、収束させます。

そのため、環境が動的に変化する場合などには、

単純にこの平均方策ではいけません。

ちなみに、なぜ、この方策で良いのかは実は僕はちょっとわかっていません・・・

(これは平均なの?)

まぁアルゴリズム的には、非常にシンプルです。

アルゴリズム

f:id:ttt242242:20180716182026j:plain

実験

問題設定

前回と同様に

Matching Pennies を用いて実験します。

以下がMatching Penniesの利得表

1,2 Heads Tails
Heads 1, -1 -1,1
Tails -1,1 1,-1
実験結果

単純に何回か学習行動を行い、

方策が収束するかを見てみます。

f:id:ttt242242:20180711193657p:plain

前回と異なり、Headsを選択する確率が0.5で収束?していることがわかります。

さらに平均報酬も見てみます。

agent0s average reward:0.0007
agent1s average reward:-0.0007

お互いに同じぐらいの報酬になっていることがわかります。

うまく均衡戦略を得ることができていると思います。

ソースコード

以下の3つのプログラムから構成されています - 実行用のプログラム(run_wolf.py)、 - gameプログラム(games/simple_game.py)、 - phcエージェント(agents/wolf_agent.py) - 方策(agents/policy.py)

ファイル構成
.
├── agents
│   ├── wolf_agent.py
│   └── policy.py
├── games
│   ├── game.py
│   └── simple_game.py
└── run_wolf.py
run_wolf.py
import os, sys
sys.path.append(os.getcwd())
from games.game import Game
import time
from tqdm import tqdm
import numpy as np
import matplotlib.pyplot as plt
import ipdb
from agents.wolf_agent import WoLFAgent
from agents.policy import NormalPolicy
from games.simple_game import SimpleGame

if __name__ == '__main__':

    nb_agents = 2
    agents = []
    for idx in range(nb_agents):
        policy = NormalPolicy()
        agent = WoLFAgent(alpha=0.1, policy=policy, action_list=np.arange(2))  # agentの設定
        agents.append(agent)

    game = SimpleGame(nb_steps=100000, agents=agents)
    game.run()
    for idx, agent in enumerate(agents):
        print("agent{}s average reward:{}".format(idx, np.mean(agent.rewards)))
    plt.plot(np.arange(len(agents[0].pi_history)),agents[0].pi_history)
    plt.ylabel("Probability of selecting Heads")
    plt.xlabel("Step")
    plt.ylim(0, 1)
    plt.show()
games/game.py
from abc import ABCMeta, abstractmethod


class Game(metaclass=ABCMeta):

    @abstractmethod
    def run(self):
        pass
games/simple_game.py
import os, sys
sys.path.append(os.getcwd())
from games.game import Game
from tqdm import tqdm
import numpy as np

class SimpleGame(Game):
    """
        シンプルなmatrix gameを強化学習でやってみる
    """
    def __init__(self, nb_eps=1, nb_steps=10000,agents=None):
        self.agents = agents
        self.nb_steps = nb_steps
        self.nb_eps = nb_eps
        self.reward_matrix = self._create_reward_table()

    def _reset_agents(self):
        for agent in self.agents:
            from_s = agent.state
            to_s = agent.init_state()
            self.env.force_move(int(from_s), int(to_s))

    def run(self):
        all_log = []
        for eps in range(self.nb_eps):
            social_rewards = []
            for step in tqdm(range(self.nb_steps)):
                a0, a1 = self.agents[0].act(), self.agents[1].act()
                r0, r1 = self.reward_matrix[a0][a1]
                social_rewards.append(r0+r1)
                self.agents[0].get_reward(r0)
                self.agents[1].get_reward(r1)

        social_rewards = np.array(social_rewards)
        return {"social_rewards":social_rewards}


    def _create_reward_table(self):
        """
            囚人のジレンマやチキン・ゲームなど、各ゲームに合わせて報酬行列を定義
        """
        reward_matrix = [
                            [[1, -1], [-1, 1]],
                            [[-1, 1], [1, -1]]
                        ]

        return reward_matrix

    def game_log(self):
        pass

    def get_conf(self):
        pass
agents/wolf_agent.py
from abc import ABCMeta, abstractmethod
import numpy as np
import ipdb

class Agent(metaclass=ABCMeta):
    """Abstract Agent Class"""

    def __init__(self, alpha=None, policy=None):
        """
        :param alpha:
        :param policy:
        """
        self.alpha = alpha
        self.policy = policy
        self.rewards = []

    @abstractmethod
    def act(self):
        pass

    @abstractmethod
    def get_reward(self, reward):
        pass


class WoLFAgent(Agent):
    """
        Policy hill-climbing algorithm(PHC)
        http://www.cs.cmu.edu/~mmv/papers/01ijcai-mike.pdf
    """
    def __init__(self, delta=0.0001, action_list=None, **kwargs):
        super().__init__(**kwargs)
        self.action_list = action_list  # 選択肢
        self.last_action_id = None
        self.q_values = self._init_q_values()   # 期待報酬値の初期化
        self.conter = 0
        self.pi = [(1.0/len(action_list)) for idx in range(len(action_list))]
        self.pi_a = [(1.0/len(action_list)) for idx in range(len(action_list))]
        self.pi_history = [self.pi[0]]
        self.high_delta = 0.0004
        self.row_delta = 0.0002

    def _init_q_values(self):
        q_values = {}
        q_values = np.repeat(0.0, len(self.action_list))
        return q_values

    def act(self, q_values=None):
        action_id = self.policy.select_action(self.pi)    # 行動選択
        self.last_action_id = action_id
        action = self.action_list[action_id]
        return action

    def get_reward(self, reward):
        self.rewards.append(reward)
        self.q_values[self.last_action_id] = self._update_q_value(reward)   # 期待報酬値の更新
        self._update_pi_a()
        self._update_pi()

    def _update_q_value(self, reward):
        return ((1.0 - self.alpha) * self.q_values[self.last_action_id]) + (self.alpha * reward) # 通常の指数移動平均で更新

    def _update_pi_a(self):
       self.conter += 1
       for aidx, _ in enumerate(self.pi):
           self.pi_a[aidx] = self.pi_a[aidx] + (1/self.conter)*(self.pi[aidx]-self.pi_a[aidx])
           if self.pi_a[aidx] > 1: self.pi_a[aidx] = 1
           if self.pi_a[aidx] < 0: self.pi_a[aidx] = 0

    def _update_pi(self):
       delta = self.decide_delta()
       max_action_id = np.argmax(self.q_values)
       for aidx, _ in enumerate(self.pi):
           if aidx == max_action_id:
               update_amount = delta
           else:
               update_amount = ((-delta)/(len(self.action_list)-1))
           self.pi[aidx] = self.pi[aidx] + update_amount
           if self.pi[aidx] > 1: self.pi[aidx] = 1
           if self.pi[aidx] < 0: self.pi[aidx] = 0
       self.pi_history.append(self.pi[0])

    def decide_delta(self):
        expected_value = 0
        expected_value_average = 0
        for aidx, _ in enumerate(self.pi):
            expected_value += self.pi[aidx]*self.q_values[aidx]
            expected_value_average += self.pi_a[aidx]*self.q_values[aidx]

        if expected_value > expected_value_average: # win
            return self.row_delta
        else:   # row
            return self.high_delta
agents/policy.py
import copy
import numpy as np
import ipdb
from abc import ABCMeta, abstractmethod

class Policy(metaclass=ABCMeta):

    @abstractmethod
    def select_action(self, **kwargs):
        pass

class NormalPolicy(Policy):
    """
        与えられた確率分布に従って選択
    """
    def __init__(self):
        super(NormalPolicy, self).__init__()

    def select_action(self, pi):
        randm = np.random.rand()
        sum_p = 0.0
        for idx, p in enumerate(pi):
            sum_p += p
            if randm <= sum_p:
                action = idx
                break
        return action

参考文献

Multiagent learning using a variable learning rate

Rational and Convergent Learning in Stochastic Games