DBマイグレーションツールAlembicとBigQueryを連携して、マイグレーションを管理する
AlembicとBigQueryを連携できると聞いたのですが、ドキュメントやチュートリアルを見つけられなかったので、試してみた備忘録です
ChatGPTさんと一緒に作業したらあっちこっちにいろいろなことが発生して思ったよりも時間がかかったので、参考になれば幸いです
(想定読者: マイグレーションツールの経験の浅い初心者)
TL;DR
AlembicはPythonで使えるマイグレーションツールだよ
AlembicはMySQLやpostgressqlなどの伝統的なDBだけでなく、BigQueryもサポートしているよ
BigQueryと接続するときは事前に環境設定が必要だよ
Alembic initで必要なファイルを書き出してくれるけど、結構な量の編集をしないといけないよ
ChatGPTさんはそれっぽいsuggetionをしてくれたけど、今回についてはいろいろ間違ってくれたよ…(自社サービスじゃないのにある程度コメントが正解していることにはある種の感動をしています。そんなにネガティブな感情は持っていないです)
Alembicでできること
ざっくりいうと、データベースのスキーマ変更の履歴をいい感じに管理してくれるのがマイグレーションツールです
Alembicは特にSQLAlchemyと一緒に使うことが想定されたPythonベースのマイグレーションツールになっています
マイグレーションツールを使ってスキーマ変更の履歴を管理しておくと、次ができます
・スキーマ変更(DBのカラムの変更)の順番に対する履歴の保存
・それらの順番に沿った変更の適用や変更のロールバック
特記事項としてMySQLやpostgressql, oracleなどのRDBだけでなく、BigQueryやGoogle Sprehadsheetsなどもサポートしています
(サポートしているDBのリスト)
作業の流れ
ざっくり、①初期化して必要なファイルを生成し、②BigQuery側で少し作業を行い、③credential情報などを定義し、④実際にテーブル定義&マイグレーションの実効…という感じです
`alembic init alembic`で初期化
BigQueryで環境整備
Credential情報などをPythonスクリプトに反映
テーブル定義・マイグレーション
インストール
ざっくり、次をpyproject.tomlに書いて作った環境を使っています
python = "^3.11"
Flask = "^3.0.1"
google-cloud-bigquery = "^3.16.0"
google = "^3.0.0"
flask-sqlalchemy = "^3.1.1"
flask-migrate = "^4.0.7"
SQLAlchemy = "^2.0.30"
sqlalchemy-bigquery = "^1.11.0"
alembic = "^1.13.1"
psycopg2-binary = "^2.9.9"
google-cloud-bigquery-storage = "^2.25.0"
最終的に次のようなディレクトリ構造になります
(作業ディレクトリ)
├── pyproject.toml
├── alembic.ini
└── (alembic init 出力ディレクトリ)
│ ├── README
│ ├── env.py
│ ├── script.py.mako
│ └── versions
└── db
└── models
└── model.py
alembic init
まず、次のコマンドをシェル上で実行して、必要なファイルを生成します
(以下、シェルでの作業は$をつけています):
$ alembic init (出力ディレクトリ名)
出力ディレクトリをalembicにすると、次のような階層でファイル・ディレクトリが出力されます
(作業ディレクトリ)
├── pyproject.toml
├── alembic.ini
└── (出力ディレクトリ)
├── README
├── env.py
├── script.py.mako
└── versions
alembic.ini
TOMLフォーマットのalembicの初期設定ファイルです
loggerの設定とかもしてくれています
env.py
後でマイグレーションを行っていく際の環境設定などを行います
script.py.mako
後ほどマイグレーションの内容を定義するファイルを生成するのですが、それらファイルのテンプレートがこちらに保存されています
version
マイグレーションの内容を定義するファイル群はこちらのディレクトリに保存されます
BigQuery側の準備
ざっくり必要なのは次の2つです
サービスアカウントの準備
alembicで管理するデータセットの定義
サービスアカウントの準備
後ほどalembicがテーブルを自動で生成していくため、テーブルの自動生成ができる権限を持つサービスアカウントを付与する必要があります
わかりやすい記事が他にたくさんある(し、公式ドキュメントもわかりやすい)ので、今回は省略します
今回はサービスアカウントに紐づくjsonファイルのCredentialを後で使います
忘れずに発行・保存しておいてください
データセットの定義
alembicではmigrationファイル(upgrade/downgradeを定義したファイル)それぞれにversion IDを割り振っていて、最初のテーブル定義のタイミングでversion IDを管理するテーブルを自動で生成します
BigQueryは(Project).(dataset).(table)の形式でテーブルを定義します。
このとき、tableを定義するためにはdatasetが生成されている必要がありますそのため、この時点でデータセットを定義しておく必要があります
筆者はalembicという名前のデータセットをプロジェクトの下に定義しました
BigQueryはこんな感じになっています
プロジェクト
├── alembic
├── (他のデータセット)
コーディング
alembic initで生成されたファイル・ディレクトリの他にdbというディレクトリをつけました
今はこんな感じにしていきます
(作業ディレクトリ)
├── pyproject.toml
├── alembic.ini
└── (alembic init 出力ディレクトリ)
│ ├── README
│ ├── env.py
│ ├── script.py.mako
│ └── versions
└── db
└── models
└── model.py
alembic.ini
alembic.iniをエディタで開いて次の情報をつけ足します
[alembic]の項目は自動生成されているはずなので、すでに定義されているものの下につけ足してください
[alembic]
script_location = (alembic initで指定したディレクトリ)
sqlalchemy.url = bigquery://(bigqueryのプロジェクト名)
[bigquery]
project_id = (bigqueryのプロジェクト名)
credentials = (サービスアカウントのcredential.json)
model.py
先にDBのモデルを定義してしまいましょう
こんな感じに定義しました
補足: crendential_pathをここで呼ぶのはイケてないので後日修正します
from datetime import datetime
from sqlalchemy import create_engine, Column, String, Integer, Unicode, DateTime
from sqlalchemy.ext.declarative import declarative_base
# Engine の作成
Engine = create_engine(
'bigquery://(project名)/(データセット名)',
credentials_path='/path/to/Credential.json'
)
ModelBase = declarative_base()
env.py
個人的に一番てこづったのがここでした
結果的には次でうまくいきました
from sqlalchemy import engine_from_config
from sqlalchemy import pool
from sqlalchemy_bigquery import BigQueryDialect
from google.cloud import bigquery
from google.oauth2 import service_account
from alembic import context
from db.models.model import ModelBase, Engine
#環境変数の定義: BigQueryとの接続の際、どうも環境変数でも定義しておかないとうまく接続されない
import os
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = "/path/to/Credential.json"
#from migr.db import base # モデルクラスの読み込み
#target_metadata = base.Base.metadata
# iniファイルで定義したini情報の読み込み
config = context.config
# models.pyで定義したDBのmetadataを宣言
target_metadata = ModelBase.metadata
def run_migrations_offline() -> None:
"""Run migrations in 'offline' mode. """
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online() -> None:
"""Run migrations in 'online' mode."""
#models.pyで定義したEngineをつなぐ
connectable = Engine
with connectable.connect() as connection:
context.configure(
connection=connection, target_metadata=target_metadata
)
with context.begin_transaction():
context.run_migrations()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()
変更をコミットする
ここまで書けたら次を実行します
commentsは"initial migration"とでもなんでもしてください
$ alembic revision --autogenerate -m (comments)
そうすると、次の変化が起こります:
versionsの下に(revisionId)_(comments).pyというファイルが生成される
BigQueryの接続したテーブルの下に`alembic_version`というテーブルが生成され、version_numとしてrevisionIdが記録される
生成されたca938f69efdd_initial_migration.pyはこんな感じの内容です
"""Initial migration
Revision ID: ca938f69efdd
Revises:
Create Date: 2024-05-12 16:45:06.310399
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = 'ca938f69efdd'
down_revision: Union[str, None] = None
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
pass
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
pass
# ### end Alembic commands ###
revisionを適用するのには次のようなコマンドを使います
一番最初のコミットから今に至るすべてのマイグレーションの適用時:
$ alembic upgrade head
最後のマイグレーションの適用だけをしたいとき:
$ alembic upgrade +1
テーブルの定義
では、実際にマイグレーションの管理機能を使ってみましょう
models.pyを次のように変更します
AcountModelというテーブルを定義しました
from datetime import datetime
from sqlalchemy import create_engine, Column, String, Integer, Unicode, DateTime
from sqlalchemy.ext.declarative import declarative_base
# Engine の作成
Engine = create_engine(
'bigquery://(個々のURL)/alembic_version',
credentials_path='/mnt/c/python/SQLAlchemy/t-gateway-373112-f42035e8c112(power).json'
)
ModelBase = declarative_base()
class AcountModel(ModelBase):
"""
AcountModel
"""
__tablename__ = 'account'
id = Column(Integer, primary_key=True)
name = Column(String(50), nullable=False)
description = Column(Unicode(200))
created_at = Column(DateTime, default=datetime.now, nullable=False)
updated_at = Column(DateTime, default=datetime.now, nullable=False)
Alembicはこのmodels.pyで定義されたテーブルのメタデータの変更を自動で検知して、マイグレーション定義ファイルを新たに生成します
そしてまた、revisionします
$ alembic revision --autogenerate -m (comments)
そうすると、次のrevisionが生成されます
"""Table define
Revision ID: c344aeaf06fd
Revises: ca938f69efdd
Create Date: 2024-05-12 16:46:23.452817
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = 'c344aeaf06fd'
down_revision: Union[str, None] = 'ca938f69efdd'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('account',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('name', sa.String(length=50), nullable=False),
sa.Column('description', sa.Unicode(length=200), nullable=True),
sa.Column('created_at', sa.DateTime(), nullable=False),
sa.Column('updated_at', sa.DateTime(), nullable=False),
sa.PrimaryKeyConstraint('id')
)
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table('account')
# ### end Alembic commands ###
こうなればあとはalembic upgradeを適用してください
$ alembic upgrade +1
次のような空テーブルが生成されていれば成功です
補足: どこで躓いたか
大きく次の二つだと思います
alembicが正常動作するときにどうなるかが良くわかっていなかった
とはいうものの、alembic関連のブログを読めばここはわかるのでさして困っていない
BigQueryとの接続周辺部
困ったのは完全にここ。できてしまえばそんなものかくらいなのですが、alembicが正常動作すると何をし始めるのかが良くわかっていなかったので、データセットを先に作っておいてあげるなどができていませんでした