今回は協調型のマルチエージェント強化学習アルゴリズムであるDistributed Q Learning を紹介します。
背景
協調型のマルチエージェント強化学習では、いかに各エージェントが協調して、 全エージェントの獲得報酬を最大化することが重要になります。
同じ目的のために、複数の機械が同時に強化学習を行う問題であれば、必ず必要になります。
ちなみに協調型のタスクですので全エージェントの報酬は同値になります。
\begin{aligned}
R_1 = \cdots = R_m = R
\end{aligned}
$$
\(R_i\) はエージェント\(i\)の報酬関数になります。
マルチエージェント強化学習では、各エージェントが単純にQ Learningなどを行う(ILsという)だけでは 学習が収束しないことがしられています。
そこで、今回の紹介するDistributed Q Learningが提案されました。
Distributed Q Learning
Distributed Q Learning は協調型のマルチエージェント強化学習アルゴリズムの1つです。
名前の通り、分散型のQ Learningになります。
この手法の基本的なコンセプトとしては、全エージェントが最大のQ値の更新値\(r+max Q(s’)\)を観測したらQ値を更新していくといった、楽観的に学習していく手法です。
つまり、一度高い報酬を得れる行動を見つけたらその行動を信じて、以後も同じ行動を行うといったアルゴリズムになります。
アルゴリズムは以下のようになります。

上記のアルゴリズムの「optimistic update」のところで、先程述べたような更新値\(r+max Q(s’)\)を観測したらQ値を更新していく部分になります。
次の「equilibria selection」のところで、今の方策\(\pi\)が最大のQ値の行動選択確率が最大でなければ、その行動選択確率を1とします。(つまり、その行動を必ず行うように)
この手法は決定論的なMarkov Game(例:かならず上記の利得表通りに報酬を得られる環境 )で最適方策を学習できることが証明されています。
(詳細は参考文献1を参考にしていただければと思います)
しかしながら、この手法は確率的に報酬が変化するようなな環境には適切に学習できません。
先程述べたように、一度高い報酬を得られた行動を取り続けてしまいます。なので、たまたま高い報酬得られてしまった場合にも完全に信じてしまう問題があるためです。
具体例
以下のような利得表を用いてどのように動作するか見てみます。
| 1,2 | A | B |
|---|---|---|
| A | 10 | -2 |
| B | -2 | 0 |
まず両エージェントがBを選択した場合、お互いの獲得報酬は0です。なので、\(Q_1(B),Q_2(B)\)は0となります。
次にエージェント1がA、エージェント2がBを選択した場合、お互いの獲得報酬は-2ですので、エージェント1の\(Q_1(A)=-2\)、エージェント2の\(Q_2(B)\)はこれまでの最大値を更新しなかったため、0のままです。
そして、探索などで、お互いにAを選択した場合には、獲得報酬は10になり、これまでの最大値を超えます。 お互いのQ値は\(Q_1(A)=10,Q_2(A)=10\)となります。
この利得表をみると報酬が10以上の選択はないので、この後に\(Q\)は変化せず、お互いに最適な選択\(A\)を取り続けることがわかります。
実験
問題設定
今回の実験では、
Climbing gameといったMatrix Gameを用います。
協調型のマルチエージェントゲームですので、すべてのエージェントの報酬は同値となります。
Climbing Gameは以下のような利得表になります。
| 1,2 | A | B | C |
|---|---|---|---|
| A | 11 | -30 | -30 |
| B | -30 | 7 | 6 |
| C | 0 | 0 | 5 |
最適な行動戦略の組み合わせは(A, A)になります。
プログラム
githubにあげました。
https://github.com/tocom242242/distributed_q_learning
プログラムの構成は以下のようになります。
├── distributed_q_learner.py # Distributed Q Learningエージェント ├── matrix_game.py # ゲーム ├── policy.py # 方策(epsilon-greedy) └── run.py
run.pyを実行すると実験を開始します。
distributed_q_learner.py
learnメソッドがdistributed-qのアルゴリズムの部分が記述してあります。 基本的にアルゴリズム通りに実装しています。
import numpy as np
class DistributedQLearner():
def __init__(self,
policy=None,
gamma=0.99,
ini_state="nonstate",
actions=None,
alpha_decay_rate=None,
epsilon_decay_rate=None):
self.gamma = gamma
self.policy = policy
self.reward_history = []
self.actions = actions
self.gamma = gamma
self.alpha_decay_rate = alpha_decay_rate
self.epsilon_decay_rate = epsilon_decay_rate
self.previous_action_id = None
self.q_values = self._init_q_values()
self.state = ini_state
self.pi = {}
self.pi[ini_state] = self._init_pi_values()
self.pi_history = []
self.v = {}
def _init_pi_values(self):
pi = np.repeat(1.0/len(self.actions), len(self.actions))
return pi
def _init_q_values(self):
q_values = {}
return q_values
def act(self, training=True):
if training:
action_id = self.policy.select_action(self.pi[self.state])
self.previous_action_id = action_id
action = self.actions[action_id]
self.previous_action = action
else:
action_id = self.policy.select_greedy_action(self.pi)
action = self.actions[action_id]
return action
def observe(self, state="nonstate", reward=None, opponent_action=None, is_learn=True):
if is_learn:
self.reward_history.append(reward)
self.check_new_state(state)
self.learn(state, reward)
def learn(self, state, reward):
max_q = max([self.q_values[(state, action1)] for action1 in self.actions])
q = reward + self.gamma*max_q
if q > self.q_values[(state, self.previous_action)]:
self.q_values[(state, self.previous_action)] = q
action_argmax_pi = 0
max_pi_value = 0
for action1 in self.actions:
if max_pi_value < self.pi[state][action1]:
action_argmax_pi = action1
max_pi_value = self.pi[state][action1]
max_q = max([self.q_values[(state, action1)] for action1 in self.actions])
if self.q_values[(state, action_argmax_pi)] != max_q:
actions_argmax_qs = [action1 for action1 in self.actions if self.q_values[(state, action1)] == max_q]
actions_argmax_q = np.random.choice(actions_argmax_qs)
for a, _ in enumerate(self.pi[state]):
if actions_argmax_q == a:
self.pi[state][a] = 1
else:
self.pi[state][a] = 0
def check_new_state(self, state):
for action1 in self.actions:
if state not in self.pi.keys():
self.pi[state] = np.array([np.random.random() for _ in self.actions])
if (state, action1) not in self.q_values.keys():
self.q_values[(self.state, action1)] = -10000
matrix_game.py
Climbing gameが実装されています。
import numpy as np
class MatrixGame():
def __init__(self):
self.reward_matrix = self._create_reward_table()
def step(self, action1, action2):
r1, r2 = self.reward_matrix[action1][action2]
return None, r1, r2
def _create_reward_table(self):
reward_matrix = [
[[11, 11], [-30, -30], [0, 0]],
[[-30, -30], [7, 7], [6, 6]],
[[0, 0], [0, 0], [5, 5]],
]
return reward_matrix
policy.py
Epsilon-greedy行動選択が実装されています。
import numpy as np
from abc import ABCMeta, abstractmethod
class Policy(metaclass=ABCMeta):
@abstractmethod
def select_action(self, **kwargs):
pass
class EpsGreedyQPolicy(Policy):
"""
ε-greedy選択
"""
def __init__(self, epsilon=.05, decay_rate=1):
super(EpsGreedyQPolicy, self).__init__()
self.epsilon = epsilon
self.decay_rate = decay_rate
self.name = "epsilon-greedy"
def select_action(self, q_values):
assert q_values.ndim == 1
nb_actions = q_values.shape[0]
if np.random.uniform() < self.epsilon: # random行動
action = np.random.random_integers(0, nb_actions-1)
else: # greedy 行動
action = np.argmax(q_values)
return action
def select_greedy_action(self, q_values):
assert q_values.ndim == 1
nb_actions = q_values.shape[0]
action = np.argmax(q_values)
return action
run.py
実験実行用プログラムです。
最後に平均報酬をプロットします。
import numpy as np
import matplotlib.pyplot as plt
from distributed_q_learner import DistributedQLearner
from policy import EpsGreedyQPolicy
from matrix_game import MatrixGame
import pandas as pd
if __name__ == '__main__':
nb_episode = 40000
actions = np.arange(3)
agent1 = DistributedQLearner(policy=EpsGreedyQPolicy(epsilon=.03), actions=actions)
agent2 = DistributedQLearner(policy=EpsGreedyQPolicy(epsilon=.03), actions=actions)
game = MatrixGame()
for episode in range(nb_episode):
action1 = agent1.act()
action2 = agent2.act()
_, r1, r2 = game.step(action1, action2)
agent1.observe(reward=r1)
agent2.observe(reward=r2)
print("Policy")
print(agent1.pi)
print(agent2.pi)
# moving average reward
average_rewrad_history1 = pd.Series(agent1.reward_history).rolling(50).mean().tolist()
average_rewrad_history2 = pd.Series(agent2.reward_history).rolling(50).mean().tolist()
plt.plot(np.arange(len(average_rewrad_history1)),average_rewrad_history1, label="agent1")
plt.plot(np.arange(len(average_rewrad_history2)),average_rewrad_history2, label="agent2")
plt.xlabel("Episode")
plt.ylabel("Reward")
plt.legend()
plt.savefig("result.jpg")
plt.show()
実験結果

獲得報酬の移動平均をプロットしました。
利得表からわかるように正しく学習できていれば、 11周辺にプロットされるのですが、 学習が進むにつれて11周辺にプロットされていることがわかります。
参考文献
- http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.2.772
- [PDF] Independent reinforcement learners in cooperative Markov games: a survey regarding coordination problems - Semantic Scholar


コメント