GCP環境(GCS)→別のGCP環境(GCS)へデータ転送する方法(python)
現場での作業時の備忘録になります。
現場ではAWS環境からGCPに作業環境を移管したのですが、
思いの外、細かなところで色々詰まる経験をしたので、記事を複数に分けて備忘録として残しております。
主に自分用のメモですが、AWS・GCPはユーザも多いので同じような境遇になる人も多そうです。当記事の内容が助けになると幸いです。
今回やったこと(当記事でできること)
・他社GCP環境(Cloud Strage)から自社GCP(Cloud Strage)へデータ転送
他社さんとデータ連携をする際に、今まではお互いにAWS環境を使用してたのですが、GCPの環境に変わったことで、データ連携をする処理を新規で作成する必要がありました。
今回はそのやり方について記載していきます。
使うもの
・Python
あらかじめ用意しとくもの
・サービスアカウント
これは相手の会社さんにお願いして、相手の会社さんのGCSへアクセスできるサービスアカウントを発行してもらう必要があります。
設計内容
①相手先のサービスアカウントを使って認証
②指定されたGCSバケット内のファイル名一覧を取得
③本日日付をファイル名にもつファイルを連携対象とする
④連携対象のファイル名を一時ファイルとしてダウンロード
⑤ダウンロードした一時ファイルを自社のGCSバケット内にアップロード
⑥一時ファイルを削除
記述コード
import datetime
import tempfile
import os
from google.oauth2 import service_account
from google.cloud import storage
# 認証情報(サービスアカウント)
# service_account_keyの中身を書き換える。
# 発行してもらったサービスアカウント情報はjson形式になっており、
# これはその情報をハードコーディングしたもの。
service_account_key = {
"type":"service_account",
"project_id":"相手先の会社さんのプロジェクトID",
"private_key_id":"相手先の会社さんのプライベートキー",
"private_key":"-----BEGIN PRIVATE KEY-----\xxiasjsohdauihduiw...(めちゃくちゃ長い文字列が入ります)...hiudhaiugwugai\n-----END PRIVATE KEY-----\n",
"client_email":"mail@address.com",
"client_id":"01234567890123",
"auth_uri":"https://accounts.google.com/o/oauth2/auth",
"token_uri":"https://oauth2.googleapis.com/token",
"auth_provider_x509_cert_url":"https://www.googleapis.com/oauth2/v1/certs",
"client_x509_cert_url":"abcdfff.iam.gserviceaccount.com"
}
credentials = service_account.Credentials.from_service_account_info(service_account_key)
scoped_credentials = credentials.with_scopes(
[
'https://www.googleapis.com/auth/cloud-platform',
'https://www.googleapis.com/auth/analytics.readonly'
])
# ダウンロード先の情報
project_name = 'sorce-project-name'
bucket_name = 'source-bucket-name'
bucket_path = 'gs://source-bucket-name/'
# アップロード先の情報
dataplatform_project_name = 'target-project-name
dataplatform_bucket_name = 'target-bucket-name'
dataplatform_bucket_path = 'gs://target-bucket-name/'
dataplatform_client = storage.Client(dataplatform_project_name)
dataplatform_bucket = dataplatform_client.get_bucket(dataplatform_bucket_name)
# 本日日付を'%Y%m%d'で取得
today = datetime.date.today()
today = today.strftime('%Y%m%d')
# ダウンロード先のGCSへアクセスしファイル名一覧を取得
gcs_client_download = storage.Client(credentials=scoped_credentials, project=project_name)
blobs = gcs_client_download.list_blobs(bucket_name)
# ダウンロード対象ファイルのtempファイル名
_, temp_local_filename = tempfile.mkstemp()
for blob in blobs:
file_name = blob.name
# 本日日付が含まれるファイル名がある場合にGCSへアップロードする
if today in file_name:
blob.download_to_filename(temp_local_filename)
bucket = dataplatform_client.bucket(dataplatform_bucket_name)
blob = bucket.blob(file_name)
blob.upload_from_filename(temp_local_filename)
os.remove(temp_local_filename)
この記事が気に入ったらサポートをしてみませんか?