見出し画像

ECS & EventBridge & DynamoDBでAurora MySQLの月次データ削除バッチを自動化したよ

こんにちは、すずきです。

Aurora MySQLの不要データを削除するにあたり、データ数が多すぎて手動での削除は効率が悪いので、自動化を検討しました。今回は月単位のデータを日毎にバッチ処理で自動削除する仕組みを構築しました。

当初はEventBridge + Lambdaでバッチ処理を行うつもりだったのですが、よく考えたらLambdaの最大実行時間(15分)以内に処理が完了しないことに気がついたので、Lambdaの使用を諦めました。

そこで、ECS on FargateのタスクをEventBridgeのスケジューラーで毎日定期的に起動する構成に変更しました。

また、どのデータを削除したかの進捗管理のために、無料枠でいけそうなDynamoDBを使ってみることにしました。


構築手順

1. DynamoDBテーブルの作成

削除進捗を管理するためにDynamoDBを使用します。以下の設定でテーブルを作成します(他はデフォルトのまま)。

  • テーブル名: deletion_progress

  • パーティションキー: id (文字列)

2. DynamoDB操作用のIAMポリシー作成

ECSタスクからDynamoDBへのデータ取得・保存・更新が必要なため、DynamoDB操作用のIAMポリシーを作成し、ECSタスク実行ロール(ecsTaskExecutionRole)にアタッチします。

以下のポリシーを作成し、ecsTaskExecutionRoleにアタッチします。Resourceには先ほど作成したDynamoDBテーブルのARNを指定します。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "dynamodb:GetItem",
                "dynamodb:PutItem",
                "dynamodb:UpdateItem"
            ],
            "Resource": "arn:aws:dynamodb:your-region:your-account-id:table/your-table-name"
        }
    ]
}

3. ECS用セキュリティグループの作成

ECSタスクからAuroraとDynamoDBにアクセスできるように、セキュリティグループを設定します。Auroraにはポート3306、DynamoDBにはポート443を許可します。

以下は参考のTerraformコードです。

resource "aws_security_group_rule" "ecs_sg_egress_rds" {
  type                     = "egress"
  from_port                = 3306
  to_port                  = 3306
  protocol                 = "tcp"
  security_group_id        = aws_security_group.ecs_sg.id
  source_security_group_id = aws_security_group.rds_sg.id
}

resource "aws_security_group_rule" "ecs_sg_egress_https" {
  type              = "egress"
  from_port         = 443
  to_port           = 443
  protocol          = "tcp"
  security_group_id = aws_security_group.ecs_sg.id
  cidr_blocks       = ["0.0.0.0/0"]
}

4. ECRリポジトリの作成

ECSタスク用のDockerイメージを保管するため、ECRリポジトリdaily-deletionを作成します。

5. アプリケーションコードの作成(Python)

Aurora MySQLのoperationsテーブルから、2022年4月~2024年5月のレコードを月単位で削除するスクリプトです。DynamoDBで削除進行状況(last_year, last_month)を保存し、次回の処理に利用します。

import os
import decimal
import pymysql
import boto3
from datetime import datetime, timedelta
from logging import INFO, Formatter, StreamHandler, getLogger

RDS_HOST = os.getenv('RDS_HOST')
DB_USER = os.getenv('DB_USER')
DB_PASSWORD = os.getenv('DB_PASSWORD')
DB_NAME = os.getenv('DB_NAME')
DYNAMODB_TABLE = os.getenv('DYNAMODB_TABLE')


dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(DYNAMODB_TABLE)

logger = getLogger()
logger.setLevel(INFO)

if not logger.hasHandlers():
    handler = StreamHandler()
    logger.addHandler(handler)

    formatter = Formatter(
        '[%(levelname)s]\t%(asctime)s\t%(message)s\n',
        '%Y-%m-%d %H:%M:%S'
    )

    for handler in logger.handlers:
        handler.setFormatter(formatter)


def connect_to_db():  # MySQLに接続する関数
    try:
        logger.info("Attempting to connect to the database with retries")
        connection = pymysql.connect(host=RDS_HOST,
                                     port=3306,
                                     user=DB_USER,
                                     password=DB_PASSWORD,
                                     db=DB_NAME,
                                     cursorclass=pymysql.cursors.DictCursor)
        logger.info("Database connection established successfully")
        return connection
    except pymysql.MySQLError as e:
        logger.error(f"Database connection failed: {e}")
        raise


def get_deletion_progress():  # 進行状況をDynamoDBから取得
    response = table.get_item(Key={'id': 'progress'})
    if 'Item' in response:
        last_year = int(response['Item']['last_year'])
        last_month = int(response['Item']['last_month'])

        logger.info(
            f"Retrieved from DynamoDB - last_year: {last_year} (type: {type(last_year)}), last_month: {last_month} (type: {type(last_month)})")

        return last_year, last_month
    else:
        # 初回の場合は2022年4月を設定(進行状況がない場合はまだ何も削除されていない)
        return 2022, 3  # 2022年4月を最初に削除するため、初回は3月として扱う


def update_deletion_progress(year, month):  # 進行状況をDynamoDBに保存
    table.put_item(Item={
        'id': 'progress',
        'last_year': year,
        'last_month': month
    })


def delete_records(year, month):  # 月単位でレコードを削除する関数
    try:
        logger.info(
            f"Starting record deletion for year: {year}, month: {month}")

        connection = connect_to_db()
        with connection.cursor() as cursor:
            start_date = f'{year}-{month:02d}-01'
            logger.info(f"Start Date generated: {start_date}")
            end_date = (datetime(year, month, 1) + timedelta(days=32)
                        ).replace(day=1).strftime('%Y-%m-%d')
            logger.info(
                f"Deleting records for the period {start_date} to {end_date}")

            delete_query = f"""
                DELETE {DB_NAME}.operations
                FROM {DB_NAME}.operations
                WHERE operations.created_at >= %s
                AND operations.created_at < %s;
            """

            cursor.execute(
                delete_query, (start_date, end_date))

            connection.commit()
            logger.info(f"Deleted records for {year}-{month:02d}")
    except Exception as e:
        logger.error(f"Error deleting records: {e}")
    finally:
        if connection:
            connection.close()


def main():
    last_year, last_month = get_deletion_progress()
    logger.info(
        f"get_deletion_progress: {last_year}, month: {last_month}")

    if last_month == 12:
        next_year = last_year + 1
        next_month = 1
    else:
        next_year = last_year
        next_month = last_month + 1

    if next_year > 2024 or (next_year == 2024 and next_month > 5):
        logger.info(
            "Deletion process has completed. No more records to delete.")
        return

    try:
        delete_records(next_year, next_month)

        update_deletion_progress(next_year, next_month)
        logger.info(
            f"Deletion progress updated to {next_year}-{next_month:02d}")

    except Exception as e:
        logger.error(f"Error during deletion process: {e}")
        import traceback
        logger.error(traceback.format_exc())


if __name__ == "__main__":
    main()

セキュリティのため、MySQL接続情報は環境変数から読み込みます。

connection = pymysql.connect(host=RDS_HOST,
                             port=3306,
                             user=DB_USER,
                             password=DB_PASSWORD,
                             db=DB_NAME,

また、DynamoDBから取得したlast_yearとlast_monthがDecimal型になっているので、Pythonで数値計算できるようにIntに変換します。

last_year = int(response['Item']['last_year'])
last_month = int(response['Item']['last_month'])

6. Dockerイメージのビルドおよびプッシュ

ECSタスクで参照するDockerイメージをローカル環境でビルドし、先ほど作成したECRリポジトリにプッシュします。

まず、main.pyと同階層にDockerfileとrequiments.txtを作成します。

FROM python:3.12-slim

WORKDIR /usr/src/app

COPY requirements.txt ./
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

CMD ["python", "./main.py"]
boto3
pymysql

ローカル環境でビルドします。buildxをつかっているのは、自環境がMac M2(Arm)のためです。

docker buildx build --platform linux/amd64 -f Dockerfile -t <account_id>.dkr.ecr.ap-northeast-1.amazonaws.com/daily-deletion:latest .

ECR認証します。

aws ecr get-login-password --region ap-northeast-1 | docker login --username AWS --password-stdin <accound_id>.dkr.ecr.ap-northeast-1.amazonaws.com

ビルドしたイメージをプッシュします。

docker push <accound_id>.dkr.ecr.ap-northeast-1.amazonaws.com/daily-deletion:latest

プッシュしたらECRリポジトリを確認します。

7. ECSタスク定義の作成

JSONでタスク定義を作成します。
アプリケーションで使用する環境変数はenvironmentに定義します。
また、データ量によってはmemoryやcpuをもっと小さい値にできます(検証してみないとわからないですが)。
あと、"awslogs-create-group": "true”がないと初回のタスク実行でCloudWatchロググループの作成に失敗してエラーが出るので、注意が必要です。

{
  "family": "daily-deletion-task",
  "networkMode": "awsvpc",
  "containerDefinitions": [
    {
      "name": "daily-deletion-container",
      "image": "<account_id>.dkr.ecr.ap-northeast-1.amazonaws.com/daily-deletion:latest",
      "essential": true,
      "memory": 512,
      "cpu": 256,
      "environment": [
        { "name": "RDS_HOST", "value": "<cluster_endpoint_name>" },
        { "name": "DB_USER", "value": "<mysql_user_name>" },
        { "name": "DB_PASSWORD", "value": "<mysql_user_password>" },
        { "name": "DB_NAME", "value": "<db_schema_name>" },
        { "name": "DYNAMODB_TABLE", "value": "deletion_progress" }
      ],
      "logConfiguration": {
        "logDriver": "awslogs",
        "options": {
          "awslogs-group": "/ecs/daily-deletion",
          "awslogs-create-group": "true”,
          "awslogs-region": "ap-northeast-1",
          "awslogs-stream-prefix": "ecs"
        }
      }
    }
  ],
  "requiresCompatibilities": ["FARGATE"],
  "cpu": "256",
  "memory": "512",
  "taskRoleArn": "arn:aws:iam::<account_name>:role/ecsTaskRole",
  "executionRoleArn": "arn:aws:iam::<account_name>:role/ecsTaskExecutionRole"
}

8. EventBridgeスケジュールの作成

これまでECSタスクを立ち上げる前にECSサービスを構築していたので、今回もそうするものだと思いこんでいたのですが、バッチ処理をする際はEventBridgeから直接ECSタスクを立ち上げるということを初めて知りました。

「EventBridge > スケジュール」からスケジュールパターン(今回は毎日AM3:00)、タスク定義、サブネット、セキュリティグループなどを選択します。

動作確認

CloudWatch Logsでレコードが正しく削除されたことを確認しました。

採用情報


いいなと思ったら応援しよう!