見出し画像

アクセスログ監視システムを自作する


概要

このスクリプトは、Nginx のアクセスログを監視し、高頻度のアクセスパターンを検出するためのツールです。短期的および長期的な高頻度アクセスを IP アドレスごとに追跡し、設定された閾値を超えた場合に通知を行います。

私の場合は、Web API の運用を行っているので主にその監視に使っています。

Nginx 用に作成されていますが、適切な修正を加えることで Apache などの他のウェブサーバーのアクセス監視にも使用できます。主に変更が必要な部分は、ログファイルのパスと IP アドレスを抽出する正規表現です。

機能

  • Nginx アクセスログの継続的な監視

  • 短期的および長期的な高頻度アクセスの検出

  • 検出された高頻度アクセスの CSV ファイルへの記録

  • 通知機能(カスタマイズ可能)

  • 詳細なロギング機能

ファイル構成

  • スクリプト: /usr/local/bin/nginx_access_monitor.py

  • 設定ファイル: /etc/nginx/nginx_access_monitor.conf

  • 出力 CSV ファイル: /var/log/nginx/nginx_access_monitor.csv

  • スクリプトログファイル: /var/log/nginx/nginx_access_monitor.log

ファイルパスは環境に合わせて適宜変更してください。

設定

設定ファイル(/etc/nginx/nginx_access_monitor.conf)の内容:

[Settings]
# Nginx アクセスログファイル
log_file = /var/log/nginx/access.log
# 10秒間に20回以上のアクセスを検出
short_term_threshold = 20 
short_term_window = 10
# 1時間に60回以上のアクセスを検出
long_term_threshold = 60
long_term_window = 3600
# 3秒おきにチェック
check_interval = 3

スクリプト全文

import re
import time
from collections import defaultdict, deque
import configparser
import sys
import csv
from datetime import datetime
import os
import logging
from logging.handlers import RotatingFileHandler

# 設定ファイルと出力 CSV ファイルのパス
CONFIG_FILE = '/etc/nginx/nginx_access_monitor.conf'
CSV_FILE = '/var/log/nginx/nginx_access_monitor.csv'
LOG_FILE = '/var/log/nginx/nginx_access_monitor.log'

# ロガーの設定
def setup_logger():
    logger = logging.getLogger('NginxAccessMonitor')
    logger.setLevel(logging.INFO)
    
    # RotatingFileHandlerを使用してログローテーションを実装
    file_handler = RotatingFileHandler(LOG_FILE, maxBytes=10*1024*1024, backupCount=5)
    file_handler.setLevel(logging.INFO)
    
    # コンソールにも出力
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.INFO)
    
    # フォーマッターの設定
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    file_handler.setFormatter(formatter)
    console_handler.setFormatter(formatter)
    
    logger.addHandler(file_handler)
    logger.addHandler(console_handler)
    
    return logger

logger = setup_logger()

def load_config():
    """
    設定ファイルから設定を読み込みます。

    戻り値:
        dict: 設定パラメータを含む辞書。
    """
    config = configparser.ConfigParser()
    config.read(CONFIG_FILE)
    logger.info(f"設定ファイル {CONFIG_FILE} を読み込みました")
    return {
        'log_file': config.get('Settings', 'log_file', fallback='/var/log/nginx/access.log'),
        'short_term_threshold': config.getint('Settings', 'short_term_threshold', fallback=20),
        'short_term_window': config.getint('Settings', 'short_term_window', fallback=10),
        'long_term_threshold': config.getint('Settings', 'long_term_threshold', fallback=60),
        'long_term_window': config.getint('Settings', 'long_term_window', fallback=3600),
        'check_interval': config.getint('Settings', 'check_interval', fallback=3)
    }

def parse_log_line(line):
    """
    ログの行を解析し、IP アドレスを抽出します。

    引数:
        line (str): 解析するログの行。

    戻り値:
        str or None: 抽出された IP アドレス。見つからない場合は None。
    """
    ip_pattern = r'^(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})'
    match = re.match(ip_pattern, line)
    return match.group(1) if match else None

def update_csv(ip, detection_time, frequency, window):
    """
    CSVファイルを更新し、同一IPアドレスの古い行を削除して最新の情報を追加します。

    引数:
        ip (str): 検出された IP アドレス。
        detection_time (str): 検出時刻。
        frequency (int): 検出されたアクセス頻度。
        window (str): 監視時間枠。
    """
    temp_file = CSV_FILE + '.tmp'
    found = False

    # CSVファイルが存在しない場合は新規作成
    if not os.path.exists(CSV_FILE):
        with open(CSV_FILE, 'w', newline='') as csvfile:
            csv_writer = csv.writer(csvfile)
            csv_writer.writerow(['IP', 'Detection Time', 'Frequency', 'Window'])
        logger.info(f"新しいCSVファイル {CSV_FILE} を作成しました")

    with open(CSV_FILE, 'r', newline='') as csvfile, open(temp_file, 'w', newline='') as tempfile:
        csv_reader = csv.reader(csvfile)
        csv_writer = csv.writer(tempfile)

        # ヘッダー行をコピー
        header = next(csv_reader)
        csv_writer.writerow(header)

        for row in csv_reader:
            if row[0] == ip:
                if not found:
                    csv_writer.writerow([ip, detection_time, frequency, window])
                    found = True
            else:
                csv_writer.writerow(row)

        if not found:
            csv_writer.writerow([ip, detection_time, frequency, window])

    os.replace(temp_file, CSV_FILE)
    logger.info(f"CSVファイルを更新しました: IP {ip}, 検出時刻 {detection_time}, 頻度 {frequency}, ウィンドウ {window}")

def write_to_csv(ip, detection_time, frequency, window):
    """
    検出された高頻度アクセスを CSV ファイルに記録します。

    引数:
        ip (str): 検出された IP アドレス。
        detection_time (str): 検出時刻。
        frequency (int): 検出されたアクセス頻度。
        window (str): 監視時間枠。
    """
    update_csv(ip, detection_time, frequency, window)

def cleanup_ip_counts(ip_counts, current_time, long_term_window):
    """
    IP カウントデータを清掃し、古いエントリを削除します。

    引数:
        ip_counts (dict): IP アドレスとそのアクセスカウントを含む辞書。
        current_time (float): 現在の時刻(UNIX タイムスタンプ)。
        long_term_window (int): 長期監視の時間枠(秒)。
    """
    initial_count = len(ip_counts)
    for ip in list(ip_counts.keys()):
        ip_counts[ip]['short'] = deque([t for t in ip_counts[ip]['short'] if current_time - t <= long_term_window], maxlen=ip_counts[ip]['short'].maxlen)
        ip_counts[ip]['long'] = deque([t for t in ip_counts[ip]['long'] if current_time - t <= long_term_window], maxlen=ip_counts[ip]['long'].maxlen)
        if not ip_counts[ip]['short'] and not ip_counts[ip]['long']:
            del ip_counts[ip]
    removed_count = initial_count - len(ip_counts)
    logger.info(f"IP カウントデータのクリーンアップを実行しました。{removed_count} 個のエントリを削除しました。")

def check_frequency(ip, counts, current_time, threshold, window, window_type, last_notifications, last_cleanup):
    """
    指定された IP アドレスのアクセス頻度をチェックし、必要に応じて報告します。
    同一内容の通知は一定時間内にスキップします。

    引数:
        ip (str): チェックする IP アドレス。
        counts (deque): アクセスタイムスタンプのデック。
        current_time (float): 現在の時刻(UNIX タイムスタンプ)。
        threshold (int): 高頻度とみなすアクセス回数の閾値。
        window (int): 監視時間枠(秒)。
        window_type (str): 監視タイプ("短期" または "長期")。
        last_notifications (dict): 前回の通知情報を格納する辞書。
        last_cleanup (float): 前回のクリーンアップ時刻(UNIX タイムスタンプ)。

    戻り値:
        tuple: (
            deque: 更新されたアクセスタイムスタンプのデック,
            dict: 更新された通知情報の辞書,
            float: 更新されたクリーンアップ時刻
        )
    """
    recent_counts = deque(t for t in counts if current_time - t <= window)
    if len(recent_counts) >= threshold:
        detection_time = datetime.fromtimestamp(current_time).strftime("%Y-%m-%d %H:%M:%S")
        message = f"高頻度アクセスを検出: IP {ip} ({window_type} カウント: 直近 {window} 秒間に {len(recent_counts)} 回)"
        
        # 通知条件
        notification_key = f"{ip}_{window_type}"
        should_notify = (
            notification_key not in last_notifications or
            current_time - last_notifications[notification_key]['time'] > 3600 or
            last_notifications[notification_key]['count'] * 2 < len(recent_counts)  # 100%増加で通知
        )

        if should_notify:
            logger.warning(message)
            print(message)
            write_to_csv(ip, detection_time, len(recent_counts), f"{window}s")

            # 定期的なクリーンアップ
            if current_time - last_cleanup > 3600:
                last_notifications = {k: v for k, v in last_notifications.items() if current_time - v['time'] <= 86400}  # 24時間以内のエントリのみ保持
                last_cleanup = current_time
            
            # 最新のアクセス回数と時刻を更新
            last_notifications[notification_key] = {
                'time': current_time,
                'count': len(recent_counts)
            }
        else:
            logger.info(f"通知スキップ(重複): {message}")
    
    return recent_counts, last_notifications, last_cleanup

def get_file_size(filename):
    """ファイルサイズを取得する"""
    try:
        return os.path.getsize(filename)
    except OSError as e:
        logger.error(f"ファイルサイズの取得に失敗しました: {filename}. エラー: {e}")
        return 0

def monitor_access_log(config):
    """
    Nginx のアクセスログを継続的に監視し、高頻度アクセスを検出します。

    引数:
        config (dict): 監視設定を含む辞書。
    """
    log_file = config['log_file']
    short_term_threshold = config['short_term_threshold']
    short_term_window = config['short_term_window']
    long_term_threshold = config['long_term_threshold']
    long_term_window = config['long_term_window']
    check_interval = config['check_interval']

    logger.info(f"Nginx アクセスログの監視を開始します: {log_file}")
    logger.info(f"短期閾値: {short_term_threshold}, 短期ウィンドウ: {short_term_window}秒")
    logger.info(f"長期閾値: {long_term_threshold}, 長期ウィンドウ: {long_term_window}秒")
    logger.info(f"チェック間隔: {check_interval}秒")

    # dequeの初期最大長を適切に設定
    ip_counts = defaultdict(lambda: {
        'short': deque(maxlen=100), 
        'long': deque(maxlen=1000)
    })

    last_notifications = {}
    last_cleanup = time.time()
    last_size = get_file_size(log_file)
    access_count = 0

    while True:
        try:
            current_size = get_file_size(log_file)
            
            if current_size < last_size:
                logger.warning(f"ログファイルのサイズが減少しました。ログローテーションが発生した可能性があります: {log_file}")
                with open(log_file, 'r') as f:
                    f.seek(0, 2)  # ファイルの末尾に移動
                last_size = current_size
            elif current_size > last_size:
                with open(log_file, 'r') as f:
                    f.seek(last_size, 0)  # 前回の位置から読み始める
                    for line in f:
                        current_time = time.time()
                        ip = parse_log_line(line)
                        if ip:
                            ip_counts[ip]['short'].append(current_time)
                            ip_counts[ip]['long'].append(current_time)
                            access_count += 1

                            if access_count % 10 == 0:  # 10アクセスごとに頻度チェック
                                for ip, counts in ip_counts.items():
                                    counts['short'], last_notifications, last_cleanup = check_frequency(ip, counts['short'], current_time, short_term_threshold, short_term_window, "短期", last_notifications, last_cleanup)
                                    counts['long'], last_notifications, last_cleanup = check_frequency(ip, counts['long'], current_time, long_term_threshold, long_term_window, "長期", last_notifications, last_cleanup)

                        if current_time - last_cleanup > 3600:  # 1時間ごとにクリーンアップ
                            cleanup_ip_counts(ip_counts, current_time, long_term_window)
                            last_cleanup = current_time

                last_size = current_size
                logger.info(f"{access_count} のアクセスを処理しました")

            time.sleep(check_interval)

        except Exception as e:
            error_message = f"エラーが発生しました: {e}"
            logger.error(error_message, exc_info=True)
            print(error_message)  # エラー通知を標準出力に出力
            time.sleep(check_interval)  # エラー発生時も一定時間待機

if __name__ == "__main__":
    logger.info("Nginx アクセスログ監視スクリプトを開始します")
    config = load_config()
    try:
        monitor_access_log(config)
    except KeyboardInterrupt:
        logger.info("スクリプトが手動で停止されました")
    except Exception as e:
        logger.critical(f"予期せぬエラーが発生しました: {e}", exc_info=True)
    finally:
        logger.info("Nginx アクセスログ監視スクリプトを終了します")
  • コード修正履歴

    • 通知設定の改善(2024-10-14)

    • dequeのサイズ設定を改善。(2024-10-13)

主要な機能

1. ログファイルの監視

スクリプトは設定された間隔(デフォルトでは3秒ごと)で Nginx のアクセスログを読み取り、新しいエントリーを処理します。

2. アクセス頻度の追跡

各 IP アドレスからのアクセスを短期的(デフォルトでは10秒間)および長期的(デフォルトでは1時間)な時間枠で追跡します。

3. 高頻度アクセスの検出

設定された閾値(短期的には10秒間に20回、長期的には1時間に60回)を超えるアクセスを検出すると、警告を生成します。

4. CSV ファイルへの記録

検出された高頻度アクセスは、CSV ファイルに記録されます。同じ IP アドレスの古い記録は更新されます。

5. 通知機能

高頻度アクセスが検出された場合、通知が生成されます。デフォルトでは print 関数を使用してコンソールに出力しますが、ユーザーの要件に応じてカスタマイズすることができます。

注: 実際の使用時には、メール送信、Slack 通知、システムログへの書き込みなど、環境に適した通知方法を実装することをお勧めします。

6. ログローテーション対応

Nginx のログファイルがローテーションされた場合でも、スクリプトは自動的に新しいログファイルの監視を開始します。

使用方法

  1. 設定ファイル(/etc/nginx/nginx_access_monitor.conf)が適切に設定されていることを確認します。

  2. 必要な Python パッケージ(configparser)がインストールされていることを確認します。

  3. スクリプトを実行します: python /usr/local/bin/nginx_access_monitor.py

継続的な監視のために、このスクリプトを systemd サービスとして設定することを推奨します。

ロギング

  • ログレベル: INFO(通常の動作)、WARNING(高頻度アクセスの検出)、ERROR(エラー状況)

  • ログローテーション: 10MB ごとに自動ローテーション、最大 5 個の古いログファイルを保持

注意事項

  • システムリソースの使用状況を監視し、必要に応じて設定を調整してください。

  • 大量のトラフィックがある環境では、メモリ使用量に注意してください。

トラブルシューティング

エラーが発生した場合、詳細な情報がログファイル(/var/log/nginx/nginx_access_monitor.log)に記録されます。ログファイルを確認して、エラーの原因を特定してください。

セキュリティ

  • スクリプトとログファイルへのアクセス権限を適切に設定してください。

  • 高頻度アクセスの検出閾値を環境に合わせて調整してください。

カスタマイズ

  • 通知方法やログ形式など、スクリプトの様々な部分をカスタマイズして、特定の環境やニーズに合わせることができます。必要に応じてコードを修正し、テストを行ってください。

いいなと思ったら応援しよう!