AWSのDB値をAmazon S3に出力し、Snowflake側に取り込むプログラム
前提条件
AWS側
AWSには、S3拡張機能をインストールしている状態。
AWS SDKはJavaを利用しています。
AWS DBはPostgres SQLを利用しています。
S3のバケット構造は以下になっている想定で進めます。
[Amazon S3] > [バケット] > [my-bucket-name] > [WORK_DIR]
Snowflake側
"SHOW STAGES" SQLの実行結果が以下になっている想定で進めます。
$${\begin{array}{|c|c|c|c|} \hline created\_on&name&datebase\_name&schema\_name \\ \hline y-m-d & SAMPLE\_STAGE & (略)& (略) \\ \hline\end{array}}$$
$${\begin{array}{|c|c|c|} \hline url&has\_credentials& region \\ \hline s3:://my-bucket-name&N&us-east-1 \\ \hline\end{array}}$$
$${\begin{array}{|c|c|c|c|} \hline type&cloud&storage\_integration & endpoint \\ \hline EXTERNAL&AWS&SAMPLE\_BUCKET&null \\ \hline\end{array}}$$
$${\begin{array}{|c|} \hline owner \\ \hline SAMPLE\_STAGE\_DBADMIN \\ \hline\end{array}}$$
※【URLの出処】[Amazon S3] > [バケット] > [my-bucket-name] > [WORK_DIR]画面にある「S3 URI を コピー」ボタン押下時の値。各種権限を付与していること。
例)ストレージ統合(storage_integration)がnullではない場合、そのINTEGRATIONであるSAMPLE_STAGE_DBADMINロール(owner列)にUSAGE権限を付与する。
プログラム
Java側
//出力先の設定
String targetS3Path = "WORK_DIR/details.csv";
//DBの値をS3へ出力(Export)する
String targetKey = "takenoko";
String s3ExportQuery = "SELCT * FROM aws_s3.query_export_to_s3 ('SELECT * FROM user.item_table WHERE item_name=''"
+ targetKey
+ "''', 'my-bucket-name', '" + targetS3Path + "');";
//SDK実行
PreparedStatement ps = Context.getAWSConnection().prepareStatement(s3ExportQuery);
ps.clearParameters();
ps.execute();
//Snowflake側に取り込む
String copyS3BucketQuery = "COPY INTO <TABKE-NAME-IN-SNOWFLAKE> FROM @SAMPLE_STAGE/" + targetS3Path + " FILE_FORMAT=(TYPE=CSV FIRLD_DELIMITER='\\t') FORCE=TRUE";
ps = Context.getSnowflakeConnection().prepareStatement(copyS3BucketQuery);
ps.clearParameters();
ps.execute();
補足
Context.getAWSConnection()
→私の環境で作成したローカルメソッドです。AWSへアクセスすることができるお持ちのコネクションに適宜置き換えてください。Java実行時にタイムアウトエラーになる場合は、以下を冒頭に追加するとよい。
String query1 = "set statement_timeout = 5000;";
PreparedStatement pst = Connection().prepareStatement(query1);
pst.clearParameters();
pst.execure();もしS3側で通信エラーが起きる場合は、以下を試すとよい。
・Apache httpdのタイムアウト値を300秒に変更。
・idle_timeout.timeout_secondsを60秒から増やす。
参考文献
ステップ2: POLICY_ADMIN カスタムロールに権限を付与する | Snowflakeセッションおよびセッションポリシー | Snowflake Documentation
RDS for PostgreSQL DB インスタンスから Amazon S3 へのデータのエクスポート - Amazon Relational Database Service