AWS CDK で類似学生計算エンジンを定義して、 IaC の良さを感じる③
はじめに
こんにちは!株式会社POLでエンジニアをやっている @mejihabenatawa です!
POLは「研究者の可能性を最大化するプラットフォームを創造する」をビジョンに、理系学生に特化した採用サービス、および研究開発者・技術者に特化した転職/採用サービスの2サービスを運営しています。
前回のテックブログで AWS CDK で類似学生計算エンジンの AWS Glue 部分について紹介したので、今回は CDKにおける AWS Batch 部分の書き方を紹介していきたいと思います。
1. 全体構成編
3. AWS Batch 編(今回)
4. AWS Step Functions etc 編
事前に用意するスクリプト
・calculate_similar_student.py(Glue 部分で作成した学生ベクトルをS3から読み込み、faissを用いて近傍探索を行い、類似学生をDynamoDBに書き込む)
こちらの Python スクリプトを事前に用意し、project の batch/src 配下においておきます。
また、AWS Batch はコンテナ上で動作するので、Dockerfileも用意し、batch 配下においておきます。
今回使用する主なファイルの構成は以下になります。AWS Batch 関連のディレクトリ配下が主な編集部分。
-- project
|-- batch -- AWS Batch 関連のディレクトリ
| |-- src
| | |-- write_data_to_dynamodb.py
| |
| |-- Dockerfile
|
|-- cdk -- AWS CDK 関連のディレクトリ
| |-- bin
| | |-- cdk.ts
| |
| |-- lib
| | |-- cdk-stack.ts
|
|-- glue -- AWS Glue 関連のディレクトリ
| |-- src
| |-- calculate_vector.py -- ベクトルを計算する Python スクリプト
|
|-- README.md
|
|-- cdk.json
AWS Batch について
全体構成編でも説明したとおり、AWS Batch では ベクトル同士の近傍探索とDynamoDBへの書き込みを行います。
AWS Batch とは簡単に言うと「コンテナ化されたバッチ処理を何でも実行できる」というものです。
事前に用意するスクリプト
AWS Batch で起動する Docker イメージとその中で動かす Python スクリプトを事前に用意します。
まず、Dockerfile についてです。miniconda のイメージをもとにスクリプトに必要なライブラリをインストールし、最後の一行で Python スクリプトを起動します。
Dockerfile
FROM continuumio/miniconda3
MAINTAINER hajime watanabe
ENV TZ=Asia/Tokyo
# conda create
RUN conda install python==3.8
# faiss をインストール
SHELL ["/bin/bash", "-l", "-c"]
RUN . /opt/conda/bin/activate \
&& conda install faiss-cpu -c pytorch
RUN pip install pyathena
RUN pip install pandas
RUN pip install numpy
RUN pip install awslambdaric
WORKDIR /app/
COPY . /app
WORKDIR /app/src/
ENTRYPOINT [ "/opt/conda/bin/python", "write_data_to_dynamodb.py" ]
続いて、write_data_to_dynamodb.py についてです。ところどころ処理はコメントで済ませていますが、S3 からデータをダウンロードしてきて、それをもとに学生ベクトルの近傍探索を行うという感じです。
import numpy as np
import pandas as pd
import faiss
import io
import os
import boto3
from logging42 import logger
def get_s3file(bucket_name, key):
s3 = boto3.resource('s3')
s3obj = s3.Object(bucket_name, key).get()
return io.TextIOWrapper(io.BytesIO(s3obj['Body'].read()))
def upload_s3file(index, file_name, bucket_name, key):
if not os.path.exists(file_name):
f = open(file_name,'w')
f.write('')
f.close()
faiss.write_index(index, file_name)
return boto3.Session().resource('s3').Bucket(bucket_name).Object(key).upload_file(file_name)
def download_s3file(bucket_name, file_name, key):
if not os.path.exists(file_name):
f = open(file_name, 'w')
f.write('')
f.close()
s3 = boto3.resource('s3')
return s3.Bucket(bucket_name).download_file(Filename=file_name, Key=key)
def write_similar_student_with_keyword():
env_stage = os.getenv('ENV')
data_bucket_name = os.getenv('DATA_BUCKET_NAME')
vector_output_file = os.getenv('VECTOR_OUTPUT_FILE')
faiss_index_output_file = os.getenv('FAISS_INDEX_OUTPUT_FILE')
# region calculate faiss index
# 学生の vector をロード
logger.info("ここでGlueによって、計算されたの学生ベクトルを読み込み")
# Indexを計算
logger.info("ここで学生のfaissインデックスを計算")
# indexデータを保存
file_name = '/tmp/saved-index.faiss'
upload_flg = upload_s3file(index, file_name, data_bucket_name, faiss_index_output_file)
# endregion calculate faiss index
# region write similar student to dynamodb
# Search
print("ここで類似学生を検索")
# DynamoDB put item
try:
print("ここでDynamoDBに類似学生を書き込み")
except Exception as error:
logger.error(error)
raise error
# endregion write similar student to dynamodb
return "ok"
if __name__ == "__main__":
write_similar_student_with_keyword()
AWS Console にて、事前に用意するもの
AWS Batch を起動するためのコンピュータリソースを用意する必要があるため以下のものを用意し、それぞれの Id を控えておきます。
・VPC
・Subnet
つづいて、最終的な類似学生を書き込む DynamoDB を用意します。テーブル名は ' sample-dynamodb' とかにし、パーティションキー は StudentId (number) にします。
最後に AWS Batch Job を実行するためのロールを 2つ用意し、これらの Role Arn を控えておきます。
・AWS Batch を実行するロール
・AWS Batch Job を実行するロール
追加する環境変数
環境変数については定義の仕方を AWS Glue 編で説明しましたが、今回追加するものがあるので、更新したものを記載します。
cdk.json
{
"app": "npx ts-node --prefer-ts-exts bin/devio.ts",
"context": {
"dev": {
"@aws-cdk/core:enableStackNameDuplicates": "true",
"aws-cdk:enableDiffNoFail": "true",
"@aws-cdk/core:stackRelativeExports": "true",
"@aws-cdk/aws-ecr-assets:dockerIgnoreSupport": true,
"@aws-cdk/aws-secretsmanager:parseOwnedSecretName": true,
"@aws-cdk/aws-kms:defaultKeyPolicies": true,
"@aws-cdk/aws-s3:grantWriteWithoutAcl": true,
"Env": "dev"
"DataBucketName": "sample.dev",
"GlueScriptLocation": "application/glue/src",
"GlueConnectionName": "hogehoge",
// ここから追記
"VpcId": "用意した vpc id",
"SubnetIds": ["用意した subnet id"],
"ServiceRoleForBatchARN": "用意した AWS Batch 実行ロールの Arn",
"JobRoleARN": "用意した AWS Batch Job の実行ロールのArn",
"JobOverrideCommand": [""],
"JobImageRepositoryName": "作成する AWS Batch のジョブイメージのリポジトリの名前",
"JobImageTagName": "Docker の Image Tag",
"JobvCPUS": 1,
"JobMemoryLimitMiB": 1024
},
"stg": {
"@aws-cdk/core:enableStackNameDuplicates": "true",
"aws-cdk:enableDiffNoFail": "true",
"@aws-cdk/core:stackRelativeExports": "true",
"@aws-cdk/aws-ecr-assets:dockerIgnoreSupport": true,
"@aws-cdk/aws-secretsmanager:parseOwnedSecretName": true,
"@aws-cdk/aws-kms:defaultKeyPolicies": true,
"@aws-cdk/aws-s3:grantWriteWithoutAcl": true,
"Env": "stg",
"DataBucketName": "sample.stg",
"GlueScriptLocation": "application/glue/src",
"GlueConnectionName": "hogehoge",
// ここから追記
"VpcId": "用意した vpc id",
"SubnetIds": ["用意した subnet id"],
"ServiceRoleForBatchARN": "用意した AWS Batch 実行ロールの Arn",
"JobRoleARN": "用意した AWS Batch Job の実行ロールのArn",
"JobOverrideCommand": [""],
"JobImageRepositoryName": "作成する AWS Batch のジョブイメージのリポジトリの名前",
"JobImageTagName": "Docker の Image Tag",
"JobvCPUS": 1,
"JobMemoryLimitMiB": 1024
},
"prd": {
"@aws-cdk/core:enableStackNameDuplicates": "true",
"aws-cdk:enableDiffNoFail": "true",
"@aws-cdk/core:stackRelativeExports": "true",
"@aws-cdk/aws-ecr-assets:dockerIgnoreSupport": true,
"@aws-cdk/aws-secretsmanager:parseOwnedSecretName": true,
"@aws-cdk/aws-kms:defaultKeyPolicies": true,
"@aws-cdk/aws-s3:grantWriteWithoutAcl": true,
"Env": "prd",
"DataBucketName": "sample.prd",
"GlueScriptLocation": "application/glue/src",
"GlueConnectionName": "hogehoge",
// ここから追記
"VpcId": "用意した vpc id",
"SubnetIds": ["用意した subnet id"],
"ServiceRoleForBatchARN": "用意した AWS Batch 実行ロールの Arn",
"JobRoleARN": "用意した AWS Batch Job の実行ロールのArn",
"JobOverrideCommand": [""],
"JobImageRepositoryName": "作成する AWS Batch のジョブイメージのリポジトリの名前",
"JobImageTagName": "Docker の Image Tag",
"JobvCPUS": 1,
"JobMemoryLimitMiB": 1024
},
}
}
CDK上で作成するもの
最後に今まで用意したものをCDK上でどのように記述するのかを説明します。cdk-stask.ts に AWS Batch 部分の定義を追記します。Batch Name の定義とDockerImageAsset の追加を行います。DockerImageAsset の追加では、ECRにリポジトリを作成することができます。
container-batch.ts に、そのリポジトリ名や コンソールで作成した VPC の id や Subnet の id を渡します。container-batch.ts ではそれらを利用して、AWS Batch の Job を定義します。
cdk-stack.ts
import * as cdk from '@aws-cdk/core';
import * as glue from "@aws-cdk/aws-glue";
import * as s3 from '@aws-cdk/aws-s3'
import * as iam from "@aws-cdk/aws-iam";
import * as s3Deploy from '@aws-cdk/aws-s3-deployment'
import { ContainerBatch } from "./container-batch";
export interface Context {
ENVStage: string;
DataBucketName: string,
GlueScriptLocation: string;
GlueLibraryLocation: string;
GlueConnectionName: string;
VectorOutputFile: string;
FaissIndexOutputFile: string;
ContainerImageTag: string;
VpcId: string;
SubnetIds: string[];
ServiceRoleForBatchARN: string;
JobRoleARN: string;
JobOverrideCommand: string[];
JobImageTagName: string;
JobvCPUS: number;
JobMemoryLimitMiB: number;
}
export class CdkStack extends cdk.Stack {
constructor(
scope: cdk.Construct,
id: string,
context: Context,
props?: cdk.StackProps
) {
super(scope, id, props);
// #region Glue
/*
前回の範囲のため省略
*/
// #endregion Glue
// #region AWS Batch
const batchName = `CalculateFaissIndexBatch-${context.ENVStage}`;
// Dockerイメージを作成してECRにプッシュ。このとき、リポジトリ名を指定して、リポジトリを作成する。
const imageAsset: cdk.DockerImageAssetLocation = this.synthesizer.addDockerImageAsset({
sourceHash: context.JobImageTagName,
directoryName: `../../batch`,
repositoryName: `calculate-faiss-index-batch-${context.ENVStage}`,
})
// AWS Batch の宣言するコードは ./container-batch.ts に切り出し。 parameter を渡す。
const awsBatch = new ContainerBatch(this, batchName, {
name: batchName,
vpcId: context.VpcId,
subnetIds: context.SubnetIds,
serviceRoleForBatchARN: context.ServiceRoleForBatchARN,
jobRoleARN: context.JobRoleARN,
jobOverrideCommand: context.JobOverrideCommand,
jobEnvironment: {
TZ: "Asia/Tokyo",
ENV: `${context.ENVStage}`,
DATA_BUCKET_NAME: context.DataBucketName,
VECTOR_OUTPUT_FILE: context.VectorOutputFile,
FAISS_INDEX_OUTPUT_FILE: context.FaissIndexOutputFile,
},
jobImageRepositoryName: imageAsset.repositoryName,
jobImageTagName: context.JobImageTagName,
jobvCPUS: context.JobvCPUS,
jobMemoryLimitMiB: context.JobMemoryLimitMiB,
});
// #endregion AWS Batch
}
}
container-batch.ts
import * as cdk from "@aws-cdk/core";
import * as ec2 from "@aws-cdk/aws-ec2";
import * as iam from "@aws-cdk/aws-iam";
import * as ecr from "@aws-cdk/aws-ecr";
import * as ecs from "@aws-cdk/aws-ecs";
import * as batch from "@aws-cdk/aws-batch";
export interface ContainerBatchProps {
readonly name: string;
readonly vpcId: string;
readonly subnetIds: string[];
readonly serviceRoleForBatchARN: string;
readonly jobRoleARN: string;
readonly jobOverrideCommand: string[];
readonly jobEnvironment: {
[key: string]: string;
};
readonly jobImageRepositoryName: string;
readonly jobImageTagName: string;
readonly jobvCPUS: number;
readonly jobMemoryLimitMiB: number;
}
export class ContainerBatch extends cdk.Construct {
public jobDefinitionArn: string;
public jobQueueArn: string;
constructor(scope: cdk.Construct, id: string, props: ContainerBatchProps) {
super(scope, id);
// Console で準備した VPC の を id から VPC インスタンスを作成する
const vpc = ec2.Vpc.fromLookup(this, "VPC", {
vpcId: props.vpcId,
});
// Console で準備した Subnet の を id を VPC インスタンスに紐付ける
const selectSubnets = vpc.selectSubnets({
subnets: props.subnetIds.map((subnetId) =>
ec2.Subnet.fromSubnetAttributes(this, `${subnetId}-Subnet`, {
availabilityZone: "dummy",
subnetId: subnetId,
})
),
});
// 作成した VPC 内にセキュリティグループを作成する
const securityGroup = new ec2.SecurityGroup(this, "SecurityGroup", {
vpc: vpc,
});
// Console で準備した AWS Batch 用の Role の Role Arn から、Role インスタンスを作成する
const serviceRoleForBatch = iam.Role.fromRoleArn(
this,
"ServiceRoleForBatch",
props.serviceRoleForBatchARN
);
// VPC, Subnet, セキュリティグループ, role からコンピュート環境を作成する
const computeEnvironment = new batch.ComputeEnvironment(
this,
"BatchComputeEnvironment",
{
computeEnvironmentName: `${props.name}-ComputeEnvironment`,
computeResources: {
type: batch.ComputeResourceType.ON_DEMAND,
vpc: vpc,
vpcSubnets: selectSubnets,
securityGroups: [securityGroup],
},
serviceRole: serviceRoleForBatch,
}
);
// 用意した Job の Role Arn から Batch Job の Role インスタンスを作成する
const jobRole = iam.Role.fromRoleArn(
this,
"BatchJobRole",
props.jobRoleARN
);
// コンピュート環境から Job Queue を作成する
const jobQueue = new batch.JobQueue(this, "JobQueue", {
jobQueueName: `${props.name}-JobQueue`,
computeEnvironments: [
{
computeEnvironment: computeEnvironment,
order: 1,
},
],
});
this.jobQueueArn = jobQueue.jobQueueArn
// cdk-stack.ts の50行目で作成されたリポジトリを取得する
const repository: ecr.IRepository = ecr.Repository.fromRepositoryName(
this,
`ECRRepository`,
props.jobImageRepositoryName,
)
// リポジトリから特定のイメージを取得
const image = ecs.ContainerImage.fromEcrRepository(repository, props.jobImageTagName)
// ジョブ定義を作成
const batchJobDefinition = new batch.JobDefinition(this, "JobDefinition", {
jobDefinitionName: `${props.name}-JobDefinition`,
container: {
environment: props.jobEnvironment,
image: image,
jobRole: jobRole,
vcpus: props.jobvCPUS,
memoryLimitMiB: props.jobMemoryLimitMiB,
},
});
this.jobDefinitionArn = batchJobDefinition.jobDefinitionArn;
}
}
それぞれの処理に関する説明はコメントに記載しました。省いている部分もあるので、不明点があれば Twitter などに連絡ください!
まとめ
今回は AWS CDK における AWS Batch の定義の仕方を説明しました。サンプルコード をこちらになります。
次回は Step Function で各タスクをつなぐ処理の書き方を説明していきたいと思います。
株式会社POLではエンジニア、エンジニアリングマネージャーを大募集してます!お話しだけでも構いませんのでお気軽にお声がけください!!!
https://lapras.com/job_listings/1702
https://lapras.com/job_listings/1703