【入門】kerasで時系列データの異常検知の入門をやってみる

今回はKerasのチュートリアルに異常検知をオートエンコーダーを使って行う参考プログラムがあったので、それを元に異常検知をしてみます。(ちょくちょくソースコードを変更しています)

ちなみに私は異常検知初心者ですので、間違いがあったら教えていただければと思います

notebookはgithubにあげてあります。

とりあえず、必要なモジュール等をimport しておきます。

import numpy as np
import pandas as pd
from tensorflow import keras
from tensorflow.keras import layers
from matplotlib import pyplot as plt

データ

今回使うデータですが、
Kaggleに上がっている、異常検知の人工的に作成したデータを使っていきます。

https://www.kaggle.com/boltzmannbrain/nab

正常なデータと異常なデータを取得します。

まずは正常なデータを取得します。

master_url_root = "https://raw.githubusercontent.com/numenta/NAB/master/data/"

df_small_noise_url_suffix = "artificialNoAnomaly/art_daily_small_noise.csv"
df_small_noise_url = master_url_root + df_small_noise_url_suffix
df_small_noise = pd.read_csv(
    df_small_noise_url, parse_dates=True, index_col="timestamp"
)
df_small_noise.head()

取得したデータは、以下のように日付がインデックスとなっていて、valueという列を持ったデータになります。

                        value
timestamp                     
2014-04-01 00:00:00  18.324919
2014-04-01 00:05:00  21.970327
2014-04-01 00:10:00  18.624806
2014-04-01 00:15:00  21.953684
2014-04-01 00:20:00  21.909120

これをプロットしてみると、

df_small_noise.plot()

正常なデータですので、しっかりと正規性の持ったデータが取得できていることがわかります。

次に、異常が含まれるデータを取得します。

df_daily_jumpsup_url_suffix = "artificialWithAnomaly/art_daily_jumpsup.csv"
df_daily_jumpsup_url = master_url_root + df_daily_jumpsup_url_suffix
df_daily_jumpsup = pd.read_csv(
    df_daily_jumpsup_url, parse_dates=True, index_col="timestamp"
)

このデータもプロットしてみます。

df_daily_jumpsup.plot()

画像を見れば、わかると思いますが、一点だけ、異常に飛び出た部分があります。これが異常値になります。

今回はこの異常の場所を見つけることを目標とします。

学習用データの作成

オートエンコーダーに学習させていくのですが、

まずは、データの標準化を行っていきます。

training_mean = df_small_noise.mean()
training_std = df_small_noise.std()
df_training_value = (df_small_noise - training_mean) / training_std
df_training_value.plot()

データが平均0、分散1ぐらいになっていると思います。

次に、データを以下のような形で時間をずらしながらデータを作成していきます。
以下のgifのように少しずつずらしながら学習用データを作成していきます。

TIME_STEPS = 288

# Generated training sequences for use in the model.
def create_sequences(values, time_steps=TIME_STEPS):
    output = []
    for i in range(len(values) - time_steps):
        output.append(values[i : (i + time_steps)])
    return np.stack(output)


x_train = create_sequences(df_training_value.values)
print("Training input shape: ", x_train.shape)
#=> Training input shape:  (3744, 288, 1)

shapeが(データ数、timestep数、要素)数になりました。

モデルの作成と学習

モデルは1次元の畳込みニューラルネットワークを用いたオートエンコーダーです。

model = keras.Sequential(
    [
        layers.Input(shape=(x_train.shape[1], x_train.shape[2])),
        layers.Conv1D(
            filters=32, kernel_size=7, padding="same", strides=2, activation="relu"
        ),
        layers.Dropout(rate=0.2),
        layers.Conv1D(
            filters=16, kernel_size=7, padding="same", strides=2, activation="relu"
        ),
        layers.Conv1DTranspose(
            filters=16, kernel_size=7, padding="same", strides=2, activation="relu"
        ),
        layers.Dropout(rate=0.2),
        layers.Conv1DTranspose(
            filters=32, kernel_size=7, padding="same", strides=2, activation="relu"
        ),
        layers.Conv1DTranspose(filters=1, kernel_size=7, padding="same"),
    ]
)
model.compile(optimizer=keras.optimizers.Adam(learning_rate=0.001), loss="mse")
model.summary()
Model: "sequential"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
=================================================================
conv1d (Conv1D)              (None, 144, 32)           256       
_________________________________________________________________
dropout (Dropout)            (None, 144, 32)           0         
_________________________________________________________________
conv1d_1 (Conv1D)            (None, 72, 16)            3600      
_________________________________________________________________
conv1d_transpose (Conv1DTran (None, 144, 16)           1808      
_________________________________________________________________
dropout_1 (Dropout)          (None, 144, 16)           0         
_________________________________________________________________
conv1d_transpose_1 (Conv1DTr (None, 288, 32)           3616      
_________________________________________________________________
conv1d_transpose_2 (Conv1DTr (None, 288, 1)            225       
=================================================================
Total params: 9,505
Trainable params: 9,505
Non-trainable params: 0

conv1d_transposeに関しては以下のサイトが参考になります。

https://github.com/vdumoulin/conv_arithmetic

オートエンコーダーですので、入力と出力が同じになるように学習させます。

なので、以下のようにfit関数を実装します。

history = model.fit(
    x_train,
    x_train,
    epochs=50,
    batch_size=128,
    validation_split=0.1,
    callbacks=[
        keras.callbacks.EarlyStopping(monitor="val_loss", patience=5, mode="min")
    ],
)

実際にどのような感じかX_train[0]を入力して確認しています。学習したモデルにx_trainを入力したx_train_predと比較します。

x_train_pred = model.predict(x_train)
plt.plot(x_train[0])
plt.plot(x_train_pred[0])

だいたい同じようなデータが取得できています。

青が学習用データ、オレンジが予測値になります。だいたい同じような値が出力されていることがわかります。
うまく学習できているようです。

異常値のしきい値の決定

学習したオートエンコーダーを使って異常値のしきい値を決定していきます。

学習データに対する出力の誤差を取得します。ここでは、絶対誤差を使っています。

train_mae_loss = np.mean(np.abs(x_train_pred - x_train), axis=1)

この誤差の分布をヒストグラムで見てみます。

# 誤差の分布を表示する
plt.hist(train_mae_loss, bins=50)
plt.xlabel("Train MAE loss")
plt.ylabel("No of samples")
plt.show()

ここでは、異常値のしきい値として、ヒストグラムの一番右の値(訓練データと学習したモデルから出力された値との最大誤差)を用います。

threshold = np.max(train_mae_loss)
print("Reconstruction error threshold: ", threshold)
#=> Reconstruction error threshold:  0.1248834796715283

テストデータでの評価

では、テストデータで異常なところを見つけていきます。

まず、テストデータについてもまずは標準化します。

df_test_value = (df_daily_jumpsup - training_mean) / training_std
fig, ax = plt.subplots()
df_test_value.plot(legend=False, ax=ax)

テストデータについてもオートエンコーダーに入力できるような形で成形します。

x_test = create_sequences(df_test_value.values)
print("Test input shape: ", x_test.shape)

テストデータに関しても同様に絶対誤差を計算します。

x_test_pred = model.predict(x_test)
test_mae_loss = np.mean(np.abs(x_test_pred - x_test), axis=1)
test_mae_loss = test_mae_loss.reshape((-1))
plt.hist(test_mae_loss, bins=50)
plt.xlabel("test MAE loss")
plt.ylabel("No of samples")
plt.show()

先程決めたしきい値を越えたidx(データ)を抽出します。

anomalies = test_mae_loss > threshold
print("Number of anomaly samples: ", np.sum(anomalies))
print("Indices of anomaly samples: ", np.where(anomalies))

anomaliesには異常値であればTrueが格納されています。whereについてはリファレンス参考。

異常値を表示するために、異常値のidxを探していきます。

anomalous_data_indices = []
for data_idx in range(TIME_STEPS - 1, len(df_test_value) - TIME_STEPS + 1):
    # 指定した範囲すべてが異常なら異常値に含む
    if np.all(anomalies[data_idx - TIME_STEPS + 1 : data_idx]):
        anomalous_data_indices.append(data_idx)

プロットしてみます。

df_subset = df_daily_jumpsup.iloc[anomalous_data_indices]
fig, ax = plt.subplots()
df_daily_jumpsup.plot(legend=False, ax=ax)
df_subset.plot(legend=False, ax=ax, color="r")
plt.show()

参考文献

タイトルとURLをコピーしました