今回はKerasのチュートリアルに異常検知をオートエンコーダーを使って行う参考プログラムがあったので、それを元に異常検知をしてみます。(ちょくちょくソースコードを変更しています)
ちなみに私は異常検知初心者ですので、間違いがあったら教えていただければと思います
notebookはgithubにあげてあります。
とりあえず、必要なモジュール等をimport しておきます。
import numpy as np
import pandas as pd
from tensorflow import keras
from tensorflow.keras import layers
from matplotlib import pyplot as plt
データ
今回使うデータですが、
Kaggleに上がっている、異常検知の人工的に作成したデータを使っていきます。
https://www.kaggle.com/boltzmannbrain/nab
正常なデータと異常なデータを取得します。
まずは正常なデータを取得します。
master_url_root = "https://raw.githubusercontent.com/numenta/NAB/master/data/"
df_small_noise_url_suffix = "artificialNoAnomaly/art_daily_small_noise.csv"
df_small_noise_url = master_url_root + df_small_noise_url_suffix
df_small_noise = pd.read_csv(
df_small_noise_url, parse_dates=True, index_col="timestamp"
)
df_small_noise.head()
取得したデータは、以下のように日付がインデックスとなっていて、valueという列を持ったデータになります。
value
timestamp
2014-04-01 00:00:00 18.324919
2014-04-01 00:05:00 21.970327
2014-04-01 00:10:00 18.624806
2014-04-01 00:15:00 21.953684
2014-04-01 00:20:00 21.909120
これをプロットしてみると、
df_small_noise.plot()
正常なデータですので、しっかりと正規性の持ったデータが取得できていることがわかります。
次に、異常が含まれるデータを取得します。
df_daily_jumpsup_url_suffix = "artificialWithAnomaly/art_daily_jumpsup.csv"
df_daily_jumpsup_url = master_url_root + df_daily_jumpsup_url_suffix
df_daily_jumpsup = pd.read_csv(
df_daily_jumpsup_url, parse_dates=True, index_col="timestamp"
)
このデータもプロットしてみます。
df_daily_jumpsup.plot()
画像を見れば、わかると思いますが、一点だけ、異常に飛び出た部分があります。これが異常値になります。
今回はこの異常の場所を見つけることを目標とします。
学習用データの作成
オートエンコーダーに学習させていくのですが、
まずは、データの標準化を行っていきます。
training_mean = df_small_noise.mean()
training_std = df_small_noise.std()
df_training_value = (df_small_noise - training_mean) / training_std
df_training_value.plot()
データが平均0、分散1ぐらいになっていると思います。
次に、データを以下のような形で時間をずらしながらデータを作成していきます。
以下のgifのように少しずつずらしながら学習用データを作成していきます。
TIME_STEPS = 288
# Generated training sequences for use in the model.
def create_sequences(values, time_steps=TIME_STEPS):
output = []
for i in range(len(values) - time_steps):
output.append(values[i : (i + time_steps)])
return np.stack(output)
x_train = create_sequences(df_training_value.values)
print("Training input shape: ", x_train.shape)
#=> Training input shape: (3744, 288, 1)
shapeが(データ数、timestep数、要素)数になりました。
モデルの作成と学習
モデルは1次元の畳込みニューラルネットワークを用いたオートエンコーダーです。
model = keras.Sequential(
[
layers.Input(shape=(x_train.shape[1], x_train.shape[2])),
layers.Conv1D(
filters=32, kernel_size=7, padding="same", strides=2, activation="relu"
),
layers.Dropout(rate=0.2),
layers.Conv1D(
filters=16, kernel_size=7, padding="same", strides=2, activation="relu"
),
layers.Conv1DTranspose(
filters=16, kernel_size=7, padding="same", strides=2, activation="relu"
),
layers.Dropout(rate=0.2),
layers.Conv1DTranspose(
filters=32, kernel_size=7, padding="same", strides=2, activation="relu"
),
layers.Conv1DTranspose(filters=1, kernel_size=7, padding="same"),
]
)
model.compile(optimizer=keras.optimizers.Adam(learning_rate=0.001), loss="mse")
model.summary()
Model: "sequential"
_________________________________________________________________
Layer (type) Output Shape Param #
=================================================================
conv1d (Conv1D) (None, 144, 32) 256
_________________________________________________________________
dropout (Dropout) (None, 144, 32) 0
_________________________________________________________________
conv1d_1 (Conv1D) (None, 72, 16) 3600
_________________________________________________________________
conv1d_transpose (Conv1DTran (None, 144, 16) 1808
_________________________________________________________________
dropout_1 (Dropout) (None, 144, 16) 0
_________________________________________________________________
conv1d_transpose_1 (Conv1DTr (None, 288, 32) 3616
_________________________________________________________________
conv1d_transpose_2 (Conv1DTr (None, 288, 1) 225
=================================================================
Total params: 9,505
Trainable params: 9,505
Non-trainable params: 0
conv1d_transposeに関しては以下のサイトが参考になります。
https://github.com/vdumoulin/conv_arithmetic
オートエンコーダーですので、入力と出力が同じになるように学習させます。
なので、以下のようにfit関数を実装します。
history = model.fit(
x_train,
x_train,
epochs=50,
batch_size=128,
validation_split=0.1,
callbacks=[
keras.callbacks.EarlyStopping(monitor="val_loss", patience=5, mode="min")
],
)
実際にどのような感じかX_train[0]を入力して確認しています。学習したモデルにx_trainを入力したx_train_predと比較します。
x_train_pred = model.predict(x_train)
plt.plot(x_train[0])
plt.plot(x_train_pred[0])
だいたい同じようなデータが取得できています。
青が学習用データ、オレンジが予測値になります。だいたい同じような値が出力されていることがわかります。
うまく学習できているようです。
異常値のしきい値の決定
学習したオートエンコーダーを使って異常値のしきい値を決定していきます。
学習データに対する出力の誤差を取得します。ここでは、絶対誤差を使っています。
train_mae_loss = np.mean(np.abs(x_train_pred - x_train), axis=1)
この誤差の分布をヒストグラムで見てみます。
# 誤差の分布を表示する
plt.hist(train_mae_loss, bins=50)
plt.xlabel("Train MAE loss")
plt.ylabel("No of samples")
plt.show()
ここでは、異常値のしきい値として、ヒストグラムの一番右の値(訓練データと学習したモデルから出力された値との最大誤差)を用います。
threshold = np.max(train_mae_loss)
print("Reconstruction error threshold: ", threshold)
#=> Reconstruction error threshold: 0.1248834796715283
テストデータでの評価
では、テストデータで異常なところを見つけていきます。
まず、テストデータについてもまずは標準化します。
df_test_value = (df_daily_jumpsup - training_mean) / training_std
fig, ax = plt.subplots()
df_test_value.plot(legend=False, ax=ax)
テストデータについてもオートエンコーダーに入力できるような形で成形します。
x_test = create_sequences(df_test_value.values)
print("Test input shape: ", x_test.shape)
テストデータに関しても同様に絶対誤差を計算します。
x_test_pred = model.predict(x_test)
test_mae_loss = np.mean(np.abs(x_test_pred - x_test), axis=1)
test_mae_loss = test_mae_loss.reshape((-1))
plt.hist(test_mae_loss, bins=50)
plt.xlabel("test MAE loss")
plt.ylabel("No of samples")
plt.show()
先程決めたしきい値を越えたidx(データ)を抽出します。
anomalies = test_mae_loss > threshold
print("Number of anomaly samples: ", np.sum(anomalies))
print("Indices of anomaly samples: ", np.where(anomalies))
anomaliesには異常値であればTrueが格納されています。whereについてはリファレンス参考。
異常値を表示するために、異常値のidxを探していきます。
anomalous_data_indices = []
for data_idx in range(TIME_STEPS - 1, len(df_test_value) - TIME_STEPS + 1):
# 指定した範囲すべてが異常なら異常値に含む
if np.all(anomalies[data_idx - TIME_STEPS + 1 : data_idx]):
anomalous_data_indices.append(data_idx)
プロットしてみます。
df_subset = df_daily_jumpsup.iloc[anomalous_data_indices]
fig, ax = plt.subplots()
df_daily_jumpsup.plot(legend=False, ax=ax)
df_subset.plot(legend=False, ax=ax, color="r")
plt.show()