見出し画像

【TECH連載】AWS Glue入門編

こんにちは、AICROSS開発チーム川上です。

AI CROSSではデータウェアハウスとして、Redshiftを利用しています。
Redshiftへのデータのインポートのために、AWS Glueというサービスを一部利用しています。

AWS Glueはうまく使うと非常に強力なツールになります。

本日は、当社でも活用しているAWS Glueについて、具体的にどうやって使うのか、ステップバイステップで解説してみようと思います。

なお、以下の解説で前提としている簡単な概略図は以下の図を参照ください。

AWSの簡易構成図

構築手順

  1. Redshiftの構築

  2. S3バケットを作成し、データのインポートを行う

  3. Glue Connectorsの作成を行う

  4. Glue jobsの作成を行う。

  5. 実行を行う

今回はGlueの入門なので1、2の説明は省いて3からの説明になります。

Glue Connectors

データベースの接続を行うための設定になります。
細かい部分については割愛いたします。

  1. Glueの画面を開く

  2. Data connectionsを開く

  3. Create connectionで作成画面に映る

  4. Redshiftを選択すると以下の画面が出るので必要に応じて、認証情報の入力を行う。(データベースの管理者等に情報を聞いておく)

  5. リソースの名前を記載して完了

Glue Jobs

Glueのジョブの本体
Visual ETL、NoteBook、Script editorの種類がある。
今回使用を行うのはScript editorになります。

インポートを行うデータ

test_file.csv

timestamp_column,date_column,string_column,int_column
2024/3/4 00:00:00,,a,1
2024/3/5 00:00:00,,b,2
2024/3/6 00:00:00,,c,3
2024/3/7 00:00:00,,d,4
2024/3/8 00:00:00,,e,5

単純なcsvファイル

Glue Code

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrameCollection
from awsglue.dynamicframe import DynamicFrame

from pyspark.sql.functions import lit, col, from_unixtime, to_timestamp,regexp_replace,udf,to_date,from_utc_timestamp
from pyspark.sql.types import IntegerType,StringType

def df_show(dfy):
    dfc = DynamicFrameCollection(
        {"dfy": dfy},
        glueContext
    )
    df = dfc.select(list(dfc.keys())[0]).toDF()
    df.printSchema()
    df.show()

# Job details Job parameters
args = getResolvedOptions(sys.argv, ["JOB_NAME","INPUT_BUCKET","TEMP_BUCKET","Y","M","D"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# loggerの追加
logger = glueContext.get_logger()

temp_bucket = args["TEMP_BUCKET"]
temp_dir = f"s3://{temp_bucket}/glue/temporary/"
redshift_tmp_dir = f"s3://{temp_bucket}/glue/temporary/Redshift/"
y = args["Y"]
m = args["M"]
d = args["D"]
table = "test_table"

input_bucket = args["INPUT_BUCKET"]
logger.info(input_bucket)
input_path = f"s3://{input_bucket}/{y}/{m}/{d}/test_file.csv"

def MyTransform(glueContext, dfc) -> DynamicFrameCollection:
    df = dfc.select(list(dfc.keys())[0]).toDF()
    # timestamp
    df = df.withColumn("timestamp_column",to_timestamp(df["timestamp_column"], "yyyy-MM-dd HH:mm:ss"))
    df.show()
    # date
    df = df.withColumn("date_column",to_date(df["timestamp_column"]))
    # string
    df = df.withColumn("string_column", df['string_column'].cast(StringType()))
    # integer
    df = df.withColumn("int_column", df['int_column'].cast(IntegerType()))
    # show
    df.printSchema()
    df.show()

    # df > dyf
    dyf = DynamicFrame.fromDF(df, glueContext, "MyTransform")
    return DynamicFrameCollection({"CustomTransform0": dyf}, glueContext)


# Script generated for node Amazon S3
S3_Input = glueContext.create_dynamic_frame.from_options(
    format_options={
        "quoteChar": -1,
        "escaper": "\\",
        "withHeader": True,
        "separator": ",",
        "optimizePerformance": False,
    },
    connection_type="s3",
    format="csv",
    connection_options={
        "paths": [
            input_path
        ],
        "recurse": True,
    },
    transformation_ctx="S3_Input",
)
df_show(S3_Input)

CustomTransform_node = MyTransform(
    glueContext,
    DynamicFrameCollection(
        {"S3_Input": S3_Input},
        glueContext
    )
)

SelectFromCollection_node = SelectFromCollection.apply(
    dfc=CustomTransform_node,
    key=list(CustomTransform_node.keys())[0],
    transformation_ctx="SelectFromCollection_node",
)

ChangeSchema_node = ApplyMapping.apply(
    frame=SelectFromCollection_node,
    mappings=[
        ("timestamp_column", "timestamp", "timestamp_column", "timestamp"),
        ("date_column", "date", "date_column", "date"),
        ("string_column","string", "string_column", "string"),
        ("int_column", "integer", "int_column", "integer")
    ]
)

df_show(ChangeSchema_node)



talble_column = """
(
    "timestamp_column" TIMESTAMP,
    "date_column" DATE,
    "string_column" VARCHAR,
    "int_column" INTEGER
)
"""

preactions = f"""
DROP TABLE {table};
CREATE TABLE IF NOT EXISTS {table}
{talble_column};
"""

postactions = f"""
BEGIN;
DELETE FROM public.{table};
DELETE FROM public.{table} USING public.{table}_temp;
INSERT INTO public.{table} SELECT * FROM public.{table}_temp;
DROP TABLE public.{table}_temp;
END;
"""

#dev-develop-connect
Redshift_Output = glueContext.write_dynamic_frame.from_options(
    frame=ChangeSchema_node,
    connection_type="redshift",
    connection_options={
        "postactions": postactions,
        "redshiftTmpDir": redshift_tmp_dir,
        "useConnectionProperties": "true",
        "dbtable": f"public.{table}_temp",
        "connectionName": "dev-develop-connect",
        "preactions": preactions,
        "aws_iam_user": "arn:aws:iam::xxxxxxxxxxxx:role/dev-redshift-role",
    },
    transformation_ctx="Redshift_Output",
)




job.commit()

変更が必要な部分

{TABLE_NAME} 任意のテーブル名に変更ください
{ConnectionName} Glue Connectorsで設定した名前に変更ください
{IamRole} 新規ロールの作成を行うか、Redshiftに割り当てされているIamRoleを使用ください(新規に最小権限を絞って割り当て推奨)

Job parametersに設定が必要

--INPUT_BUCKET 任意のバケット(リージョン)
--TEMP_BUCKET 任意のバケット(同一リージョン)
--D
 日付
--M
 月
--Y
 年

JOBの流れ

基本的なジョブの内容は以下のようになっています。
コードにコメントで記載していますので合わせてご覧ください。

  1. Glueの初期設定を読み込みする

  2. S3のデータの読み込みを行う

  3. MyTransformにデータを渡し、型の制御を行う

  4. MyTransformから返ってきた型の変換を行い次処理の準備

  5. データベースに入れる型通りにデータを作る

  6. Redshiftへの書き込みを行う

詳細については以下に記載します。

1.Glueの初期設定を読み込みする

以下の記述によりGlueのジョブの変数を読み込みすることができる。

開発中GlueのJobの画面のに以下パラメータの設定を行う。
「Job parametersに設定が必要」部分で内容を説明
job details > Advanced properties > Job parameters

args = getResolvedOptions(sys.argv, ["JOB_NAME","INPUT_BUCKET","TEMP_BUCKET","Y","M","D"])

2.S3のデータの読み込みを行う

  • S3の読み込みを行う

  • データによって、オプションを変化させる。

  • format、format_options等で、csv、jsonなどによって変更する

  • input_pathにはs3://bucket/filepathという形式で行う。

recurse: ファイルを再帰的に読み込みを行う

from awsglue.context import GlueContext

# Script generated for node Amazon S3
S3_Input = glueContext.create_dynamic_frame.from_options(
    format_options={
        "quoteChar": -1,
        "escaper": "\\",
        "withHeader": True,
        "separator": ",",
        "optimizePerformance": False,
    },
    connection_type="s3",
    format="csv",
    connection_options={
        "paths": [
            input_path
        ],
        "recurse": True,
    },
    transformation_ctx="S3_Input",
)

3.MyTransformにデータを渡し、型の制御を行う

データを読み込みを行った際に、データの型を変更する。
to_timestamp: timestampへの変更を行う
StringType:  Stringへの変更を行う
IntegerType:  Integerへの変更を行う
行うことができる型については、 pyspark.sql.typesもしくはpyspark.sql.functionsをご確認ください。

from pyspark.sql.types import IntegerType,StringType
from pyspark.sql.functions import to_timestamp
# 時間
df = df.withColumn("timestamp_column",to_timestamp(df["timestamp_column"], "yyyy-MM-dd HH:mm:ss"))
# string
df = df.withColumn("string_column", df['string_column'].cast(StringType()))
# integer
df = df.withColumn("int_column", df['int_column'].cast(IntegerType()))

4.MyTransformから返って来た型の変換を行い次処理の準備

from awsglue.transforms import *
# 上記もしくは以下のimport
from awsglue.transforms import SelectFromCollection

SelectFromCollection_node = SelectFromCollection.apply(
    dfc=CustomTransform_node,
    key=list(CustomTransform_node.keys())[0],
    transformation_ctx="SelectFromCollection_node",
)

5.データベースに入れる型通りにデータを作る

データベースに入れたいデータを以下の形式で入力する。
("変更前 カラム名","変更前 型", "変更前 カラム名", "変更後 型")

from awsglue.transforms import *
# 上記もしくは以下のimport
from awsglue.transforms import ApplyMapping

ChangeSchema_node = ApplyMapping.apply(
    frame=SelectFromCollection_node,
    mappings=[
        ("timestamp_column", "timestamp", "timestamp_column", "timestamp"),
        ("date_column", "date", "date_column", "date"),
        ("string_column","string", "string_column", "string"),
        ("int_column", "integer", "int_column", "integer")
    ]
)

6.Redshiftへの書き込みを行う

talble_column: データベースに定義するカラムを設定する
preactions: データベースに書き込みを行う前処理
postactions: データベースに書き込みを行った後処理
glueContext.write_dynamic_frame.from_optionsのオプション
  dbtable: 書き込みを行うテーブル名

データの加工の順番
  1. preactions データを入れるテーブルの作成やジョブの流し直しを行う際のデータ削除を行ったりする。

  2. dbtable insert ChangeSchema_nodeのデータをdbtable で指定しているテーブルにインサートを行う

  3. postactions データインサートを行った後の処理を行う2でtempファイルにインサートした後に重複したデータを正式なテーブルに入れる。


talble_column = """
(
    "timestamp_column" TIMESTAMP,
    "date_column" DATE,
    "string_column" VARCHAR,
    "int_column" INTEGER
)
"""

"""
preactions = f"""
DROP TABLE {table};
CREATE TABLE IF NOT EXISTS {table}
{talble_column};
"""

postactions = f"""
BEGIN;
DELETE FROM public.{table};
DELETE FROM public.{table} USING public.{table}_temp;
INSERT INTO public.{table} SELECT * FROM public.{table}_temp;
DROP TABLE public.{table}_temp;
END;
"""

#dev-develop-connect
Redshift_Output = glueContext.write_dynamic_frame.from_options(
    frame=ChangeSchema_node,
    connection_type="redshift",
    connection_options={
        "postactions": postactions,
        "redshiftTmpDir": redshift_tmp_dir,
        "useConnectionProperties": "true",
        "dbtable": f"public.{table}_temp",
        "connectionName": "dev-develop-connect",
        "preactions": preactions,
        "aws_iam_user": "arn:aws:iam::xxxxxxxxxxxx:role/dev-redshift-role",
    },
    transformation_ctx="Redshift_Output",
)

Glueのメリット

1. 小さなコードでデータベースへデータを入れことができる

上記に記載している通りにファイルの探索や、データベースのコネクション等の細かいコードを書かなくてよい。

2. 分散実行を自動で行える

sparkにより複数ファイルがあった場合ファイル単位で処理を行うことができる。これがかなり強力で、パフォーマンスの高い読み込みを行うことができます。

Glueデメリット

1. 書き方が特殊で行いづらい処理がある。

for分等の処理を行うことが難しい。
以下のようなコードでforを行えるが遅くなる。

for row in df.collect():
    pass

以下のような形でも書けるが書き方のハードルが上がる。

rdd = sc.parallelize(["b", "a", "c"])
sorted(rdd.map(lambda x: (x, 1)).collect())

2. データの型処理等によりデータ型が不一致を起こす

データを分散処理を行い、自動でデータフレームを作成し型を決定するため同じカラムでもデータ型が違う場合がある。
そのため、毎回データの型をcastする必要がある。
以下のような状態の場合int型、float型の両方の可能性があるためcastを行うことが必要。
場合によってはデータがロストするので注意が必要です。

df = df.withColumn("int_column", df['int_column'].cast(IntegerType()))

Glueまとめ

コードは書きやすいが、少し特殊なことをしようとすると一気にハードルが上がるのがAWS Glueだと思います。しかし、シンプルな内容を実行しようとした場合は、簡単に書くことができ、高速でデータの加工及びデータベースへのデータ投入を行うことができるので、非常にメリットがあると思います。
大量のデータを単純に加工したい方はぜひ使用してみてください。

おわりに

AI CROSSでは、次々に登場する新しい技術やフレームワークも活用しながら、Smart Work, Smart Lifeの実現に向けて、日々SaaSサービスの開発・改善に取り組んでいます。
これからもAI CROSSの活動や、利用・注目している技術の解説などを発信していこうと思います。

それでは今日はこのへんで。

この記事が気に入ったらサポートをしてみませんか?