【TECH連載】AWS Glue入門編
こんにちは、AICROSS開発チーム川上です。
AI CROSSではデータウェアハウスとして、Redshiftを利用しています。
Redshiftへのデータのインポートのために、AWS Glueというサービスを一部利用しています。
AWS Glueはうまく使うと非常に強力なツールになります。
本日は、当社でも活用しているAWS Glueについて、具体的にどうやって使うのか、ステップバイステップで解説してみようと思います。
なお、以下の解説で前提としている簡単な概略図は以下の図を参照ください。
構築手順
Redshiftの構築
S3バケットを作成し、データのインポートを行う
Glue Connectorsの作成を行う
Glue jobsの作成を行う。
実行を行う
今回はGlueの入門なので1、2の説明は省いて3からの説明になります。
Glue Connectors
データベースの接続を行うための設定になります。
細かい部分については割愛いたします。
Glueの画面を開く
Data connectionsを開く
Create connectionで作成画面に映る
Redshiftを選択すると以下の画面が出るので必要に応じて、認証情報の入力を行う。(データベースの管理者等に情報を聞いておく)
リソースの名前を記載して完了
Glue Jobs
Glueのジョブの本体
Visual ETL、NoteBook、Script editorの種類がある。
今回使用を行うのはScript editorになります。
インポートを行うデータ
timestamp_column,date_column,string_column,int_column
2024/3/4 00:00:00,,a,1
2024/3/5 00:00:00,,b,2
2024/3/6 00:00:00,,c,3
2024/3/7 00:00:00,,d,4
2024/3/8 00:00:00,,e,5
単純なcsvファイル
Glue Code
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrameCollection
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import lit, col, from_unixtime, to_timestamp,regexp_replace,udf,to_date,from_utc_timestamp
from pyspark.sql.types import IntegerType,StringType
def df_show(dfy):
dfc = DynamicFrameCollection(
{"dfy": dfy},
glueContext
)
df = dfc.select(list(dfc.keys())[0]).toDF()
df.printSchema()
df.show()
# Job details Job parameters
args = getResolvedOptions(sys.argv, ["JOB_NAME","INPUT_BUCKET","TEMP_BUCKET","Y","M","D"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
# loggerの追加
logger = glueContext.get_logger()
temp_bucket = args["TEMP_BUCKET"]
temp_dir = f"s3://{temp_bucket}/glue/temporary/"
redshift_tmp_dir = f"s3://{temp_bucket}/glue/temporary/Redshift/"
y = args["Y"]
m = args["M"]
d = args["D"]
table = "test_table"
input_bucket = args["INPUT_BUCKET"]
logger.info(input_bucket)
input_path = f"s3://{input_bucket}/{y}/{m}/{d}/test_file.csv"
def MyTransform(glueContext, dfc) -> DynamicFrameCollection:
df = dfc.select(list(dfc.keys())[0]).toDF()
# timestamp
df = df.withColumn("timestamp_column",to_timestamp(df["timestamp_column"], "yyyy-MM-dd HH:mm:ss"))
df.show()
# date
df = df.withColumn("date_column",to_date(df["timestamp_column"]))
# string
df = df.withColumn("string_column", df['string_column'].cast(StringType()))
# integer
df = df.withColumn("int_column", df['int_column'].cast(IntegerType()))
# show
df.printSchema()
df.show()
# df > dyf
dyf = DynamicFrame.fromDF(df, glueContext, "MyTransform")
return DynamicFrameCollection({"CustomTransform0": dyf}, glueContext)
# Script generated for node Amazon S3
S3_Input = glueContext.create_dynamic_frame.from_options(
format_options={
"quoteChar": -1,
"escaper": "\\",
"withHeader": True,
"separator": ",",
"optimizePerformance": False,
},
connection_type="s3",
format="csv",
connection_options={
"paths": [
input_path
],
"recurse": True,
},
transformation_ctx="S3_Input",
)
df_show(S3_Input)
CustomTransform_node = MyTransform(
glueContext,
DynamicFrameCollection(
{"S3_Input": S3_Input},
glueContext
)
)
SelectFromCollection_node = SelectFromCollection.apply(
dfc=CustomTransform_node,
key=list(CustomTransform_node.keys())[0],
transformation_ctx="SelectFromCollection_node",
)
ChangeSchema_node = ApplyMapping.apply(
frame=SelectFromCollection_node,
mappings=[
("timestamp_column", "timestamp", "timestamp_column", "timestamp"),
("date_column", "date", "date_column", "date"),
("string_column","string", "string_column", "string"),
("int_column", "integer", "int_column", "integer")
]
)
df_show(ChangeSchema_node)
talble_column = """
(
"timestamp_column" TIMESTAMP,
"date_column" DATE,
"string_column" VARCHAR,
"int_column" INTEGER
)
"""
preactions = f"""
DROP TABLE {table};
CREATE TABLE IF NOT EXISTS {table}
{talble_column};
"""
postactions = f"""
BEGIN;
DELETE FROM public.{table};
DELETE FROM public.{table} USING public.{table}_temp;
INSERT INTO public.{table} SELECT * FROM public.{table}_temp;
DROP TABLE public.{table}_temp;
END;
"""
#dev-develop-connect
Redshift_Output = glueContext.write_dynamic_frame.from_options(
frame=ChangeSchema_node,
connection_type="redshift",
connection_options={
"postactions": postactions,
"redshiftTmpDir": redshift_tmp_dir,
"useConnectionProperties": "true",
"dbtable": f"public.{table}_temp",
"connectionName": "dev-develop-connect",
"preactions": preactions,
"aws_iam_user": "arn:aws:iam::xxxxxxxxxxxx:role/dev-redshift-role",
},
transformation_ctx="Redshift_Output",
)
job.commit()
変更が必要な部分
{TABLE_NAME} 任意のテーブル名に変更ください
{ConnectionName} Glue Connectorsで設定した名前に変更ください
{IamRole} 新規ロールの作成を行うか、Redshiftに割り当てされているIamRoleを使用ください(新規に最小権限を絞って割り当て推奨)
Job parametersに設定が必要
--INPUT_BUCKET 任意のバケット(リージョン)
--TEMP_BUCKET 任意のバケット(同一リージョン)
--D 日付
--M 月
--Y 年
JOBの流れ
基本的なジョブの内容は以下のようになっています。
コードにコメントで記載していますので合わせてご覧ください。
Glueの初期設定を読み込みする
S3のデータの読み込みを行う
MyTransformにデータを渡し、型の制御を行う
MyTransformから返ってきた型の変換を行い次処理の準備
データベースに入れる型通りにデータを作る
Redshiftへの書き込みを行う
詳細については以下に記載します。
1.Glueの初期設定を読み込みする
以下の記述によりGlueのジョブの変数を読み込みすることができる。
開発中GlueのJobの画面のに以下パラメータの設定を行う。
「Job parametersに設定が必要」部分で内容を説明
job details > Advanced properties > Job parameters
args = getResolvedOptions(sys.argv, ["JOB_NAME","INPUT_BUCKET","TEMP_BUCKET","Y","M","D"])
2.S3のデータの読み込みを行う
S3の読み込みを行う
データによって、オプションを変化させる。
format、format_options等で、csv、jsonなどによって変更する
input_pathにはs3://bucket/filepathという形式で行う。
recurse: ファイルを再帰的に読み込みを行う
from awsglue.context import GlueContext
# Script generated for node Amazon S3
S3_Input = glueContext.create_dynamic_frame.from_options(
format_options={
"quoteChar": -1,
"escaper": "\\",
"withHeader": True,
"separator": ",",
"optimizePerformance": False,
},
connection_type="s3",
format="csv",
connection_options={
"paths": [
input_path
],
"recurse": True,
},
transformation_ctx="S3_Input",
)
3.MyTransformにデータを渡し、型の制御を行う
データを読み込みを行った際に、データの型を変更する。
to_timestamp: timestampへの変更を行う
StringType: Stringへの変更を行う
IntegerType: Integerへの変更を行う
行うことができる型については、 pyspark.sql.typesもしくはpyspark.sql.functionsをご確認ください。
from pyspark.sql.types import IntegerType,StringType
from pyspark.sql.functions import to_timestamp
# 時間
df = df.withColumn("timestamp_column",to_timestamp(df["timestamp_column"], "yyyy-MM-dd HH:mm:ss"))
# string
df = df.withColumn("string_column", df['string_column'].cast(StringType()))
# integer
df = df.withColumn("int_column", df['int_column'].cast(IntegerType()))
4.MyTransformから返って来た型の変換を行い次処理の準備
from awsglue.transforms import *
# 上記もしくは以下のimport
from awsglue.transforms import SelectFromCollection
SelectFromCollection_node = SelectFromCollection.apply(
dfc=CustomTransform_node,
key=list(CustomTransform_node.keys())[0],
transformation_ctx="SelectFromCollection_node",
)
5.データベースに入れる型通りにデータを作る
データベースに入れたいデータを以下の形式で入力する。
("変更前 カラム名","変更前 型", "変更前 カラム名", "変更後 型")
from awsglue.transforms import *
# 上記もしくは以下のimport
from awsglue.transforms import ApplyMapping
ChangeSchema_node = ApplyMapping.apply(
frame=SelectFromCollection_node,
mappings=[
("timestamp_column", "timestamp", "timestamp_column", "timestamp"),
("date_column", "date", "date_column", "date"),
("string_column","string", "string_column", "string"),
("int_column", "integer", "int_column", "integer")
]
)
6.Redshiftへの書き込みを行う
talble_column: データベースに定義するカラムを設定する
preactions: データベースに書き込みを行う前処理
postactions: データベースに書き込みを行った後処理
glueContext.write_dynamic_frame.from_optionsのオプション
dbtable: 書き込みを行うテーブル名
preactions データを入れるテーブルの作成やジョブの流し直しを行う際のデータ削除を行ったりする。
dbtable insert ChangeSchema_nodeのデータをdbtable で指定しているテーブルにインサートを行う
postactions データインサートを行った後の処理を行う2でtempファイルにインサートした後に重複したデータを正式なテーブルに入れる。
talble_column = """
(
"timestamp_column" TIMESTAMP,
"date_column" DATE,
"string_column" VARCHAR,
"int_column" INTEGER
)
"""
"""
preactions = f"""
DROP TABLE {table};
CREATE TABLE IF NOT EXISTS {table}
{talble_column};
"""
postactions = f"""
BEGIN;
DELETE FROM public.{table};
DELETE FROM public.{table} USING public.{table}_temp;
INSERT INTO public.{table} SELECT * FROM public.{table}_temp;
DROP TABLE public.{table}_temp;
END;
"""
#dev-develop-connect
Redshift_Output = glueContext.write_dynamic_frame.from_options(
frame=ChangeSchema_node,
connection_type="redshift",
connection_options={
"postactions": postactions,
"redshiftTmpDir": redshift_tmp_dir,
"useConnectionProperties": "true",
"dbtable": f"public.{table}_temp",
"connectionName": "dev-develop-connect",
"preactions": preactions,
"aws_iam_user": "arn:aws:iam::xxxxxxxxxxxx:role/dev-redshift-role",
},
transformation_ctx="Redshift_Output",
)
Glueのメリット
1. 小さなコードでデータベースへデータを入れことができる
上記に記載している通りにファイルの探索や、データベースのコネクション等の細かいコードを書かなくてよい。
2. 分散実行を自動で行える
sparkにより複数ファイルがあった場合ファイル単位で処理を行うことができる。これがかなり強力で、パフォーマンスの高い読み込みを行うことができます。
Glueデメリット
1. 書き方が特殊で行いづらい処理がある。
for分等の処理を行うことが難しい。
以下のようなコードでforを行えるが遅くなる。
for row in df.collect():
pass
以下のような形でも書けるが書き方のハードルが上がる。
rdd = sc.parallelize(["b", "a", "c"])
sorted(rdd.map(lambda x: (x, 1)).collect())
2. データの型処理等によりデータ型が不一致を起こす
データを分散処理を行い、自動でデータフレームを作成し型を決定するため同じカラムでもデータ型が違う場合がある。
そのため、毎回データの型をcastする必要がある。
以下のような状態の場合int型、float型の両方の可能性があるためcastを行うことが必要。
場合によってはデータがロストするので注意が必要です。
df = df.withColumn("int_column", df['int_column'].cast(IntegerType()))
Glueまとめ
コードは書きやすいが、少し特殊なことをしようとすると一気にハードルが上がるのがAWS Glueだと思います。しかし、シンプルな内容を実行しようとした場合は、簡単に書くことができ、高速でデータの加工及びデータベースへのデータ投入を行うことができるので、非常にメリットがあると思います。
大量のデータを単純に加工したい方はぜひ使用してみてください。
おわりに
AI CROSSでは、次々に登場する新しい技術やフレームワークも活用しながら、Smart Work, Smart Lifeの実現に向けて、日々SaaSサービスの開発・改善に取り組んでいます。
これからもAI CROSSの活動や、利用・注目している技術の解説などを発信していこうと思います。
それでは今日はこのへんで。
この記事が気に入ったらサポートをしてみませんか?