AWS Glue からDynamoDBに直接書き込む
はじめに
こんにちは。
株式会社POLのエンジニアの渡辺です。
7/17に、AWS Glue の PySpark ジョブから DynamoDB に直接書き込みできるようになったので、それについて、書かせていただきます。
当初は英語のドキュメントしかなかったけど、今は日本語のドキュメントもあるので、そこ読めば誰でもできるとおもうのですが、備忘録的な感じです笑
ノートブックの準備
今回は、SageMakerノートブック上で試すので、開発エンドポイントとそのエンドポイント上にアタッチするSageMaker ノートブックを準備します。
ノートブックを立ち上げて、PySparkのファイルを作成し、起動します。
ソースコード
実際のソースコードに入っていきます。今回はRDSからデータを取得し、DataFrameに変換。その後、そのデータをDynamoDBにインサートします。実際の運用だとつかうこと少ないかもですが、データ加工して、DynamoDBに保存とかが簡単にできるようになります。
まず、RDSやDynamoにつなぐのに必要なファイルのインポートと初期化を行う。connection_nameの部分はGlueで接続を作成することができ、それを利用しています。
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import *
from pyspark.sql.types import StringType
import datetime
# initialization
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
src_jdbc = glueContext.extract_jdbc_conf(connection_name='connection-name')
次に、RDSからデータを取得し、DataFrameにして返す関数をつくります。urlの部分のdatabase_nameの部分はご自身のデータベースの名前に置き換えて、お使いください。
def fetch_rds_table_dataframe(tablename):
## RDS から DynmicFrame の形式で取得し、DataFrame に変換
df = glueContext.create_dynamic_frame_from_options(
connection_type=src_jdbc['vendor'],
connection_options={
"url": src_jdbc['url'] + "/database_name",
"user": src_jdbc['user'],
"password": src_jdbc['password'],
"dbtable": tablename
}
).toDF()
return df
続いて、Dynamoにinsertする関数をつくります。
def insert_dynamodb_table_dataframe(tablename, df):
## DataFrame から DynamicFrame に変換
dyf = DynamicFrame.fromDF(df, glueContext, "nested")
glueContext.write_dynamic_frame_from_options(
frame=dyf,
connection_type="dynamodb",
connection_options={
"dynamodb.output.tableName": tablename,
"dynamodb.throughput.write.percent": "1.0"
}
)
これらをノートブックで実行します。
最後に、定義した関数をつなげて、実行します。dynamo_db_table_nameの部分はご自身のDynamoDBのテーブル名に置き換えてください。
## ITEM_DATAの取得
df = fetch_rds_table_dataframe("item_data").alias("item_data")
).select(
col("item_data.id").cast("string").alias("ItemId"),
).limit(10)
## 取得したデータのインサート
insert_dynamodb_table_dataframe("dynamo_db_table_name", df)
DynamoDBの方をマネジメントコンソールから、データが入っていることが確認できました。
おわりに
特にハマりポイントとかもないと思うのですが、なにか疑問があったり、POLに興味がある方はお気軽にお声がけください!
POLではエンジニアを募集しているので、興味がある方ぜひ!!