見出し画像

BedrockでSecurity Hubの検知内容を要約して、Slack通知とZenhubのIssue作成を自動化しました

こんにちは、すずきです。

ConfigやGuardDutyでセキュリティ通知をメールで飛ばすような仕組みを運用していたのですが、通知内容をちゃんと読まないと何が問題なのかわからず、調査に時間がかかるという課題がありました。

そんな中、技術書展で買った書籍の「Security Hubを最大限活用するためのポイント」という章で、LambdaとBedrockで検知内容を日本語で要約して、対応方法まで含めた通知を送信する仕組みが紹介されており、それを参考に実装してみることにしました。


実装

実装では主に以下の記事を参考にしつつ、独自の工夫を何点か加えました。

Security Hubの有効化

コードを書く前に、Security Hubを有効化します。
Security HubはConfig、GuardDuty、Trusted Advisorの検知内容をまとめて管理してくれて、EventBridgeのトリガーにすることもできます。

Slack API

Slack APIの管理画面から新しいAppを作成し、Incoming Webhooksを有効化してWebhook URLを取得します。取得したWebhook URLは、後ほどLambdaを作成する際に環境変数SLACK_WEBHOOK_URLとして登録します。

Slack APIのIncoming Webhooks

ファイル構成

以下がプロジェクトのファイル構成です。

- Dockerfile
- lambda_function.py
- requiments.txt
- README.md

コード全文

以下がLambdaのスクリプトで、Security Hubの検知結果をBedrockを使用して解釈して要約を生成します。その結果をSlack通知およびZenhubへのIssue作成として活用します。

lambda_function.py

import json
import os
import boto3
import requests
from botocore.exceptions import ClientError


SLACK_WEBHOOK_URL = os.environ['SLACK_WEBHOOK_URL']
BEDROCK_MODEL_ID = os.environ['BEDROCK_MODEL_ID']
INCLUDED_SEVERITIES = os.environ.get(
    'INCLUDED_SEVERITIES', 'CRITICAL,HIGH,MEDIUM').split(',')
GITHUB_TOKEN = os.environ['GITHUB_TOKEN']
GITHUB_REPO = os.environ['GITHUB_REPO']
MAX_TOKENS = int(os.environ.get('MAX_TOKENS', 1000))

if not SLACK_WEBHOOK_URL or not BEDROCK_MODEL_ID or not GITHUB_TOKEN or not GITHUB_REPO:
    raise ValueError("必要な環境変数が設定されていません。")

bedrock = boto3.client('bedrock-runtime', region_name='ap-northeast-1')


def lambda_handler(event, context):
    print(f"Received event: {json.dumps(event)}")

    findings = event.get('detail', {}).get('findings', [])
    if not findings:
        print("No findings to process.")
        return {
            'statusCode': 200,
            'body': json.dumps('No findings in the event.')
        }

    results = []

    for finding in findings:
        interpreted_finding, severity = interpret_finding(finding)

        # 環境変数で指定された重要度に含まれる場合のみ結果に追加
        if severity in INCLUDED_SEVERITIES:
            results.append(interpreted_finding)

    if results:
        combined_message = "\n\n".join(results)

        issue_title = generate_title_from_summary(combined_message)

        send_slack_notification(
            issue_title,
            combined_message
        )

        create_zenhub_ticket(
            issue_title,
            combined_message
        )

    return {
        'statusCode': 200,
        'body': json.dumps('Processing complete')
    }


def interpret_finding(finding) -> tuple[str, str]:
    system_message = (
        "あなたはセキュリティ専門家です。Security Hub の検出結果を解釈し、日本語で開発者向けに簡潔な説明と推奨される対応方針を提示してください。"
    )

    human_message = (
        f"以下のSecurity Hub Findingを解釈し、開発者向けに簡潔な説明と推奨される対応方針を提示してください:\n\n"
        f"{json.dumps(finding, indent=2)}\n\n"
        "回答は以下の形式で提供してください:\n"
        "- 検出内容の要約:\n"
        "- 重要度: [CRITICAL/HIGH/MEDIUM/LOW]\n"
        "- 影響:\n"
        "- 推奨される対応:\n"
    )

    try:
        response = bedrock.invoke_model(
            modelId=BEDROCK_MODEL_ID,
            body=json.dumps({
                "anthropic_version": "bedrock-2023-05-31",
                "max_tokens": MAX_TOKENS,
                "system": system_message,
                "messages": [{"role": "user", "content": human_message}]
            }),
            contentType="application/json",
            accept="application/json"
        )
        response_body = json.loads(response['body'].read())
        print("Full Bedrock Response:", json.dumps(response_body, indent=2))

        # contentフィールドからtextタイプを抽出
        content_items = response_body.get("content", [])
        extracted_texts = [
            item.get("text", "") for item in content_items if item.get("type") == "text"
        ]

        # 抽出されたテキストを結合
        interpreted_text = "\n".join(extracted_texts).strip()

        severity = "LOW"  # デフォルトの重要度
        if "重要度:" in interpreted_text:
            for line in interpreted_text.splitlines():
                if line.startswith("- 重要度:"):
                    severity = line.replace("- 重要度:", "").strip().split()[0]
                    break

        print(
            f"Bedrock interpretation: {interpreted_text}, Severity: {severity}")
        return interpreted_text, severity

    except ClientError as e:
        print(f"Error calling Bedrock: {e}")
        return "Bedrockの呼び出しに失敗しました。原文を確認してください。"


def generate_title_from_summary(summary: str) -> str:
    lines = summary.splitlines()
    for idx, line in enumerate(lines):
        if line.startswith("- 検出内容の要約:"):
            # 要約の次の行を取得し、最初の句点までを抽出
            if idx + 1 < len(lines):
                first_sentence = lines[idx + 1].split("。")[0]
                return first_sentence.strip() if first_sentence else "🔐 Security Hubの検知結果"
    return "🔐 Security Hubの検知結果"


def send_slack_notification(title: str, detail: str) -> None:
    payload = {
        'attachments': [
            {
                'color': '#36a64f',
                'pretext': title,
                'text': detail
            }
        ]
    }
    try:
        response = requests.post(SLACK_WEBHOOK_URL, data=json.dumps(payload))
    except requests.exceptions.RequestException as e:
        print(e)
    else:
        print(response.status_code)


def create_zenhub_ticket(title: str, description: str) -> None:
    payload = {
        "title": title,
        "body": description
    }
    headers = {
        "Authorization": f"token {GITHUB_TOKEN}",
        "Content-Type": "application/json"
    }

    try:
        url = f"https://api.github.com/repos/{GITHUB_REPO}/issues"
        response = requests.post(url, json=payload, headers=headers)
        response.raise_for_status()
        print(
            f"GitHub Issue created successfully: {response.json().get('html_url')}")
    except requests.exceptions.RequestException as e:
        print(f"Error creating GitHub Issue: {e}")

requirements.txt

boto3
requests

環境変数による設定とコスト調整
Bedrockのモデル(BEDROCK_MODEL_ID)、出力トークンの最大サイズ(MAX_TOKENS)、検知内容の重大度(INCLUDED_SEVERITIES)を環境変数から設定することで、柔軟にコストを調整できるようにしました。

最初は以下の設定にしていたのですが、10 $/dayほどかかってしまってしまいました。

  • BEDROCK_MODEL_ID: anthropic.claude-3-5-sonnet-20240620-v1:0

  • INCLUDED_SEVERITIES: CRITICAL,HIGH,MEDIUM

  • MAX_TOKENS: 1000

以下の設定に変えたら、1.5 $/dayほどに下がりました。

  • BEDROCK_MODEL_ID: anthropic.claude-3-haiku-20240307-v1:0

  • INCLUDED_SEVERITIES: CRITICAL,HIGH

  • MAX_TOKENS: 500

Zenhubへのチケット起票
create_zenhub_ticketで、Security Hubの検知結果をもとにGitHub Issueを作成します。

Issue作成のためにrepoにチェックをつけて、Personal Access Tokenを発行します。Lambda作成時に、GITHUB_TOKENとして環境変数を登録します。GITHUB_TOKENに発行したトークン、GITHUB_REPOに<organization>/<repository>形式でリポジトリ名を設定します。

https://github.com/settings/tokens

GitHub Apps

Lambdaのデプロイ

外部ライブラリを使用するので、DockerでLambdaをデプロイします。

以下がDockerfileです。

FROM public.ecr.aws/lambda/python:3.13

WORKDIR /var/task

COPY requirements.txt ./
COPY lambda_function.py ./

RUN pip install -r requirements.txt

CMD ["lambda_function.lambda_handler"]

ECRにリポジトリを作成して、以下の手順でビルド&プッシュします。

docker buildx build --platform linux/amd64 -f Dockerfile -t <account-id>.dkr.ecr.ap-northeast-1.amazonaws.com/securityhub-bedrock-slack:latest .
aws ecr get-login-password --region ap-northeast-1 | docker login --username AWS --password-stdin <account-id>.dkr.ecr.ap-northeast-1.amazonaws.com
docker push <account-id>.dkr.ecr.ap-northeast-1.amazonaws.com/securityhub-bedrock-slack

Lambda作成時にプッシュしたコンテナイメージを選択します。

Lambda作成画面

Lambda作成後、タイムアウト時間を1分に伸ばします(デフォルトの3秒だとタイムアウトするため)。

EventBridge

EventBridgeの新しいルールを作成します。
イベントタイプにはSecurity Hub Findings - Importedを選択します。

イベントパターン

ターゲットには作成したLambdaを登録します。

ターゲット

Slack通知とZenhub Issue

こんな感じのSlack通知が届きます。

通知の中身

ZenhubのIssueも自動で作成されます。

Zenhub

採用情報


いいなと思ったら応援しよう!