見出し画像

AWS CDK で類似学生計算エンジンを定義して、 IaC の良さを感じる③

はじめに

こんにちは!株式会社POLでエンジニアをやっている @mejihabenatawa です!

POLは「研究者の可能性を最大化するプラットフォームを創造する」をビジョンに、理系学生に特化した採用サービス、および研究開発者・技術者に特化した転職/採用サービスの2サービスを運営しています。

前回のテックブログで AWS CDK で類似学生計算エンジンの AWS Glue 部分について紹介したので、今回は CDKにおける AWS Batch 部分の書き方を紹介していきたいと思います。

1. 全体構成編

2. AWS Glue 編

3. AWS Batch 編(今回)

4. AWS Step Functions etc 編

事前に用意するスクリプト

・calculate_similar_student.py(Glue 部分で作成した学生ベクトルをS3から読み込み、faissを用いて近傍探索を行い、類似学生をDynamoDBに書き込む)

こちらの Python スクリプトを事前に用意し、project の batch/src 配下においておきます。

また、AWS Batch はコンテナ上で動作するので、Dockerfileも用意し、batch 配下においておきます。

今回使用する主なファイルの構成は以下になります。AWS Batch 関連のディレクトリ配下が主な編集部分。

-- project
|-- batch                                 -- AWS Batch 関連のディレクトリ
|    |-- src
|    |    |-- write_data_to_dynamodb.py    
|    |
|    |-- Dockerfile
|
|-- cdk                                   -- AWS CDK 関連のディレクトリ
|    |-- bin
|    |    |-- cdk.ts
|    |
|    |-- lib 
|    |    |-- cdk-stack.ts
|
|-- glue   -- AWS Glue 関連のディレクトリ
|    |-- src
|         |-- calculate_vector.py   -- ベクトルを計算する Python スクリプト
|
|-- README.md
|
|-- cdk.json

AWS Batch について

全体構成編でも説明したとおり、AWS Batch では ベクトル同士の近傍探索とDynamoDBへの書き込みを行います。

AWS Batch とは簡単に言うと「コンテナ化されたバッチ処理を何でも実行できる」というものです。

事前に用意するスクリプト

AWS Batch で起動する Docker イメージとその中で動かす Python スクリプトを事前に用意します。

まず、Dockerfile についてです。miniconda のイメージをもとにスクリプトに必要なライブラリをインストールし、最後の一行で Python スクリプトを起動します。

Dockerfile

FROM continuumio/miniconda3

MAINTAINER hajime watanabe

ENV TZ=Asia/Tokyo

# conda create
RUN conda install python==3.8

# faiss をインストール
SHELL ["/bin/bash", "-l", "-c"]

RUN . /opt/conda/bin/activate \
 && conda install faiss-cpu -c pytorch

RUN pip install pyathena
RUN pip install pandas
RUN pip install numpy

RUN pip install awslambdaric

WORKDIR /app/
COPY . /app
WORKDIR /app/src/

ENTRYPOINT [ "/opt/conda/bin/python", "write_data_to_dynamodb.py" ]

続いて、write_data_to_dynamodb.py についてです。ところどころ処理はコメントで済ませていますが、S3 からデータをダウンロードしてきて、それをもとに学生ベクトルの近傍探索を行うという感じです。

import numpy as np
import pandas as pd
import faiss

import io
import os
import boto3
from logging42 import logger

def get_s3file(bucket_name, key):
   s3 = boto3.resource('s3')
   s3obj = s3.Object(bucket_name, key).get()

   return io.TextIOWrapper(io.BytesIO(s3obj['Body'].read()))

def upload_s3file(index, file_name, bucket_name, key):
   if not os.path.exists(file_name):
       f = open(file_name,'w')
       f.write('')
       f.close()

   faiss.write_index(index, file_name)
   return boto3.Session().resource('s3').Bucket(bucket_name).Object(key).upload_file(file_name)

def download_s3file(bucket_name, file_name, key):
   if not os.path.exists(file_name):
       f = open(file_name, 'w')
       f.write('')
       f.close()

   s3 = boto3.resource('s3')
   return s3.Bucket(bucket_name).download_file(Filename=file_name, Key=key)

def write_similar_student_with_keyword():

   env_stage = os.getenv('ENV')
   data_bucket_name = os.getenv('DATA_BUCKET_NAME')
   vector_output_file = os.getenv('VECTOR_OUTPUT_FILE')
   faiss_index_output_file = os.getenv('FAISS_INDEX_OUTPUT_FILE')

   # region calculate faiss index
   # 学生の vector をロード
   logger.info("ここでGlueによって、計算されたの学生ベクトルを読み込み")

   # Indexを計算
   logger.info("ここで学生のfaissインデックスを計算")
   
   # indexデータを保存
   file_name = '/tmp/saved-index.faiss'
   upload_flg = upload_s3file(index, file_name, data_bucket_name, faiss_index_output_file)
   
   # endregion calculate faiss index
   
   # region write similar student to dynamodb
   # Search
   print("ここで類似学生を検索")

   # DynamoDB put item
   try:
       print("ここでDynamoDBに類似学生を書き込み")

   except Exception as error:
       logger.error(error)
       raise error

   # endregion write similar student to dynamodb
       
   return "ok"
   

if __name__ == "__main__":
   write_similar_student_with_keyword()

AWS Console にて、事前に用意するもの

AWS Batch を起動するためのコンピュータリソースを用意する必要があるため以下のものを用意し、それぞれの Id を控えておきます。

・VPC

・Subnet

つづいて、最終的な類似学生を書き込む DynamoDB を用意します。テーブル名は ' sample-dynamodb' とかにし、パーティションキー は StudentId (number) にします。

最後に AWS Batch Job を実行するためのロールを 2つ用意し、これらの Role Arn を控えておきます。

・AWS Batch を実行するロール

・AWS Batch Job を実行するロール

追加する環境変数

環境変数については定義の仕方を AWS Glue 編で説明しましたが、今回追加するものがあるので、更新したものを記載します。

cdk.json

{
 "app": "npx ts-node --prefer-ts-exts bin/devio.ts",
 "context": {
    "dev": {
     "@aws-cdk/core:enableStackNameDuplicates": "true",
     "aws-cdk:enableDiffNoFail": "true",
     "@aws-cdk/core:stackRelativeExports": "true",
     "@aws-cdk/aws-ecr-assets:dockerIgnoreSupport": true,
     "@aws-cdk/aws-secretsmanager:parseOwnedSecretName": true,
     "@aws-cdk/aws-kms:defaultKeyPolicies": true,
     "@aws-cdk/aws-s3:grantWriteWithoutAcl": true,
     "Env": "dev"
     "DataBucketName": "sample.dev",
     "GlueScriptLocation": "application/glue/src",
     "GlueConnectionName": "hogehoge",
     // ここから追記
     "VpcId": "用意した vpc id",
     "SubnetIds": ["用意した subnet id"],
     "ServiceRoleForBatchARN": "用意した AWS Batch 実行ロールの Arn",
     "JobRoleARN": "用意した AWS Batch Job の実行ロールのArn",
     "JobOverrideCommand": [""],
     "JobImageRepositoryName": "作成する AWS Batch のジョブイメージのリポジトリの名前",
     "JobImageTagName": "Docker の Image Tag",
     "JobvCPUS": 1,
     "JobMemoryLimitMiB": 1024
   },
    "stg": {
     "@aws-cdk/core:enableStackNameDuplicates": "true",
     "aws-cdk:enableDiffNoFail": "true",
     "@aws-cdk/core:stackRelativeExports": "true",
     "@aws-cdk/aws-ecr-assets:dockerIgnoreSupport": true,
     "@aws-cdk/aws-secretsmanager:parseOwnedSecretName": true,
     "@aws-cdk/aws-kms:defaultKeyPolicies": true,
     "@aws-cdk/aws-s3:grantWriteWithoutAcl": true,
     "Env": "stg",
     "DataBucketName": "sample.stg",
     "GlueScriptLocation": "application/glue/src",
     "GlueConnectionName": "hogehoge",
     // ここから追記
     "VpcId": "用意した vpc id",
     "SubnetIds": ["用意した subnet id"],
     "ServiceRoleForBatchARN": "用意した AWS Batch 実行ロールの Arn",
     "JobRoleARN": "用意した AWS Batch Job の実行ロールのArn",
     "JobOverrideCommand": [""],
     "JobImageRepositoryName": "作成する AWS Batch のジョブイメージのリポジトリの名前",
     "JobImageTagName": "Docker の Image Tag",
     "JobvCPUS": 1,
     "JobMemoryLimitMiB": 1024
     
   },
    "prd": {
     "@aws-cdk/core:enableStackNameDuplicates": "true",
     "aws-cdk:enableDiffNoFail": "true",
     "@aws-cdk/core:stackRelativeExports": "true",
     "@aws-cdk/aws-ecr-assets:dockerIgnoreSupport": true,
     "@aws-cdk/aws-secretsmanager:parseOwnedSecretName": true,
     "@aws-cdk/aws-kms:defaultKeyPolicies": true,
     "@aws-cdk/aws-s3:grantWriteWithoutAcl": true,
     "Env": "prd",
     "DataBucketName": "sample.prd",
     "GlueScriptLocation": "application/glue/src",
     "GlueConnectionName": "hogehoge",
     // ここから追記
     "VpcId": "用意した vpc id",
     "SubnetIds": ["用意した subnet id"],
     "ServiceRoleForBatchARN": "用意した AWS Batch 実行ロールの Arn",
     "JobRoleARN": "用意した AWS Batch Job の実行ロールのArn",
     "JobOverrideCommand": [""],
     "JobImageRepositoryName": "作成する AWS Batch のジョブイメージのリポジトリの名前",
     "JobImageTagName": "Docker の Image Tag",
     "JobvCPUS": 1,
     "JobMemoryLimitMiB": 1024
     
   },
 }
}

CDK上で作成するもの

最後に今まで用意したものをCDK上でどのように記述するのかを説明します。cdk-stask.ts に AWS Batch 部分の定義を追記します。Batch Name の定義とDockerImageAsset の追加を行います。DockerImageAsset の追加では、ECRにリポジトリを作成することができます。

container-batch.ts に、そのリポジトリ名や コンソールで作成した VPC の id や Subnet の id を渡します。container-batch.ts ではそれらを利用して、AWS Batch の Job を定義します。

cdk-stack.ts

import * as cdk from '@aws-cdk/core';
import * as glue from "@aws-cdk/aws-glue";
import * as s3 from '@aws-cdk/aws-s3'
import * as iam from "@aws-cdk/aws-iam";
import * as s3Deploy from '@aws-cdk/aws-s3-deployment'
import { ContainerBatch } from "./container-batch";

export interface Context {
 ENVStage: string;
 DataBucketName: string,
 GlueScriptLocation: string;
 GlueLibraryLocation: string;
 GlueConnectionName: string;
 VectorOutputFile: string;
 FaissIndexOutputFile: string;
 ContainerImageTag: string;
 VpcId: string;
 SubnetIds: string[];
 ServiceRoleForBatchARN: string;
 JobRoleARN: string;
 JobOverrideCommand: string[];
 JobImageTagName: string;
 JobvCPUS: number;
 JobMemoryLimitMiB: number;
}

export class CdkStack extends cdk.Stack {
 constructor(
   scope: cdk.Construct,
   id: string,
   context: Context,
   props?: cdk.StackProps
 ) {
   super(scope, id, props);

   // #region Glue

   /* 
   
   前回の範囲のため省略

   */

   // #endregion Glue
   
   // #region AWS Batch
   const batchName = `CalculateFaissIndexBatch-${context.ENVStage}`;
   
   // Dockerイメージを作成してECRにプッシュ。このとき、リポジトリ名を指定して、リポジトリを作成する。
   const imageAsset: cdk.DockerImageAssetLocation = this.synthesizer.addDockerImageAsset({
     sourceHash: context.JobImageTagName,
     directoryName: `../../batch`,
     repositoryName: `calculate-faiss-index-batch-${context.ENVStage}`,
   })

   // AWS Batch の宣言するコードは ./container-batch.ts に切り出し。 parameter を渡す。
   const awsBatch = new ContainerBatch(this, batchName, {
     name: batchName,
     vpcId: context.VpcId,
     subnetIds: context.SubnetIds,
     serviceRoleForBatchARN: context.ServiceRoleForBatchARN,
     jobRoleARN: context.JobRoleARN,
     jobOverrideCommand: context.JobOverrideCommand,
     jobEnvironment: {
       TZ: "Asia/Tokyo",
       ENV: `${context.ENVStage}`,
       DATA_BUCKET_NAME: context.DataBucketName,
       VECTOR_OUTPUT_FILE: context.VectorOutputFile,
       FAISS_INDEX_OUTPUT_FILE: context.FaissIndexOutputFile,
     },
     jobImageRepositoryName: imageAsset.repositoryName,
     jobImageTagName: context.JobImageTagName,
     jobvCPUS: context.JobvCPUS,
     jobMemoryLimitMiB: context.JobMemoryLimitMiB,
   });

   // #endregion AWS Batch
   
 }
}

container-batch.ts

import * as cdk from "@aws-cdk/core";
import * as ec2 from "@aws-cdk/aws-ec2";
import * as iam from "@aws-cdk/aws-iam";
import * as ecr from "@aws-cdk/aws-ecr";
import * as ecs from "@aws-cdk/aws-ecs";
import * as batch from "@aws-cdk/aws-batch";

export interface ContainerBatchProps {
 readonly name: string;
 readonly vpcId: string;
 readonly subnetIds: string[];
 readonly serviceRoleForBatchARN: string;
 readonly jobRoleARN: string;
 readonly jobOverrideCommand: string[];
 readonly jobEnvironment: {
   [key: string]: string;
 };
 readonly jobImageRepositoryName: string;
 readonly jobImageTagName: string;
 readonly jobvCPUS: number;
 readonly jobMemoryLimitMiB: number;
}

export class ContainerBatch extends cdk.Construct {

 public jobDefinitionArn: string;
 public jobQueueArn: string;

 constructor(scope: cdk.Construct, id: string, props: ContainerBatchProps) {
   super(scope, id);

   // Console で準備した VPC の を id から VPC インスタンスを作成する
   const vpc = ec2.Vpc.fromLookup(this, "VPC", {
     vpcId: props.vpcId,
   });

   // Console で準備した Subnet の を id を VPC インスタンスに紐付ける
   const selectSubnets = vpc.selectSubnets({
     subnets: props.subnetIds.map((subnetId) =>
       ec2.Subnet.fromSubnetAttributes(this, `${subnetId}-Subnet`, {
         availabilityZone: "dummy",
         subnetId: subnetId,
       })
     ),
   });

   // 作成した VPC 内にセキュリティグループを作成する
   const securityGroup = new ec2.SecurityGroup(this, "SecurityGroup", {
     vpc: vpc,
   });

   // Console で準備した AWS Batch 用の Role の Role Arn から、Role インスタンスを作成する
   const serviceRoleForBatch = iam.Role.fromRoleArn(
     this,
     "ServiceRoleForBatch",
     props.serviceRoleForBatchARN
   );

   // VPC, Subnet, セキュリティグループ, role からコンピュート環境を作成する
   const computeEnvironment = new batch.ComputeEnvironment(
     this,
     "BatchComputeEnvironment",
     {
       computeEnvironmentName: `${props.name}-ComputeEnvironment`,
       computeResources: {
         type: batch.ComputeResourceType.ON_DEMAND,
         vpc: vpc,
         vpcSubnets: selectSubnets,
         securityGroups: [securityGroup],
       },
       serviceRole: serviceRoleForBatch,
     }
   );

   // 用意した Job の Role Arn から Batch Job の Role インスタンスを作成する
   const jobRole = iam.Role.fromRoleArn(
       this,
       "BatchJobRole",
       props.jobRoleARN
     );

   // コンピュート環境から Job Queue を作成する
   const jobQueue = new batch.JobQueue(this, "JobQueue", {
     jobQueueName: `${props.name}-JobQueue`,
     computeEnvironments: [
       {
         computeEnvironment: computeEnvironment,
         order: 1,
       },
     ],
   });

   this.jobQueueArn = jobQueue.jobQueueArn
   
   // cdk-stack.ts の50行目で作成されたリポジトリを取得する
   const repository: ecr.IRepository = ecr.Repository.fromRepositoryName(
       this,
       `ECRRepository`,
       props.jobImageRepositoryName,
   )
   
   // リポジトリから特定のイメージを取得
   const image = ecs.ContainerImage.fromEcrRepository(repository, props.jobImageTagName)

   // ジョブ定義を作成
   const batchJobDefinition = new batch.JobDefinition(this, "JobDefinition", {
     jobDefinitionName: `${props.name}-JobDefinition`,
     container: {
       environment: props.jobEnvironment,
       image: image,
       jobRole: jobRole,
       vcpus: props.jobvCPUS,
       memoryLimitMiB: props.jobMemoryLimitMiB,
     },
   });

   this.jobDefinitionArn = batchJobDefinition.jobDefinitionArn;
 }
}

それぞれの処理に関する説明はコメントに記載しました。省いている部分もあるので、不明点があれば Twitter などに連絡ください!

まとめ

今回は AWS CDK における AWS Batch の定義の仕方を説明しました。サンプルコード をこちらになります。

次回は Step Function で各タスクをつなぐ処理の書き方を説明していきたいと思います。

株式会社POLではエンジニア、エンジニアリングマネージャーを大募集してます!お話しだけでも構いませんのでお気軽にお声がけください!!!

https://lapras.com/job_listings/1702

https://lapras.com/job_listings/1703



いいなと思ったら応援しよう!