ノート

強化学習関連のこと

MENU

【強化学習】UCBアルゴリズムを使って、マルチアームバンディットプログラムを解く

やったこと

UCBアルゴリズムを実装してマルチアームバンディットプログラムで試してみました。

UCBとは、

UCBアルゴリズムは、行動選択手法の一つになります。

UCBアルゴリズム知らないものに楽観的にを ポリシーにした行動選択戦略です。

UCBではUCB値という値を用いて、 行動選択を行います。

$$ UCB_i = Q_{t} + C \sqrt{\frac{\ln n}{2 n_i}} $$

$Q_i$ は選択肢iの平均報酬。nは全試行回数、$n_i$は アームiの全試行回数。Cは定数。

アルゴリズム的には以下のようになります。

  1. UCB値を初期化
  2. 最も高いUCB値を持つ選択肢$i$を選択
  3. 選択によって報酬$r$を得る
  4. 選択肢$i$ の期待値$Q_i$、総プレス数n、選択肢$n_i$ を1ずつ増やす
  5. UCB値を更新
  6. 2,3,4,5を繰り返す

問題設定

またまたマルチアームバンディットプログラムを用います。

今回は、真ん中の腕が一番期待値は高いとします。

結果

バンディット問題を用いて評価しました。

ある程度高い報酬を得ることができていると確認できます。

f:id:ttt242242:20180812054514p:plain
ステップ毎のUCBエージェントの利得の推移

プログラム

├── agents
│   ├── policy.py
│   └── simple_rl_agent.py
├── envs
│   └── multi_arm_bandit.py
└── run-multi-arm-bandit-ucb.py

実験用プログラム(run-multi-arm-bandit-ucb.py)

import os,sys
import random
import numpy as np
import matplotlib.pyplot as plt
from agents.simple_rl_agent import SimpleRLAgent
from agents.policy import UCB
from envs.multi_arm_bandit import MultiArmBandit

if __name__ == '__main__':
    conf_arm = [{"id":0, "mu":0.1, "sd":0.1}, # 平均 0.1、分散0.1の正規分布に従う乱数によって報酬を設定
                {"id":1, "mu":0.5, "sd":0.1},
                {"id":2, "mu":2[f:id:ttt242242:20180812054514p:plain], "sd":0.1},
                {"id":3, "mu":0.2, "sd":0.1},
                {"id":4, "mu":0.4, "sd":0.1},
                ]

    game = MultiArmBandit(conf_arm=conf_arm) # 5本のアームを設定
    policy = UCB(c=0.2, actions=np.arange(len(conf_arm)))    # UCBアルゴリズム
    agent = SimpleRLAgent(alpha=0.1, policy=policy, action_list=np.arange(len(conf_arm)))  # agentの設定
    nb_step = 10000   # ステップ数
    rewards = []
    for step in range(nb_step):
        action = agent.act()    # レバーの選択
        reward = game.step(action) # レバーを引く
        rewards.append(reward)
        agent.get_reward(reward) # エージェントは報酬を受け取り学習

    plt.plot(np.arange(nb_step), rewards)
    plt.ylabel("reward")
    plt.xlabel("steps")
    plt.savefig("fig/result_ucb.png")
    plt.show()

マルチアームバンディットプログラム(multi_arm_bandit.py)

import random
import numpy as np
import matplotlib.pyplot as plt

class Arm():
    def __init__(self, idx):
        self.value = random.random()    # ランダムでこのアームを引いた時の報酬を設定

    def pull(self):
        return self.value

class MultiArmBandit():
    def __init__(self, nb_arm):
        self.arms = self._init_arms(nb_arm)

    def _init_arms(self, nb_arm):
        arms = []
        for i in range(nb_arm):
            arms.append(Arm(i))

        return arms

    def step(self, arm_id):
        """
            pull lever
        """
        return self.arms[arm_id].pull()

if __name__ == '__main__':
    game = MultiArmBandit(nb_arm=5) # 5本のアームを設定
    nb_step = 100   #ステップ数
    rewards = []
    for step in range(nb_step):
        reward = game.step(random.randint(0, 4))
        rewards.append(reward)

    plt.plot(np.arange(nb_step), rewards)
    plt.ylim(0, 1)
    plt.savefig("result1.png")

エージェントプログラム(simple_rl_agent.py)

from abc import ABCMeta, abstractmethod
import numpy as np
import ipdb

class Agent(metaclass=ABCMeta):
    """Abstract Agent Class"""

    def __init__(self, alpha=None, policy=None):
        """
        :param alpha:
        :param policy:
        """
        self.alpha = alpha
        self.policy = policy
        self.reward_history = []

    @abstractmethod
    def act(self):
        pass

    @abstractmethod
    def get_reward(self, reward):
        pass


class SimpleRLAgent(Agent):
    """
        シンプルな強化学習エージェント
    """
    def __init__(self, action_list=None, **kwargs):
        """
        :param action_list:
        :param q_values:
        :param kwargs:
        """
        super().__init__(**kwargs)
        self.action_list = action_list  # 選択肢
        self.last_action_id = None
        self.q_values = self._init_q_values()   # 期待報酬値の初期化

    def _init_q_values(self):
        q_values = {}
        q_values = np.repeat(0.0, len(self.action_list))
        return q_values

    def act(self, q_values=None):
        action_id = self.policy.select_action(self.q_values)    # 行動選択
        self.last_action_id = action_id
        action = self.action_list[action_id]
        return action

    def get_reward(self, reward):
        self.reward_history.append(reward)
        self.q_values[self.last_action_id] = self._update_q_value(reward)   # 期待報酬値の更新

    def _update_q_value(self, reward):
        return ((1.0 - self.alpha) * self.q_values[self.last_action_id]) + (self.alpha * reward) # 通常の指数移動平均で更新

行動選択方策プログラム(policy.py)

import copy
import numpy as np
import math
import ipdb
from abc import ABCMeta, abstractmethod

class Policy(metaclass=ABCMeta):

    @abstractmethod
    def select_action(self, **kwargs):
        pass

class UCB(Policy):
    def __init__(self, c, actions):
        super(UCB, self).__init__()
        self.average_rewards = np.repeat(0.0, len(actions)) # 各腕の平均報酬
        self.ucbs = np.repeat(10.0, len(actions))   # UCB値
        self.counters = np.repeat(0, len(actions))  # 各腕の試行回数
        self.c = c
        self.all_conter = 0 # 全試行回数
        self.name = "UCB"

    def select_action(self):
        action_id = np.argmax(self.ucbs)
        self.counters[action_id] += 1
        self.all_conter += 1
        return action_id

    def update_usbs(self, action_id, reward):
        self.ucbs[action_id] = self.average_rewards[action_id] + self.c * math.log(self.all_conter)/np.sqrt(self.counters[action_id])

    def update_average_rewards(self, action_id, reward):
        self.average_rewards[action_id] = (self.counters[action_id]*self.average_rewards[action_id]+reward)/(self.counters[action_id]+1)

参考