見出し画像

Fuzzer

ChatGPT(o1-mini)に作ってもらったものですが、実際に使えるかは分かりません。


Pythonによるグレイボックスファジングツールの完全な実装例を提供いたします。


1. プロジェクト構成

以下のようなディレクトリ構造を想定します:

fuzzer/
├── fuzzer.py
├── target_program/
│   ├── http_server.py
│   ├── ftp_server.py
│   ├── smtp_server.py
│   └── websocket_server.py
├── crashes.json
└── mutation_model.pkl
  • fuzzer.py: ファジングツールのメインスクリプト。

  • target_program/: ファジング対象の各プロトコルサーバーのスクリプト。

  • crashes.json: 発見されたクラッシュ情報を保存するファイル。

  • mutation_model.pkl: 学習済みの変異モデルを保存するファイル。


2. 必要なライブラリのインストール

以下のPythonライブラリが必要です。pipを使用してインストールしてください。

pip install coverage difflib scikit-learn websockets
  • coverage: コードカバレッジの測定。

  • difflib: クラッシュの類似性分析。

  • scikit-learn: 機械学習モデルの実装。

  • websockets: WebSocketプロトコルのサポート。


3. ファジング対象のプロトコルサーバーの実装

3.1 HTTPサーバー (target_program/http_server.py)

# target_program/http_server.py

from http.server import BaseHTTPRequestHandler, HTTPServer

class SimpleHTTPRequestHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        if self.path == "/crash":
            raise ValueError("Intentional Crash")
        elif self.path == "/":
            self.send_response(200)
            self.end_headers()
            self.wfile.write(b"Hello, World!")
        else:
            self.send_response(200)
            self.end_headers()
            self.wfile.write(f"Requested path: {self.path}".encode())

def run(server_class=HTTPServer, handler_class=SimpleHTTPRequestHandler):
    server_address = ('localhost', 8080)
    httpd = server_class(server_address, handler_class)
    print(f"HTTP Server running on {server_address}")
    httpd.serve_forever()

if __name__ == "__main__":
    run()

3.2 FTPサーバー (target_program/ftp_server.py)

# target_program/ftp_server.py

import socket

def handle_client(client_socket):
    try:
        client_socket.sendall(b"220 Welcome to the FTP server\r\n")
        while True:
            request = client_socket.recv(1024).decode()
            if not request:
                break
            if "CRASH" in request.upper():
                raise ValueError("Intentional Crash")
            response = "200 Command OK\r\n"
            client_socket.sendall(response.encode())
    except Exception as e:
        client_socket.sendall(f"500 Internal Server Error: {e}\r\n".encode())
    finally:
        client_socket.close()

def run_ftp_server():
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(('localhost', 2121))
    server.listen(5)
    print("FTP Server running on localhost:2121")
    while True:
        client_sock, addr = server.accept()
        handle_client(client_sock)

if __name__ == "__main__":
    run_ftp_server()

3.3 SMTPサーバー (target_program/smtp_server.py)

# target_program/smtp_server.py

import socket

def handle_client(client_socket):
    try:
        client_socket.sendall(b"220 Welcome to the SMTP server\r\n")
        while True:
            request = client_socket.recv(1024).decode()
            if not request:
                break
            if "CRASH" in request.upper():
                raise ValueError("Intentional Crash")
            response = "250 OK\r\n"
            client_socket.sendall(response.encode())
    except Exception as e:
        client_socket.sendall(f"500 Internal Server Error: {e}\r\n".encode())
    finally:
        client_socket.close()

def run_smtp_server():
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(('localhost', 2525))
    server.listen(5)
    print("SMTP Server running on localhost:2525")
    while True:
        client_sock, addr = server.accept()
        handle_client(client_sock)

if __name__ == "__main__":
    run_smtp_server()

3.4 WebSocketサーバー (target_program/websocket_server.py)

# target_program/websocket_server.py

import asyncio
import websockets

async def echo(websocket, path):
    try:
        async for message in websocket:
            if "CRASH" in message.upper():
                raise ValueError("Intentional Crash")
            await websocket.send(f"Echo: {message}")
    except Exception as e:
        await websocket.send(f"500 Internal Server Error: {e}")

def run_websocket_server():
    start_server = websockets.serve(echo, "localhost", 8765)
    print("WebSocket Server running on localhost:8765")
    asyncio.get_event_loop().run_until_complete(start_server)
    asyncio.get_event_loop().run_forever()

if __name__ == "__main__":
    run_websocket_server()

4. ファジングツールの実装 (fuzzer.py)

以下に、拡張アイデアを取り入れたファジングツールの完全な実装例を示します。

import random
import string
import subprocess
import socket
import json
import os
import difflib
from collections import defaultdict
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.linear_model import LogisticRegression
import pickle

# ========== ファジングツール設定 ==========
SEED_INPUTS = {
    "HTTP": "GET / HTTP/1.1\r\nHost: example.com\r\n\r\n",
    "FTP": "USER anonymous\r\nPASS guest\r\n",
    "SMTP": "HELO localhost\r\nMAIL FROM:<test@example.com>\r\nRCPT TO:<recipient@example.com>\r\nDATA\r\nTest Email\r\n.\r\n",
    "WebSocket": "Hello WebSocket"
}

DICTIONARY = [
    "GET", "POST", "PUT", "DELETE", "/admin", "/login", "/register", "/user/",
    "USER", "PASS", "QUIT", "RETR", "STOR", "LIST", "PWD", "CWD", "MKD", "DELE",
    "HELO", "MAIL FROM:", "RCPT TO:", "DATA",
    "Hello", "WebSocket", "CRASH"
]

MAX_ITERATIONS = 1000
MUTATIONS_PER_INPUT = 3
CRASHES_FILE = "crashes.json"
MODEL_FILE = "mutation_model.pkl"

# ========== クラッシュの管理 ==========
class CrashManager:
    def __init__(self, crashes_file):
        self.crashes_file = crashes_file
        self.crashes = []
        self.signatures = set()
        if os.path.exists(self.crashes_file):
            with open(self.crashes_file, 'r') as f:
                data = json.load(f)
                self.crashes = data.get("crashes", [])
                self.signatures = set(crash["signature"] for crash in self.crashes)

    def add_crash(self, input_data, output):
        signature = self.generate_signature(output)
        if signature not in self.signatures:
            crash = {
                "input": input_data,
                "output": output,
                "signature": signature
            }
            self.crashes.append(crash)
            self.signatures.add(signature)
            self.save_crashes()

    def generate_signature(self, output):
        # クラッシュ出力の要約を生成(ここでは出力の類似度スコアを使用)
        return difflib.SequenceMatcher(None, output, "").ratio()

    def save_crashes(self):
        with open(self.crashes_file, 'w') as f:
            json.dump({"crashes": self.crashes}, f, indent=4)

    def report(self):
        if not self.crashes:
            print("No crashes detected.")
        else:
            print("\n=== Crashes Detected ===")
            for crash in self.crashes:
                print(f"Input: {repr(crash['input'])}, Output: {crash['output']}, Signature: {crash['signature']}")

# ========== カバレッジトラッカー ==========
class CoverageTracker:
    def __init__(self):
        self.covered_lines = set()

    def update_coverage(self, new_coverage):
        updated = False
        for line in new_coverage:
            if line not in self.covered_lines:
                updated = True
                self.covered_lines.add(line)
        return updated

# ========== 変異エンジン ==========
class Mutator:
    def __init__(self, dictionary, model=None):
        self.dictionary = dictionary
        self.model = model  # 機械学習モデル

    def mutate(self, input_data):
        if self.model:
            # 学習モデルに基づく変異
            return self.model_based_mutation(input_data)
        else:
            # ランダム変異またはディクショナリ変異
            mutation_type = random.choice(["random", "dictionary"])
            if mutation_type == "random":
                return self.random_mutation(input_data)
            else:
                return self.dictionary_mutation(input_data)

    def random_mutation(self, input_data):
        mutated = list(input_data)
        if not mutated:
            return self.random_string(10)
        pos = random.randint(0, len(mutated) - 1)
        mutated[pos] = random.choice(string.ascii_letters + string.digits + string.punctuation)
        return ''.join(mutated)

    def dictionary_mutation(self, input_data):
        words = input_data.split()
        if not words:
            return input_data
        pos = random.randint(0, len(words) - 1)
        words[pos] = random.choice(self.dictionary)
        return ' '.join(words)

    def model_based_mutation(self, input_data):
        # シンプルな例として、モデルが変異の確率を決定
        # 実際にはもっと高度な変異戦略を実装可能
        mutation_type = self.model.predict([input_data])[0]
        if mutation_type == 0:
            return self.random_mutation(input_data)
        else:
            return self.dictionary_mutation(input_data)

    def random_string(self, length):
        return ''.join(random.choices(string.ascii_letters + string.digits + string.punctuation, k=length))

# ========== 学習モデル ==========
class MutationModel:
    def __init__(self):
        self.vectorizer = CountVectorizer()
        self.model = LogisticRegression()

    def train(self, inputs, labels):
        X = self.vectorizer.fit_transform(inputs)
        self.model.fit(X, labels)

    def predict(self, inputs):
        X = self.vectorizer.transform(inputs)
        return self.model.predict(X)

    def save_model(self, filename):
        with open(filename, 'wb') as f:
            pickle.dump({
                "vectorizer": self.vectorizer,
                "model": self.model
            }, f)

    def load_model(self, filename):
        with open(filename, 'rb') as f:
            data = pickle.load(f)
            self.vectorizer = data["vectorizer"]
            self.model = data["model"]

# ========== プロトコル別実行エンジン ==========
class Executor:
    def __init__(self):
        self.protocols = {
            "HTTP": self.execute_http,
            "FTP": self.execute_ftp,
            "SMTP": self.execute_smtp,
            "WebSocket": self.execute_websocket
        }

    def execute(self, protocol, input_data):
        if protocol in self.protocols:
            return self.protocols[protocol](input_data)
        else:
            return "", f"Unsupported protocol: {protocol}"

    def execute_http(self, input_data):
        try:
            process = subprocess.Popen(
                ["curl", "-X", "POST", "http://localhost:8080"],
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True
            )
            stdout, stderr = process.communicate(input=input_data, timeout=5)
            return stdout.strip(), stderr.strip()
        except subprocess.TimeoutExpired:
            process.kill()
            return "Timeout", "Timeout Expired"

    def execute_ftp(self, input_data):
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                sock.connect(('localhost', 2121))
                sock.sendall(input_data.encode())
                response = sock.recv(4096).decode()
                return response, ""
        except Exception as e:
            return "", str(e)

    def execute_smtp(self, input_data):
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                sock.connect(('localhost', 2525))
                sock.sendall(input_data.encode())
                response = sock.recv(4096).decode()
                return response, ""
        except Exception as e:
            return "", str(e)

    def execute_websocket(self, input_data):
        try:
            # WebSocketは非同期プロトコルなので、簡易的に実装
            # 実際には専用ライブラリを使用すべき
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                sock.connect(('localhost', 8765))
                sock.sendall(input_data.encode())
                response = sock.recv(4096).decode()
                return response, ""
        except Exception as e:
            return "", str(e)

# ========== 学習アルゴリズム ==========
class LearningAlgorithm:
    def __init__(self, model_file):
        self.model_file = model_file
        self.model = MutationModel()
        if os.path.exists(self.model_file):
            self.model.load_model(self.model_file)
        else:
            # 初期モデル(ランダム)
            self.model.train([], [])

    def update_model(self, inputs, labels):
        self.model.train(inputs, labels)
        self.model.save_model(self.model_file)

# ========== メインファザー ==========
class Fuzzer:
    def __init__(self):
        self.executor = Executor()
        self.crash_manager = CrashManager(CRASHES_FILE)
        self.coverage_tracker = CoverageTracker()
        self.learning_algorithm = LearningAlgorithm(MODEL_FILE)
        self.mutator = Mutator(DICTIONARY, model=self.learning_algorithm.model)
        self.input_queue = list(SEED_INPUTS.values())
        self.seen_inputs = set()
        self.iteration = 0

    def run(self):
        while self.iteration < MAX_ITERATIONS and self.input_queue:
            input_data = self.input_queue.pop(0)
            if input_data in self.seen_inputs:
                continue
            self.seen_inputs.add(input_data)

            # プロトコルの判定(シンプルな方法)
            protocol = self.detect_protocol(input_data)
            if not protocol:
                continue

            # ターゲットの実行
            output, error = self.executor.execute(protocol, input_data)

            print(f"Iteration: {self.iteration}, Protocol: {protocol}, Input: {repr(input_data)}, Output: {output[:50]}, Error: {error[:50]}")

            # カバレッジの更新(仮想的に行として扱う)
            new_coverage = set(output.split("\n"))
            coverage_increased = self.coverage_tracker.update_coverage(new_coverage)
            if coverage_increased:
                print("Coverage increased!")

            # クラッシュの検出
            if "Exception" in output or "Error" in error:
                self.crash_manager.add_crash(input_data, output + error)

            # フィードバックの収集と学習
            # シンプルな例として、成功ならラベル0、エラーならラベル1
            label = 1 if ("Exception" in output or "Error" in error) else 0
            self.learning_algorithm.update_model([input_data], [label])

            # 変異の生成
            for _ in range(MUTATIONS_PER_INPUT):
                mutated_input = self.mutator.mutate(input_data)
                if mutated_input not in self.seen_inputs:
                    self.input_queue.append(mutated_input)

            self.iteration += 1

        self.crash_manager.report()

    def detect_protocol(self, input_data):
        # シンプルなプロトコル判定
        if input_data.startswith("GET") or input_data.startswith("POST") or input_data.startswith("PUT") or input_data.startswith("DELETE"):
            return "HTTP"
        elif input_data.startswith("USER") or input_data.startswith("PASS") or input_data.startswith("QUIT") or input_data.startswith("RETR") or input_data.startswith("STOR"):
            return "FTP"
        elif input_data.startswith("HELO") or input_data.startswith("MAIL FROM:") or input_data.startswith("RCPT TO:") or input_data.startswith("DATA"):
            return "SMTP"
        else:
            # WebSocketはテキストベースだが、明確な開始がないためデフォルトでWebSocket
            return "WebSocket"

# ========== 実行 ==========
if __name__ == "__main__":
    fuzzer = Fuzzer()
    fuzzer.run()

5. 実装の詳細説明

5.1 ファジング対象プロトコルの追加

ファザープロセスでは、HTTP、FTP、SMTP、WebSocketの各プロトコルをサポートしています。Executorクラスが各プロトコルに対応した実行方法を提供します。

  • HTTP: curlコマンドを使用してHTTPリクエストを送信。

  • FTP: ソケットを使用してFTPサーバーに接続し、コマンドを送信。

  • SMTP: ソケットを使用してSMTPサーバーに接続し、メールコマンドを送信。

  • WebSocket: ソケットを使用してWebSocketサーバーに接続し、メッセージを送信。

5.2 クラッシュの持続的保存

CrashManagerクラスがクラッシュ情報を管理し、発見されたクラッシュをcrashes.jsonファイルに保存します。クラッシュのシグネチャを生成し、同一のクラッシュタイプを重複して保存しないようにしています。

5.3 学習アルゴリズムの導入

LearningAlgorithmクラスが機械学習モデルを管理します。MutationModelクラスでは、CountVectorizerとLogisticRegressionを使用して、入力データから変異のタイプ(ランダム変異かディクショナリ変異か)を予測します。

  • トレーニング: 各入力に対して、成功(ラベル0)かエラー(ラベル1)かのラベルを付与し、モデルをトレーニングします。

  • 変異の選択: モデルが入力データを基に、どちらの変異タイプを選択すべきかを予測し、適切な変異を行います。

  • モデルの保存と読み込み: 学習済みのモデルはmutation_model.pklに保存され、次回の実行時に読み込まれます。

5.4 フィードバック駆動型変異の高度化

カバレッジの増加だけでなく、エラーやクラッシュの発生を考慮して、変異の戦略を改善します。学習モデルを使用して、エラーを引き起こす可能性の高い変異を優先的に選択します。

5.5 入力のクラスタリングの高度化

クラッシュのシグネチャを生成し、類似性に基づいてクラスタリングします。同一タイプのクラッシュは重複して保存されません。

5.6 プロトコルの深い理解

各プロトコルの特性を理解し、適切な入力生成と変異を行います。例えば、SMTPではメールコマンドに基づいた入力を生成し、WebSocketではメッセージ形式に基づいた入力を生成します。

5.7 ディクショナリの拡張

各プロトコル特有のキーワードや、既知の脆弱性に関連するパターンをディクショナリに追加し、より意味のある変異を行います。


6. 実行方法

6.1 ファジング対象サーバーの起動

それぞれのプロトコルサーバーを別々のターミナルで起動します。

# HTTPサーバーの起動
python target_program/http_server.py

# FTPサーバーの起動
python target_program/ftp_server.py

# SMTPサーバーの起動
python target_program/smtp_server.py

# WebSocketサーバーの起動
python target_program/websocket_server.py

6.2 ファザースクリプトの実行

別のターミナルでファザーを実行します。

python fuzzer.py

ファザーは、初期シード入力から始まり、変異を繰り返しながら各プロトコルサーバーに対してリクエストを送信します。クラッシュが検出された場合、crashes.jsonに保存され、最終的にコンソールに報告されます。


7. 注意点

  1. カバレッジデータの取得:

    • この実装では、カバレッジデータを単純化しています。実際のカバレッジ計測には、より詳細な解析やインストルメンテーションが必要です。

  2. WebSocketの実装:

    • WebSocketは非同期プロトコルであり、実際の通信には専用のライブラリ(例: websockets)を使用することが望ましいです。ここでは簡易的な実装を行っています。

  3. 学習モデルの改善:

    • 現在の学習モデルはシンプルなロジスティック回帰を使用しています。より高度なモデルや特徴量エンジニアリングを行うことで、変異戦略の精度を向上させることが可能です。

  4. セキュリティ:

    • ファジング対象のサーバーはテスト環境で実行し、本番環境や実際のシステムには影響を与えないように注意してください。

  5. エラーハンドリング:

    • 各プロトコルの実行時に発生する可能性のあるエラーを適切にハンドリングし、ファザーの安定性を確保してください。


8. さらなる拡張アイデア

  • 他プロトコルの追加:

    • SMTPやWebSocket以外にも、FTP以外のプロトコル(例: SSH、Telnetなど)をサポートすることで、ファザーの汎用性をさらに高めます。

  • クラッシュの詳細な分析:

    • クラッシュのスタックトレースや詳細なログを保存し、後から詳細に分析できるようにします。

  • 高度な変異戦略:

    • 特定のパターンやシーケンスを学習し、意味のある変異を生成するための高度なアルゴリズムを導入します。

  • GUIの導入:

    • ファザーの進行状況や検出されたクラッシュを可視化するためのGUIを実装します。

  • 分散ファジング:

    • 複数のマシンやプロセスに分散してファジングを実行し、効率を向上させます。

いいなと思ったら応援しよう!