ノート

強化学習関連のこと

MENU

Win or Learn Fast PHC をじゃんけんゲームで実験

同じく、前回の記事でも用いた論文で、

紹介されているWin or Learn Fast PHC(WoLF-PHC)を実装して実験してみました。

http://www.cs.cmu.edu/~mmv/papers/01ijcai-mike.pdf

Win or Learn Fast PHC

前回の記事参照

www.tcom242242.net

問題設定

今回はじゃんけんゲームで実験を行います。

じゃんけんなので、以下のような利得表になります。

1, 2 パー チョキ グー
パー 0, 0 -1,1 1,-1
チョキ -1,1 0, 0 1,-1
グー -1,1 1,-1 0,0

この問題においてのナッシュ均衡戦略はお互いに各選択肢を等確率で選択する時です。

つまり、グー、チョキ、パーを各々約33%で選択するような選択する時です。

実験結果

単純に何回か学習行動を行い、

ナッシュ均衡戦略に収束するかを見てみます。

まず、2人のエージェントがパーを出す確率の変化を見てみます。 f:id:ttt242242:20180717072644p:plain

x軸がエージェント1、y軸がエージェント2のパーを出す確率です。

ぐちゃぐちゃしていますが、

2体のエージェント共に、0.33ぐらいになっているのがわかります。

現在は学習率を減少させていないため、ぶれていますが、

少しずつ学習率を減少させることで、0.33に収束するはずです。

さらに平均報酬も見てみます。

agent1s average reward:0.00541
agent2s average reward:-0.00541

お互いに0ぐらいの報酬になっていることがわかります。

うまく均衡戦略を得ることができていると思います。

ソースコード

以下の3つのプログラムから構成されています - 実行用のプログラム(run_wolf.py)、 - gameプログラム(games/simple_game.py)、 - phcエージェント(agents/wolf_agent.py) - 方策(agents/policy.py)

ファイル構成

.
├── agents
│   ├── wolf_agent.py
│   └── policy.py
├── games
│   ├── game.py
│   └── simple_game.py
└── run_wolf.py

run_wolf.py

import os, sys
sys.path.append(os.getcwd())
from games.game import Game
import time
from tqdm import tqdm
import numpy as np
import matplotlib.pyplot as plt
import ipdb
from agents.wolf_agent import WoLFAgent
from agents.policy import NormalPolicy
from games.simple_game import SimpleGame

if __name__ == '__main__':

    nb_agents = 2 
    agents = []
    for idx in range(nb_agents):
        policy = NormalPolicy()
        agent = WoLFAgent(alpha=0.1, policy=policy, action_list=np.arange(3))  # agentの設定
        agents.append(agent)

    game = SimpleGame(nb_steps=100000, agents=agents)
    game.run()
    for idx, agent in enumerate(agents):
        print("agent{}s average reward:{}".format(idx, np.mean(agent.rewards)))
    plt.plot(agents[0].pi_history, agents[1].pi_history)
    plt.ylabel("Agent1's Probability of selecting Paper")
    plt.xlabel("Agent2's Probability of selecting Paper")
    plt.xlim(0, 1)
    plt.ylim(0, 1)
    plt.show()

games/game.py

from abc import ABCMeta, abstractmethod


class Game(metaclass=ABCMeta):

    @abstractmethod
    def run(self):
        pass

games/simple_game.py

import os, sys
sys.path.append(os.getcwd())
from games.game import Game
from tqdm import tqdm
import numpy as np

class SimpleGame(Game):
    """
        シンプルなmatrix gameを強化学習でやってみる
    """
    def __init__(self, nb_eps=1, nb_steps=10000,agents=None):
        self.agents = agents
        self.nb_steps = nb_steps
        self.nb_eps = nb_eps
        self.reward_matrix = self._create_reward_table()

    def _reset_agents(self):
        for agent in self.agents:
            from_s = agent.state
            to_s = agent.init_state()
            self.env.force_move(int(from_s), int(to_s))

    def run(self):
        all_log = []
        for eps in range(self.nb_eps):
            social_rewards = []
            for step in tqdm(range(self.nb_steps)):
                a0, a1 = self.agents[0].act(), self.agents[1].act()
                r0, r1 = self.reward_matrix[a0][a1]
                social_rewards.append(r0+r1)
                self.agents[0].get_reward(r0)
                self.agents[1].get_reward(r1)

        social_rewards = np.array(social_rewards)
        return {"social_rewards":social_rewards}


    def _create_reward_table(self):
        """
            囚人のジレンマやチキン・ゲームなど、各ゲームに合わせて報酬行列を定義
        """
        reward_matrix = [
                            # Rock-Paper-Scissors
                            [[0, 0], [-1, 1], [1, -1]], 
                            [[1, -1], [0, 0], [-1, 1]], 
                            [[-1, 1], [1, -1], [0, 0]]
                        ]

        return reward_matrix

    def game_log(self):
        pass

    def get_conf(self):
        pass

agents/wolf_agent.py

from abc import ABCMeta, abstractmethod
import numpy as np
import ipdb

class Agent(metaclass=ABCMeta):
    """Abstract Agent Class"""

    def __init__(self, alpha=None, policy=None):
        """
        :param alpha:
        :param policy:
        """
        self.alpha = alpha
        self.policy = policy
        self.rewards = []

    @abstractmethod
    def act(self):
        pass

    @abstractmethod
    def get_reward(self, reward):
        pass


class WoLFAgent(Agent):
    """
        http://www.cs.cmu.edu/~mmv/papers/01ijcai-mike.pdf
    """
    def __init__(self, delta=0.0001, action_list=None, **kwargs):
        super().__init__(**kwargs)
        self.action_list = action_list  # 選択肢
        self.last_action_id = None
        self.q_values = self._init_q_values()   # 期待報酬値の初期化
        self.conter = 0
        self.pi = [(1.0/len(action_list)) for idx in range(len(action_list))]
        self.pi_a = [(1.0/len(action_list)) for idx in range(len(action_list))]
        self.pi_history = [self.pi[0]]
        self.high_delta = 0.0004
        self.row_delta = 0.0002

    def _init_q_values(self):
        q_values = np.repeat(0.0, len(self.action_list))
        return q_values

    def act(self, q_values=None):
        action_id = self.policy.select_action(self.pi)    # 行動選択
        self.last_action_id = action_id
        action = self.action_list[action_id]
        return action

    def get_reward(self, reward):
        self.rewards.append(reward)
        self.q_values[self.last_action_id] = self._update_q_value(reward)   # 期待報酬値の更新
        self._update_pi_a()
        self._update_pi()

    def _update_q_value(self, reward):
        return ((1.0 - self.alpha) * self.q_values[self.last_action_id]) + (self.alpha * reward) # 通常の指数移動平均で更新

    def _update_pi_a(self):
       self.conter += 1
       for aidx, _ in enumerate(self.pi):
           self.pi_a[aidx] = self.pi_a[aidx] + (1/self.conter)*(self.pi[aidx]-self.pi_a[aidx])
           if self.pi_a[aidx] > 1: self.pi_a[aidx] = 1
           if self.pi_a[aidx] < 0: self.pi_a[aidx] = 0

    def _update_pi(self):
       delta = self.decide_delta()
       max_action_id = np.argmax(self.q_values)
       for aidx, _ in enumerate(self.pi):
           if aidx == max_action_id:
               update_amount = delta
           else:
               update_amount = ((-delta)/(len(self.action_list)-1))
           self.pi[aidx] = self.pi[aidx] + update_amount
           if self.pi[aidx] > 1: self.pi[aidx] = 1
           if self.pi[aidx] < 0: self.pi[aidx] = 0
       self.pi_history.append(self.pi[0])

    def decide_delta(self):
        expected_value = 0
        expected_value_average = 0
        for aidx, _ in enumerate(self.pi):
            expected_value += self.pi[aidx]*self.q_values[aidx]
            expected_value_average += self.pi_a[aidx]*self.q_values[aidx]

        if expected_value > expected_value_average: # win
            return self.row_delta
        else:   # row
            return self.high_delta

agents/policy.py

import copy
import numpy as np
import ipdb
from abc import ABCMeta, abstractmethod

class Policy(metaclass=ABCMeta):

    @abstractmethod
    def select_action(self, **kwargs):
        pass

class NormalPolicy(Policy):
    """
        与えられた確率分布に従って選択
    """
    def __init__(self):
        super(NormalPolicy, self).__init__()

    def select_action(self, pi):
        randm = np.random.rand()
        sum_p = 0.0
        for idx, p in enumerate(pi):
            sum_p += p
            if randm <= sum_p:
                action = idx
                break
        return action

参考文献

Rational and Convergent Learning in Stochastic Games