見出し画像

Minio + DuckDB + dbt + Airflowでローカルにデータ基盤を立ててみる

かなり久しぶりの投稿になりました
ここ3年ちょっとほどデータ系の分野が楽しく、データエンジニアをやっています
何年データエンジニアとしてやっていくかはまだ未知数ですがしばらくはこちらの道でやっていきたいと思っています

今日はミニデータ基盤としてMinio + DuckDB + dbt + Airflowでかなり楽にローカルにデータ基盤を立てられるよという紹介をしたいと思います

リポジトリ

こちらとなります
(まだ足りていない部分が割とありますが…)

使用するツールのざっくりとした紹介

  • Minio

    • オブジェクトストレージサーバーで、Amazon S3と互換性がある

    • ローカル環境でも簡単にセットアップできる

  • DuckDB

    • 組み込み型SQLデータベースエンジン

    • ポータブルでインストールが簡単

    • クエリ処理性能が高速

    • 分析用途に適している

  • dbt (Data Build Tool)

    • データ変換ツール

    • SQLとJinjaテンプレートを利用してデータモデルを構築

    • テスト、ドキュメント生成、実行ログ機能など付加価値の高い機能を提供

  • Airflow

    • データ処理フローを管理するワークフローマネジメントシステム

    • タスクの並列実行、モニタリング、スケジューリングが可能

    • Python製なので拡張性が高い

環境構築手順

環境のセットアップは全てdocker composeで行います
手順はREADMEに書いた通りとなりますが、単純にdocker compose up -dを行うだけです
docker-compose.yamlはAirflowの公式で取得できる雛形を少しいじったものとなります

取得した雛形に追加したサービスは以下の二つです

  • minio: minioのサーバーを立ち上げるサービスとなります

  • mc: 起動したminioのサーバーに対してバケットを作成したり、動作確認用のオブジェクトを格納したりしています

サンプルデータの取り込みとデータ変換

データの流れ

  1. サンプルデータはdocker compose up時にすでにMinioのバケットに投入されていることを前提とする

  2. dbtに作成したモデル(データを変換するためのSQL)により、DuckDB上にテーブルが作成される(staging層)

  3. DuckDB上にテーブルが作成されるとき、参照するデータの実態はMinioに存在している
    ※今回のリポジトリではcsvファイルとしていますが、ここはtsvでもjsonlでもparquetでもOK(ファイルによってread関数が変わる)

  4. 2で作成したテーブルを加工、集計してさらに新しいテーブルを作成する(mart層)

  5. (このリポジトリで実装はしていないが)BIツールなどからmart層のテーブルへ参照が発生し、可視化が行われる

という流れになります

dbtによるデータ変換について

まず、dbt関連の資材は`{root}/workflow/dbt`に格納しています
dbtディレクトリ配下の構成は以下のようになっています

# ディレクトリ構成
dbt/
  models/
    dbt/
      staging/
      mart/
  dbt_project.yml

(深い考えがないまま作成してしまいましたが、modelsの配下のdbtは要らなかったかも)
staging層となるSQLはstagingディレクトリに、mart層となるSQLはmartディレクトリに格納しています

stagingの一例としてstg_orders.sqlについて少し説明します

{{ config(materialized='incremental', unique_key='order_id') }}

WITH renamed AS (
    SELECT
        CAST(id AS VARCHAR) AS order_id,
        CAST(user_id AS VARCHAR) AS customer_id,
        CAST(order_date AS DATE) AS order_date,
        CAST(status AS VARCHAR) AS status,
        CAST(year AS VARCHAR) AS order_year,
        CAST(month AS VARCHAR) AS order_month
    FROM
        {{ source('external_source', 'stg_orders') }}
    {% if var('target_year', '') != '' and var('target_month', '') != '' %}
        WHERE
            year = '{{ var('target_year') }}'
            AND month = '{{ var('target_month') }}'
    {% endif %}
)

SELECT * FROM renamed

SQLのスタイルについては基本的にdbtの提唱するstyle guideに乗っかっています
上記のSQLは

  • {{ config(materialized='incremental', unique_key='order_id') }}

    • このモデルはフルリフレッシュでなく、インクリメンタルに構築することを指定

    • order_idが一意キーとして使用される

  • WITH renamed AS (...)

    • ソースデータから列名を変更し、適切なデータ型にキャストするための一時テーブル

  • CAST(...)関数

    • 元のデータ型を目的の型に変換する

  • {{ source('external_source', 'stg_orders') }}

    • external_sourceソースからstg_ordersテーブルを参照

    • external_sourceは同階層に置いているorders.ymlを参照

  • {% if var('target_year', '') != '' and var('target_month', '') != '' %}

    • dbtの変数target_year、target_monthが設定されている場合のみ実行

  • WHERE年 = '{{ var('target_year') }}' AND 月 = '{{ var('target_month') }}'

    • 変数の値に基づいてフィルタリング条件を動的に設定

    • MinioへデータをHIve形式のパーティションで分割して置いているので、パーティションプルーニング効果=不要なデータを読み込まないことによるクエリの高速化を期待している

  • SELECT * FROM renamed

    • 変換された列を含む一時テーブルからすべての列を選択

という意味となります
staging層なのであくまでも最低限の加工を意識したものとなります

TIPS: staging層とmart層について

何の説明もなくstagingとmartという言葉を出しましたが、以下のような意味合いで使用しています

staging層

  • ソースデータを取り込んだ生データを、最低限の加工だけを施したデータ

  • 例えばCSVデータをロードしてカラム名を適切に変更する、データ型を合わせるなど

  • 複雑な集計や加工は行わない

mart層

  • staging層のデータから、さらに集計や加工を施して分析用のデータセットを作成

  • factテーブルやdimensionテーブルなどが含まれる(参考:スタースキーマ

  • データマートを構築するイメージ

  • データ分析の目的に合わせて、フラットでクエリしやすい形式に変換される

データ処理フローの自動化

ここまでで、DuckDBにサンプルデータを取り込み、dbtを使ってデータモデルを構築し、クエリ可能な形で分析用データセットを作成できます
しかし、これらの一連の処理を手動で実行するのは効率が悪くデータパイプラインとして自動化する必要があります
この「データパイプラインの自動化」にAirflowを使用しています

Airflow関連の資材は`{root}/workflow/airflow`に格納しています
airflowディレクトリ配下は以下のような構成になっています

airflow/
  config/
    airflow.cfg
  dags/
    run_dbt.py
  plugins/
  Dockerfile

airflow.cfgの中身は

[core]
# The folder where your airflow pipelines live, most likely a
# subfolder in a code repository. This path must be absolute.
dags_folder = /opt/airflow/dags

だけとなっています
ここでは/opt/airflow/dagsをDAGファイルの格納ディレクトリと指定していて、この配下に新しいDAGファイルを追加、あるいは既存DAGファイルの更新を行うとworkflowにもそれらの変更が反映される仕組みとなります

よって、dagsディレクトリは/opt/airflow/dagsと共有され(docker composeのvolume設定により、ホストとコンテナ間で共有しています)配下のrun_dbt.pyを更新するとその設定が数秒でworkflowに反映されます

run_dbt.pyの中身は割と簡単なことしか行っていないので解説は避けますが、やっていることは

  1. workflowの実行パラメータとして対象日付が渡されるためそれを年月に整形

  2. 1で整形した対象年月をdbtのvarsとして渡しながらdbt runを実行

  3. データの確認としてselect * from tableを行い、結果をログに出力

dagのgraph

となります

まとめ

簡単にはなりますが、以上がMinio + DuckDB + dbt + Airflowでローカルにデータ基盤を立てる解説となります

  • Minio、DuckDB、dbt、Airflowを使ったローカル環境でのデータ基盤を構築

  • Minioでオブジェクトストレージを実現し、DuckDBにデータを取り込み

  • dbtでデータモデルを宣言的に定義し、SQLによるデータ加工を実施

  • Airflowを導入してデータパイプラインを自動化、スケジューリング・モニタリング機能を実現

ここら辺がサクッと実現できてしまうのに加えて数十GBレベルのデータであっても簡単に処理できてしまうらしいので、DWH製品を導入するほどでもないけどデータ分析基盤が欲しいなという時にかなり便利な構成なのかなと思いました
また、今回はやっていませんが

  • dbt testの追加

    • データ型やNULL制約など、データ品質を確保するためのテストを実装

  • dbt-osmosisによるメタデータ生成の自動化

    • データモデル、データソース、データ検証結果などのメタデータをドキュメント化

    • データガバナンスと理解促進を目的に活用

という要素を追加すると本格的なデータ基盤になりうるのでここら辺も時間ができたら紹介したいなと思っております

いいなと思ったら応援しよう!