ノート

いろいろ勉強したことをまとめていきます

MENU

Win or Learn Fast PHC をじゃんけんゲームで実験

同じく、前回の記事でも用いた論文で、

紹介されているWin or Learn Fast PHC(WoLF-PHC)を実装して実験してみました。

http://www.cs.cmu.edu/~mmv/papers/01ijcai-mike.pdf

Win or Learn Fast PHC

前回の記事参照

www.tcom242242.net

問題設定

今回はじゃんけんゲームで実験を行います。

じゃんけんなので、以下のような利得表になります。

1, 2 パー チョキ グー
パー 0, 0 -1,1 1,-1
チョキ -1,1 0, 0 1,-1
グー -1,1 1,-1 0,0

この問題においてのナッシュ均衡戦略はお互いに各選択肢を等確率で選択する時です。

つまり、グー、チョキ、パーを各々約33%で選択するような選択する時です。

実験結果

単純に何回か学習行動を行い、

ナッシュ均衡戦略に収束するかを見てみます。

まず、2人のエージェントがパーを出す確率の変化を見てみます。 f:id:ttt242242:20180717072644p:plain

x軸がエージェント1、y軸がエージェント2のパーを出す確率です。

ぐちゃぐちゃしていますが、

2体のエージェント共に、0.33ぐらいになっているのがわかります。

現在は学習率を減少させていないため、ぶれていますが、

少しずつ学習率をげんしょうさせることで、0.33に収束するはずです。

さらに平均報酬も見てみます。

agent1s average reward:0.00541
agent2s average reward:-0.00541

お互いに0ぐらいの報酬になっていることがわかります。

うまく均衡戦略を得ることができていると思います。

ソースコード

以下の3つのプログラムから構成されています - 実行用のプログラム(run_wolf.py)、 - gameプログラム(games/simple_game.py)、 - phcエージェント(agents/wolf_agent.py) - 方策(agents/policy.py)

ファイル構成

.
├── agents
│   ├── wolf_agent.py
│   └── policy.py
├── games
│   ├── game.py
│   └── simple_game.py
└── run_wolf.py

run_wolf.py

import os, sys
sys.path.append(os.getcwd())
from games.game import Game
import time
from tqdm import tqdm
import numpy as np
import matplotlib.pyplot as plt
import ipdb
from agents.wolf_agent import WoLFAgent
from agents.policy import NormalPolicy
from games.simple_game import SimpleGame

if __name__ == '__main__':

    nb_agents = 2 
    agents = []
    for idx in range(nb_agents):
        policy = NormalPolicy()
        agent = WoLFAgent(alpha=0.1, policy=policy, action_list=np.arange(3))  # agentの設定
        agents.append(agent)

    game = SimpleGame(nb_steps=100000, agents=agents)
    game.run()
    for idx, agent in enumerate(agents):
        print("agent{}s average reward:{}".format(idx, np.mean(agent.rewards)))
    plt.plot(agents[0].pi_history, agents[1].pi_history)
    plt.ylabel("Agent1's Probability of selecting Paper")
    plt.xlabel("Agent2's Probability of selecting Paper")
    plt.xlim(0, 1)
    plt.ylim(0, 1)
    plt.show()

games/game.py

from abc import ABCMeta, abstractmethod


class Game(metaclass=ABCMeta):

    @abstractmethod
    def run(self):
        pass

games/simple_game.py

import os, sys
sys.path.append(os.getcwd())
from games.game import Game
from tqdm import tqdm
import numpy as np

class SimpleGame(Game):
    """
        シンプルなmatrix gameを強化学習でやってみる
    """
    def __init__(self, nb_eps=1, nb_steps=10000,agents=None):
        self.agents = agents
        self.nb_steps = nb_steps
        self.nb_eps = nb_eps
        self.reward_matrix = self._create_reward_table()

    def _reset_agents(self):
        for agent in self.agents:
            from_s = agent.state
            to_s = agent.init_state()
            self.env.force_move(int(from_s), int(to_s))

    def run(self):
        all_log = []
        for eps in range(self.nb_eps):
            social_rewards = []
            for step in tqdm(range(self.nb_steps)):
                a0, a1 = self.agents[0].act(), self.agents[1].act()
                r0, r1 = self.reward_matrix[a0][a1]
                social_rewards.append(r0+r1)
                self.agents[0].get_reward(r0)
                self.agents[1].get_reward(r1)

        social_rewards = np.array(social_rewards)
        return {"social_rewards":social_rewards}


    def _create_reward_table(self):
        """
            囚人のジレンマやチキン・ゲームなど、各ゲームに合わせて報酬行列を定義
        """
        reward_matrix = [
                            # Rock-Paper-Scissors
                            [[0, 0], [-1, 1], [1, -1]], 
                            [[1, -1], [0, 0], [-1, 1]], 
                            [[-1, 1], [1, -1], [0, 0]]
                        ]

        return reward_matrix

    def game_log(self):
        pass

    def get_conf(self):
        pass

agents/wolf_agent.py

from abc import ABCMeta, abstractmethod
import numpy as np
import ipdb

class Agent(metaclass=ABCMeta):
    """Abstract Agent Class"""

    def __init__(self, alpha=None, policy=None):
        """
        :param alpha:
        :param policy:
        """
        self.alpha = alpha
        self.policy = policy
        self.rewards = []

    @abstractmethod
    def act(self):
        pass

    @abstractmethod
    def get_reward(self, reward):
        pass


class WoLFAgent(Agent):
    """
        http://www.cs.cmu.edu/~mmv/papers/01ijcai-mike.pdf
    """
    def __init__(self, delta=0.0001, action_list=None, **kwargs):
        super().__init__(**kwargs)
        self.action_list = action_list  # 選択肢
        self.last_action_id = None
        self.q_values = self._init_q_values()   # 期待報酬値の初期化
        self.conter = 0
        self.pi = [(1.0/len(action_list)) for idx in range(len(action_list))]
        self.pi_a = [(1.0/len(action_list)) for idx in range(len(action_list))]
        self.pi_history = [self.pi[0]]
        self.high_delta = 0.0004
        self.row_delta = 0.0002

    def _init_q_values(self):
        q_values = np.repeat(0.0, len(self.action_list))
        return q_values

    def act(self, q_values=None):
        action_id = self.policy.select_action(self.pi)    # 行動選択
        self.last_action_id = action_id
        action = self.action_list[action_id]
        return action

    def get_reward(self, reward):
        self.rewards.append(reward)
        self.q_values[self.last_action_id] = self._update_q_value(reward)   # 期待報酬値の更新
        self._update_pi_a()
        self._update_pi()

    def _update_q_value(self, reward):
        return ((1.0 - self.alpha) * self.q_values[self.last_action_id]) + (self.alpha * reward) # 通常の指数移動平均で更新

    def _update_pi_a(self):
       self.conter += 1
       for aidx, _ in enumerate(self.pi):
           self.pi_a[aidx] = self.pi_a[aidx] + (1/self.conter)*(self.pi[aidx]-self.pi_a[aidx])
           if self.pi_a[aidx] > 1: self.pi_a[aidx] = 1
           if self.pi_a[aidx] < 0: self.pi_a[aidx] = 0

    def _update_pi(self):
       delta = self.decide_delta()
       max_action_id = np.argmax(self.q_values)
       for aidx, _ in enumerate(self.pi):
           if aidx == max_action_id:
               update_amount = delta
           else:
               update_amount = ((-delta)/(len(self.action_list)-1))
           self.pi[aidx] = self.pi[aidx] + update_amount
           if self.pi[aidx] > 1: self.pi[aidx] = 1
           if self.pi[aidx] < 0: self.pi[aidx] = 0
       self.pi_history.append(self.pi[0])

    def decide_delta(self):
        expected_value = 0
        expected_value_average = 0
        for aidx, _ in enumerate(self.pi):
            expected_value += self.pi[aidx]*self.q_values[aidx]
            expected_value_average += self.pi_a[aidx]*self.q_values[aidx]

        if expected_value > expected_value_average: # win
            return self.row_delta
        else:   # row
            return self.high_delta

agents/policy.py

import copy
import numpy as np
import ipdb
from abc import ABCMeta, abstractmethod

class Policy(metaclass=ABCMeta):

    @abstractmethod
    def select_action(self, **kwargs):
        pass

class NormalPolicy(Policy):
    """
        与えられた確率分布に従って選択
    """
    def __init__(self):
        super(NormalPolicy, self).__init__()

    def select_action(self, pi):
        randm = np.random.rand()
        sum_p = 0.0
        for idx, p in enumerate(pi):
            sum_p += p
            if randm <= sum_p:
                action = idx
                break
        return action

参考文献

Rational and Convergent Learning in Stochastic Games

Win or Learn Fast-PHC

Win or Learn Fast

Win or Learn Fast(WoLF)は マルチエージェント強化学習における重要な学習原理の1つです。

この手法は2人2行動ゲームにおいて、ナッシュ均衡に収束することが

証明されていることから、マルチエージェント強化学習では、重要な原理の1つになっています。

この原理自体は、

名前にある通り、勝つかもしくは早く学習 するといった原理(principle)?です。

もう少し具体的にいうと、勝っている時は、ゆっくり学習。負けている時は早く学習するということです。

ここで、言う勝つというのは、

証明が行われた2人2行動ゲームでは、ナッシュ均衡方策をとった時の期待値に、

現在の方策による期待報酬値が上回っていることをいいます。

不思議なことに、この方策を用いることで、

ナッシュ均衡戦略に収束することが証明されています。*1

WoLF-PHC

WoLF-PHCとは、前回紹介したPHCアルゴリズムにWoLFの原理を適用したものになります。*2

WoLF-PHCでは、PHCアルゴリズムの方策を更新する速度を

WoLFの原理を用いて、調整します。

この時に勝つをどのように定義するかがポイントです。

通常のマルチエージェント強化学習では、

各エージェントが独立で学習している場合には、

WoLF-IGAを証明したときのように、

各エージェントがナッシュ均衡方策を知る(or 計算する)ことは基本的には不可能なので、

別の方策を用いる必要があります。

WoLF-PHCでは、平均方策ナッシュ均衡方策の代わりに用いています。

平均方策 は現在の方策に追従していくように、更新していくのですが、

更新していく毎に、更新幅が小さくなっていきます。

つまり最終的には、収束させます。

そのため、環境が動的に変化する場合などには、

単純にこの平均方策ではいけません。

ちなみに、なぜ、この方策で良いのかは実は僕はちょっとわかっていません・・・

(これは平均なの?)

まぁアルゴリズム的には、非常にシンプルです。

アルゴリズム

f:id:ttt242242:20180716182026j:plain

実験

実装と実験に関しては、以前挙げた記事を見ていただければと思います。

www.tcom242242.net

参考文献

Multiagent learning using a variable learning rate

Rational and Convergent Learning in Stochastic Games

Win or Learn Fast PHC で実験

PHCエージェントでの実験

Win or Learn Fast PHC で実験

同じく、前回の記事でも用いた論文で、

紹介されているWin or Learn Fast PHC(WoLF-PHC)を実装して実験してみました。

http://www.cs.cmu.edu/~mmv/papers/01ijcai-mike.pdf

Win or Learn Fast PHC

WoLF-PHCは方策を更新する際に工夫を加えたものになります。

平均方策による期待値より、現在の方策による期待値が大きければ、

方策を少し更新。

一方、平均方策による期待値より、現在の方策による期待値が小さければ、

方策を大幅に更新する。といったほうような方策を用います。

不思議なことにこのように更新するだけで、前回のPHCでは発散してしまった方策が

均衡戦略に収束します。

問題設定

前回と同様に

Matching Pennies を用いて実験します。

以下がMatching Penniesの利得表

1,2 Heads Tails
Heads 1, -1 -1,1
Tails -1,1 1,-1

実験結果

単純に何回か学習行動を行い、

方策が収束するかを見てみます。

f:id:ttt242242:20180711193657p:plain

前回と異なり、Headsを選択する確率が0.5で収束?していることがわかります。

さらに平均報酬も見てみます。

agent0s average reward:0.0007
agent1s average reward:-0.0007

お互いに同じぐらいの報酬になっていることがわかります。

うまく均衡戦略を得ることができていると思います。

ソースコード

以下の3つのプログラムから構成されています - 実行用のプログラム(run_wolf.py)、 - gameプログラム(games/simple_game.py)、 - phcエージェント(agents/wolf_agent.py) - 方策(agents/policy.py)

ファイル構成

.
├── agents
│   ├── wolf_agent.py
│   └── policy.py
├── games
│   ├── game.py
│   └── simple_game.py
└── run_wolf.py

run_wolf.py

import os, sys
sys.path.append(os.getcwd())
from games.game import Game
import time
from tqdm import tqdm
import numpy as np
import matplotlib.pyplot as plt
import ipdb
from agents.wolf_agent import WoLFAgent
from agents.policy import NormalPolicy
from games.simple_game import SimpleGame

if __name__ == '__main__':

    nb_agents = 2
    agents = []
    for idx in range(nb_agents):
        policy = NormalPolicy()
        agent = WoLFAgent(alpha=0.1, policy=policy, action_list=np.arange(2))  # agentの設定
        agents.append(agent)

    game = SimpleGame(nb_steps=100000, agents=agents)
    game.run()
    for idx, agent in enumerate(agents):
        print("agent{}s average reward:{}".format(idx, np.mean(agent.rewards)))
    plt.plot(np.arange(len(agents[0].pi_history)),agents[0].pi_history)
    plt.ylabel("Probability of selecting Heads")
    plt.xlabel("Step")
    plt.ylim(0, 1)
    plt.show()

games/game.py

from abc import ABCMeta, abstractmethod


class Game(metaclass=ABCMeta):

    @abstractmethod
    def run(self):
        pass

games/simple_game.py

import os, sys
sys.path.append(os.getcwd())
from games.game import Game
from tqdm import tqdm
import numpy as np

class SimpleGame(Game):
    """
        シンプルなmatrix gameを強化学習でやってみる
    """
    def __init__(self, nb_eps=1, nb_steps=10000,agents=None):
        self.agents = agents
        self.nb_steps = nb_steps
        self.nb_eps = nb_eps
        self.reward_matrix = self._create_reward_table()

    def _reset_agents(self):
        for agent in self.agents:
            from_s = agent.state
            to_s = agent.init_state()
            self.env.force_move(int(from_s), int(to_s))

    def run(self):
        all_log = []
        for eps in range(self.nb_eps):
            social_rewards = []
            for step in tqdm(range(self.nb_steps)):
                a0, a1 = self.agents[0].act(), self.agents[1].act()
                r0, r1 = self.reward_matrix[a0][a1]
                social_rewards.append(r0+r1)
                self.agents[0].get_reward(r0)
                self.agents[1].get_reward(r1)

        social_rewards = np.array(social_rewards)
        return {"social_rewards":social_rewards}


    def _create_reward_table(self):
        """
            囚人のジレンマやチキン・ゲームなど、各ゲームに合わせて報酬行列を定義
        """
        reward_matrix = [
                            [[1, -1], [-1, 1]],
                            [[-1, 1], [1, -1]]
                        ]

        return reward_matrix

    def game_log(self):
        pass

    def get_conf(self):
        pass

agents/wolf_agent.py

from abc import ABCMeta, abstractmethod
import numpy as np
import ipdb

class Agent(metaclass=ABCMeta):
    """Abstract Agent Class"""

    def __init__(self, alpha=None, policy=None):
        """
        :param alpha:
        :param policy:
        """
        self.alpha = alpha
        self.policy = policy
        self.rewards = []

    @abstractmethod
    def act(self):
        pass

    @abstractmethod
    def get_reward(self, reward):
        pass


class WoLFAgent(Agent):
    """
        Policy hill-climbing algorithm(PHC)
        http://www.cs.cmu.edu/~mmv/papers/01ijcai-mike.pdf
    """
    def __init__(self, delta=0.0001, action_list=None, **kwargs):
        super().__init__(**kwargs)
        self.action_list = action_list  # 選択肢
        self.last_action_id = None
        self.q_values = self._init_q_values()   # 期待報酬値の初期化
        self.conter = 0
        self.pi = [(1.0/len(action_list)) for idx in range(len(action_list))]
        self.pi_a = [(1.0/len(action_list)) for idx in range(len(action_list))]
        self.pi_history = [self.pi[0]]
        self.high_delta = 0.0004
        self.row_delta = 0.0002

    def _init_q_values(self):
        q_values = {}
        q_values = np.repeat(0.0, len(self.action_list))
        return q_values

    def act(self, q_values=None):
        action_id = self.policy.select_action(self.pi)    # 行動選択
        self.last_action_id = action_id
        action = self.action_list[action_id]
        return action

    def get_reward(self, reward):
        self.rewards.append(reward)
        self.q_values[self.last_action_id] = self._update_q_value(reward)   # 期待報酬値の更新
        self._update_pi_a()
        self._update_pi()

    def _update_q_value(self, reward):
        return ((1.0 - self.alpha) * self.q_values[self.last_action_id]) + (self.alpha * reward) # 通常の指数移動平均で更新

    def _update_pi_a(self):
       self.conter += 1
       for aidx, _ in enumerate(self.pi):
           self.pi_a[aidx] = self.pi_a[aidx] + (1/self.conter)*(self.pi[aidx]-self.pi_a[aidx])
           if self.pi_a[aidx] > 1: self.pi_a[aidx] = 1
           if self.pi_a[aidx] < 0: self.pi_a[aidx] = 0

    def _update_pi(self):
       delta = self.decide_delta()
       max_action_id = np.argmax(self.q_values)
       for aidx, _ in enumerate(self.pi):
           if aidx == max_action_id:
               update_amount = delta
           else:
               update_amount = ((-delta)/(len(self.action_list)-1))
           self.pi[aidx] = self.pi[aidx] + update_amount
           if self.pi[aidx] > 1: self.pi[aidx] = 1
           if self.pi[aidx] < 0: self.pi[aidx] = 0
       self.pi_history.append(self.pi[0])

    def decide_delta(self):
        expected_value = 0
        expected_value_average = 0
        for aidx, _ in enumerate(self.pi):
            expected_value += self.pi[aidx]*self.q_values[aidx]
            expected_value_average += self.pi_a[aidx]*self.q_values[aidx]

        if expected_value > expected_value_average: # win
            return self.row_delta
        else:   # row
            return self.high_delta

agents/policy.py

import copy
import numpy as np
import ipdb
from abc import ABCMeta, abstractmethod

class Policy(metaclass=ABCMeta):

    @abstractmethod
    def select_action(self, **kwargs):
        pass

class NormalPolicy(Policy):
    """
        与えられた確率分布に従って選択
    """
    def __init__(self):
        super(NormalPolicy, self).__init__()

    def select_action(self, pi):
        randm = np.random.rand()
        sum_p = 0.0
        for idx, p in enumerate(pi):
            sum_p += p
            if randm <= sum_p:
                action = idx
                break
        return action

Policy Hill Climbing

Q学習のQ値を用いて、方策を山登り的に更新していく手法です。

基本的には、以下の論文で紹介されているPolicy Hill Climbing を述べます。

http://www.cs.cmu.edu/~mmv/papers/01ijcai-mike.pdf

アルゴリズムは以下のようになります。

f:id:ttt242242:20180716182256j:plain
PHC

通常のQ学習は方策off型ですが、

ここでは、Q学習に方策$\pi$を加えています。

そして、Q値によって方策を更新します。

最大のQ値を持つ行動をより多く行うように修正し、、

それ以外はあまり行わないように修正していきます。

Win or Learn Fastを実装するために作られたのかな?

ちなみに、PHCのプログラムと実験結果は前の記事で

www.tcom242242.net

Policy Hill Climbingエージェントで実験

ランダムエージェントでの実験

Policy Hill Climbing で実験

以下の論文で、紹介されているPolicy Hill Climbing(PHC)

http://www.cs.cmu.edu/~mmv/papers/01ijcai-mike.pdf

を実装して実験してみました。

問題設定

よくゲーム理論で用いられているものです。

論文と同じように Matching Pennies を用いて実験。

以下がMatching Penniesの利得表

1,2 Heads Tails
Heads 1, -1 -1,1
Tails -1,1 1,-1

Policy Hill Climbing Agent

PHCはQ学習を拡張したものになります。

基本的にはQ学習に方策を付与したものになります。

アルゴリズムについては、また後日まとめようと思います。

実験結果

単純に何回か学習行動を行い、

方策が収束するかを見てみます。

f:id:ttt242242:20180709210057p:plain

これで良いのかわかりませんが、

発散しています。

まぁ単純なPHCだとナッシュ均衡戦略に収束しないから、

WoLFが提案されているので良いのだと思いますが。

ソースコード

以下の3つのプログラムから構成されています

  • 実行用のプログラム(run_random.py)、
  • gameプログラム(games/simple_game.py)、
  • phcエージェント(agents/phc_agent.py)
  • 方策(agents/policy.py)

ファイル構成

.
├── agents
│   ├── phc_agent.py
│   └── policy.py
├── games
│   ├── game.py
│   └── simple_game.py
└── run_phc.py

run_phc.py

import os, sys
sys.path.append(os.getcwd())
from games.game import Game
import time
from tqdm import tqdm
import numpy as np
import matplotlib.pyplot as plt
import ipdb
from agents.phc_agent import PHCAgent
from agents.policy import NormalPolicy
from games.simple_game import SimpleGame

if __name__ == '__main__':

    nb_agents = 2
    agents = []
    for idx in range(nb_agents):
        policy = NormalPolicy()
        agent = PHCAgent(alpha=0.1, policy=policy, action_list=np.arange(2))  # agentの設定
        agents.append(agent)

    game = SimpleGame(nb_steps=100000, agents=agents)
    game.run()
    for idx, agent in enumerate(agents):
        print("agent{}s average reward:{}".format(idx, np.mean(agent.rewards)))
    plt.plot(np.arange(len(agents[0].pi_history)),agents[0].pi_history)
    plt.ylabel("Probability of selecting Heads")
    plt.xlabel("Step")
    plt.ylim(0, 1)
    plt.show()

games/game.py

from abc import ABCMeta, abstractmethod


class Game(metaclass=ABCMeta):

    @abstractmethod
    def run(self):
        pass

games/simple_game.py

import os, sys
sys.path.append(os.getcwd())
from games.game import Game
from tqdm import tqdm
import numpy as np

class SimpleGame(Game):
    """
        シンプルなmatrix gameを強化学習でやってみる
    """
    def __init__(self, nb_eps=1, nb_steps=10000,agents=None):
        self.agents = agents
        self.nb_steps = nb_steps
        self.nb_eps = nb_eps
        self.reward_matrix = self._create_reward_table()

    def _reset_agents(self):
        for agent in self.agents:
            from_s = agent.state
            to_s = agent.init_state()
            self.env.force_move(int(from_s), int(to_s))

    def run(self):
        all_log = []
        for eps in range(self.nb_eps):
            social_rewards = []
            for step in tqdm(range(self.nb_steps)):
                a0, a1 = self.agents[0].act(), self.agents[1].act()
                r0, r1 = self.reward_matrix[a0][a1]
                social_rewards.append(r0+r1)
                self.agents[0].get_reward(r0)
                self.agents[1].get_reward(r1)

        social_rewards = np.array(social_rewards)
        return {"social_rewards":social_rewards}


    def _create_reward_table(self):
        """
            囚人のジレンマやチキン・ゲームなど、各ゲームに合わせて報酬行列を定義
        """
        reward_matrix = [
                            [[1, -1], [-1, 1]],
                            [[-1, 1], [1, -1]]
                        ]

        return reward_matrix

    def game_log(self):
        pass

    def get_conf(self):
        pass

agents/phc_agent.py

from abc import ABCMeta, abstractmethod
import numpy as np
import ipdb

class Agent(metaclass=ABCMeta):
    """Abstract Agent Class"""

    def __init__(self, alpha=None, policy=None):
        """
        :param alpha:
        :param policy:
        """
        self.alpha = alpha
        self.policy = policy
        self.rewards = []

    @abstractmethod
    def act(self):
        pass

    @abstractmethod
    def get_reward(self, reward):
        pass


class PHCAgent(Agent):
    """
        Policy hill-climbing algorithm(PHC)
        http://www.cs.cmu.edu/~mmv/papers/01ijcai-mike.pdf
    """
    def __init__(self, delta=0.0001, action_list=None, **kwargs):
        super().__init__(**kwargs)
        self.action_list = action_list  # 選択肢
        self.last_action_id = None
        self.q_values = self._init_q_values()   # 期待報酬値の初期化
        self.pi = [(1.0/len(action_list)) for idx in range(len(action_list))]
        self.delta = delta
        self.pi_history = [self.pi[0]]

    def _init_q_values(self):
        q_values = {}
        q_values = np.repeat(0.0, len(self.action_list))
        return q_values

    def act(self, q_values=None):
        action_id = self.policy.select_action(self.pi)    # 行動選択
        self.last_action_id = action_id
        action = self.action_list[action_id]
        return action

    def get_reward(self, reward):
        self.rewards.append(reward)
        self.q_values[self.last_action_id] = self._update_q_value(reward)   # 期待報酬値の更新
        self._update_pi()

    def _update_q_value(self, reward):
        return ((1.0 - self.alpha) * self.q_values[self.last_action_id]) + (self.alpha * reward) # 通常の指数移動平均で更新

    def _update_pi(self):
       max_action_id = np.argmax(self.q_values)
       for aidx, _ in enumerate(self.pi):
           if aidx == max_action_id:
               update_amount = self.delta
           else:
               update_amount = ((-self.delta)/(len(self.action_list)-1))
           self.pi[aidx] = self.pi[aidx] + update_amount
           if self.pi[aidx] > 1: self.pi[aidx] = 1
           if self.pi[aidx] < 0: self.pi[aidx] = 0
       self.pi_history.append(self.pi[0])

agents/policy.py

import copy
import numpy as np
import ipdb
from abc import ABCMeta, abstractmethod

class Policy(metaclass=ABCMeta):

    @abstractmethod
    def select_action(self, **kwargs):
        pass

class NormalPolicy(Policy):
    """
        与えられた確率分布に従って選択
    """
    def __init__(self):
        super(NormalPolicy, self).__init__()

    def select_action(self, pi):
        randm = np.random.rand()
        sum_p = 0.0
        for idx, p in enumerate(pi):
            sum_p += p
            if randm <= sum_p:
                action = idx
                break
        return action

囚人のジレンマをランダムエージェントで実験

実験もくそもないけど、ちょっと実装してみます。

強化学習エージェントで実装する前にランダムエージェントで、

報酬関数が囚人のジレンマのゲームをやってみます。

問題設定

よくゲーム理論で用いられているものです。

1,2 協調(C) 裏切り(D)
協調(C) 6, 6 2,7
裏切り(D) 7,2 0,0

ランダムエージェント

ランダムで行動選択をするエージェントです

実験結果

単純に何回か行動選択して、

得られた報酬の平均をプロットしてみます。

python run_random.py

agent0s average reward:3.75736
agent1s average reward:3.75341

ソースコード

以下の3つのプログラムから構成されています

  • 実行用のプログラム(run_random.py)、
  • gameプログラム(games/simple_game.py)、
  • randomエージェント(agents/random_agent.py)

GitHub - Tcom242242/multi-agent-learning

ファイル構成

├── agents
│   └── random_agent.py
├── games
│   ├── game.py
│   └── simple_game.py
└── run_random.py

run_random.py

import os, sys
sys.path.append(os.getcwd())
from games.game import Game
import time
from tqdm import tqdm
import numpy as np
from agents.random_agent import RandomAgent
from games.simple_game import SimpleGame

if __name__ == '__main__':
    nb_agents = 2
    agents = []

    for idx in range(nb_agents):
        agent = RandomAgent(action_list=np.arange(2))
        agents.append(agent)

    game = SimpleGame(nb_steps=100000, agents=agents)
    game.run()
    # print results
    for idx, agent in enumerate(agents):
        print("agent{}s average reward:{}".format(idx, np.mean(agent.rewards)))

games/game.py

from abc import ABCMeta, abstractmethod


class Game(metaclass=ABCMeta):

    @abstractmethod
    def run(self):
        pass

games/simple_game.py

import os, sys
sys.path.append(os.getcwd())
from games.game import Game
from tqdm import tqdm
import numpy as np

class SimpleGame(Game):
    """
        シンプルなmatrix gameを強化学習でやってみる
    """
    def __init__(self, nb_eps=1, nb_steps=10000,agents=None):
        self.agents = agents
        self.nb_steps = nb_steps
        self.nb_eps = nb_eps
        self.reward_matrix = self._create_reward_table()

    def _reset_agents(self):
        for agent in self.agents:
            from_s = agent.state
            to_s = agent.init_state()
            self.env.force_move(int(from_s), int(to_s))

    def run(self):
        all_log = []
        for eps in range(self.nb_eps):
            social_rewards = []
            for step in tqdm(range(self.nb_steps)):
                a0, a1 = self.agents[0].act(), self.agents[1].act()
                r0, r1 = self.reward_matrix[a0][a1]
                social_rewards.append(r0+r1)
                self.agents[0].get_reward(r0)
                self.agents[1].get_reward(r1)

        social_rewards = np.array(social_rewards)
        return {"social_rewards":social_rewards}


    def _create_reward_table(self):
        """
            囚人のジレンマやチキン・ゲームなど、各ゲームに合わせて報酬行列を定義
        """
        reward_matrix = [
                            [[6, 6], [2, 7]],
                            [[7, 2], [0, 0]]
                        ]

        return reward_matrix

    def game_log(self):
        pass

    def get_conf(self):
        pass

agents/random_agent.py

import numpy as np
import random

class RandomAgent():
    def __init__(self, action_list=None):
        self.action_list = action_list  # 選択肢
        self.rewards = []

    def act(self, q_values=None):
        action_id = random.randint(0, (len(self.action_list)-1))
        action = self.action_list[action_id]
        return action

    def get_reward(self, reward):
        self.rewards.append(reward)

【ゲーム理論】2人2行動ゲームの分類

代表的なゲームの分類

2 player 2 action gameをいくつか挙げてまとめておきます。

まず、以下の利得表を使って、囚人のジレンマ、チキン・ゲーム、ハトタカゲーム、コーディネーションゲームを書いておきます。

1,2 協調(C) 裏切り(D)
協調(C) R,R S,T
裏切り(D) T,S P,P

各プレーヤ(1,2)の利得で分類されます

囚人のジレンマ

$$ T>R>P>S \\ 2R > T+S $$

チキン・ゲーム

$$ P>S>R>T $$

ハトタカゲーム

$$ P>S>R>T $$

コーディネーション問題

1,2 a b
a $R_1,R_2$ 0,0
b 0,0 $P_1,P_2$

コーディネーション問題には二種類ある。 1つ目が、複数のナッシュ均衡がある時に、コーディネーションの失敗を 回避するような選択ができるかどうか。

2つ目が、パレート効率的な戦略(均衡とは限らない)を 実現できるようなどうすればよいか。