見出し画像

SQLServerからBigQueryへのデータ移行実装 - Day 4

はじめに

昨日の財務分析を行う際には、会計データの取り込み・加工にて、文字化けや例外的な情報の混入を避けるなど、データの品質向上が必要なことが分かりました。特に以下の課題に対応する必要があります。

  • SQLServerの文字コード(CP932)対応

  • 勘定科目コードの標準化

  • 金額の正規化

  • 異常値の検出と処理

これらの課題を解決するため、今日はSQLServerからBigQueryへのETL(Extract/Transform/Load)処理を実装していきます。

1. 環境構築

SQLServerからBigQueryへのデータ移行に必要なパッケージをインストール

pip install pyodbc pandas google-cloud-bigquery mojimoji regex

2. データ抽出と文字コード処理

2.1 接続設定

import pyodbc
from google.cloud import bigquery
from google.oauth2 import service_account
import pandas as pd
import mojimoji
import regex
import unicodedata
import logging
from datetime import datetime, timedelta
from typing import Optional, Dict, List

# ログ設定
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# SQLServer接続情報
SQL_CONFIG = {
    'driver': '{SQL Server}',
    'server': 'your_server',
    'database': 'your_database',
    'uid': 'your_username',
    'pwd': 'your_password',
    'charset': 'CP932'
}

2.2 SQLServerからのデータ抽出

def get_sqlserver_connection():
"""SQLServer接続を取得"""
conn_str = (
f"DRIVER={SQL_CONFIG['driver']};"
f"SERVER={SQL_CONFIG['server']};"
f"DATABASE={SQL_CONFIG['database']};"
f"UID={SQL_CONFIG['uid']};"
f"PWD={SQL_CONFIG['pwd']};"
f"charset={SQL_CONFIG['charset']};"
)
return pyodbc.connect(conn_str)

def extract_data(start_date: datetime.date) -> pd.DataFrame:
"""データ抽出処理"""
query = """
SELECT
JH.JournalHeaderId,
JH.TransactionDate,
JH.AccountingDate,
JL.DebitAccountCode,
DA.AccountName as DebitAccountName,
JL.CreditAccountCode,
CA.AccountName as CreditAccountName,
JL.Amount,
JL.DepartmentCode,
D.DepartmentName,
JH.Description,
JH.CreatedAt,
JH.UpdatedAt
FROM
JournalHeaders JH
INNER JOIN JournalLines JL ON JH.JournalHeaderId = JL.JournalHeaderId
INNER JOIN Accounts DA ON JL.DebitAccountCode = DA.AccountCode
INNER JOIN Accounts CA ON JL.CreditAccountCode = CA.AccountCode
INNER JOIN Departments D ON JL.DepartmentCode = D.DepartmentCode
WHERE
JH.Status = 'POSTED'
AND JH.AccountingDate >= ?
-- 仮勘定や内部取引を除外
AND JL.DebitAccountCode NOT LIKE '9%'
AND JL.CreditAccountCode NOT LIKE '9%'
"""

with get_sqlserver_connection() as conn:
    df = pd.read_sql(query, conn, params=[start_date])
return df

3. データ変換処理

データ変換処理では2つの主要なクラスを実装しています。

  1. TextNormalizer クラス

    1. 文字列の正規化(normalize_text)

      • 空白除去

      • Unicode正規化(NFKC)

      • 英数字の半角変換

      • カタカナの全角変換

      • 特殊文字の除去

    2. コードの正規化(normalize_code)

      • 空白除去

      • 半角変換

      • 英数字以外の文字除去

  2. DataTransformer クラス

    • データフレーム全体の変換処理

    • 文字列カラムの正規化

    • コードカラムの正規化

    • 日付型の標準化

    • 金額の小数点処理

    • 異常値の検出(Z-scoreベース)

これらにより、SQLServerから取得したデータの一貫性と品質を確保します。

3.1 文字列正規化

class TextNormalizer:
    @staticmethod
    def normalize_text(text: Optional[str]) -> Optional[str]:
        """文字列の正規化"""
        if not text or text.strip() == '':
            return None
            
        # 前後の空白を除去
        text = text.strip()
        
        # Unicode正規化
        text = unicodedata.normalize('NFKC', text)
        
        # 英数字を半角に
        text = mojimoji.zen_to_han(text, ascii=True, digit=True)
        
        # カタカナを全角に
        text = mojimoji.han_to_zen(text, kana=True)
        
        # 特殊文字を除去
        text = regex.sub(r'[\p{C}]', '', text)
        
        return text

    @staticmethod
    def normalize_code(code: Optional[str]) -> Optional[str]:
        """コードの正規化"""
        if not code or code.strip() == '':
            return None
        code = mojimoji.zen_to_han(code.strip(), ascii=True, digit=True)
        return regex.sub(r'[^A-Za-z0-9]', '', code)

3.2 データ変換クラス

class DataTransformer:
    def __init__(self):
        self.text_normalizer = TextNormalizer()

    def transform(self, df: pd.DataFrame) -> pd.DataFrame:
        """データ変換の実行"""
        df = df.copy()

        # 文字列の正規化
        text_columns = ['DebitAccountName', 'CreditAccountName', 
                       'DepartmentName', 'Description']
        for col in text_columns:
            df[col] = df[col].apply(self.text_normalizer.normalize_text)

        # コードの正規化
        code_columns = ['DebitAccountCode', 'CreditAccountCode', 
                       'DepartmentCode']
        for col in code_columns:
            df[col] = df[col].apply(self.text_normalizer.normalize_code)

        # 日付とタイムスタンプの標準化
        date_columns = ['TransactionDate', 'AccountingDate']
        for col in date_columns:
            df[col] = pd.to_datetime(df[col]).dt.date

        # 金額の正規化(小数点2桁)
        df['Amount'] = df['Amount'].round(2)

        # タイムスタンプの標準化
        df['CreatedAt'] = pd.to_datetime(df['CreatedAt'])
        df['UpdatedAt'] = pd.to_datetime(df['UpdatedAt'])

        # 異常値の検出
        df['is_anomaly'] = self._detect_anomalies(df)
        
        return df

    def _detect_anomalies(self, df: pd.DataFrame) -> pd.Series:
        """異常値の検出"""
        stats = df.groupby('DebitAccountCode')['Amount'].agg(['mean', 'std'])
        
        def calc_zscore(row):
            mu = stats.loc[row['DebitAccountCode'], 'mean']
            sigma = stats.loc[row['DebitAccountCode'], 'std']
            if sigma == 0:
                return 0
            return abs((row['Amount'] - mu) / sigma)
        
        return df.apply(calc_zscore, axis=1) > 3

4. BigQueryへのロード

4.1 BigQuery設定

BIGQUERY_CONFIG = {
    'credentials_path': 'path/to/your-key.json',
    'project_id': 'your-project-id',
    'dataset': 'accounting',
    'table': 'journal_entries'
}

4.2 ロード処理

class BigQueryLoader:
    def __init__(self, config: Dict[str, str]):
        credentials = service_account.Credentials.from_service_account_file(
            config['credentials_path']
        )
        self.client = bigquery.Client(
            credentials=credentials,
            project=config['project_id']
        )
        self.table_id = f"{config['project_id']}.{config['dataset']}.{config['table']}"

    def load(self, df: pd.DataFrame) -> None:
        """BigQueryへのデータロード"""
        schema = [
            bigquery.SchemaField("JournalHeaderId", "STRING"),
            bigquery.SchemaField("TransactionDate", "DATE"),
            bigquery.SchemaField("AccountingDate", "DATE"),
            bigquery.SchemaField("DebitAccountCode", "STRING"),
            bigquery.SchemaField("DebitAccountName", "STRING"),
            bigquery.SchemaField("CreditAccountCode", "STRING"),
            bigquery.SchemaField("CreditAccountName", "STRING"),
            bigquery.SchemaField("Amount", "NUMERIC"),
            bigquery.SchemaField("DepartmentCode", "STRING"),
            bigquery.SchemaField("DepartmentName", "STRING"),
            bigquery.SchemaField("Description", "STRING"),
            bigquery.SchemaField("CreatedAt", "TIMESTAMP"),
            bigquery.SchemaField("UpdatedAt", "TIMESTAMP"),
            bigquery.SchemaField("is_anomaly", "BOOLEAN")
        ]
        
        job_config = bigquery.LoadJobConfig(
            schema=schema,
            write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE
        )
        
        job = self.client.load_table_from_dataframe(
            df, self.table_id, job_config=job_config
        )
        job.result()

5. ETL実行

5.1 メイン処理

  • データ抽出:過去90日分のデータを取得

  • データ変換:TextNormalizerとDataTransformerを使用

  • 異常値の検出と集計

  • BigQueryへのロード

  • 例外処理とログ出力

def main():
    try:
        # 処理開始日の設定(3ヶ月前から)
        start_date = datetime.now().date() - timedelta(days=90)
        
        # データ抽出
        logger.info(f"Extracting data from {start_date}...")
        df = extract_data(start_date)
        logger.info(f"Extracted {len(df)} records")
        
        # データ変換
        logger.info("Transforming data...")
        transformer = DataTransformer()
        df_transformed = transformer.transform(df)
        
        # 異常値の確認
        anomaly_count = df_transformed['is_anomaly'].sum()
        logger.info(f"Detected {anomaly_count} anomalies")
        
        # BigQueryへのロード
        logger.info("Loading to BigQuery...")
        loader = BigQueryLoader(BIGQUERY_CONFIG)
        loader.load(df_transformed)
        
        logger.info("ETL process completed successfully")
        
    except Exception as e:
        logger.error(f"Error occurred: {str(e)}")
        raise

if __name__ == "__main__":
    main()

5.2 実行結果の確認

BigQueryで変換後のデータを確認

  • 文字化けチェック用SQLクエリ

    • 日本語カラムの確認

    • レコード件数の集計

  • 異常値確認用SQLクエリ

    • 検出された異常値のリスト

    • 金額順での並び替え

-- 文字化けチェック
SELECT 
    DebitAccountName,
    CreditAccountName,
    DepartmentName,
    Description,
    COUNT(*) as record_count
FROM `your-project.accounting.journal_entries`
GROUP BY 1,2,3,4
LIMIT 5;

-- 異常値の確認
SELECT 
    DebitAccountCode,
    DebitAccountName,
    Amount,
    Description
FROM `your-project.accounting.journal_entries`
WHERE is_anomaly = TRUE
ORDER BY Amount DESC;

まとめ

実装のポイント

  1. CP932文字コードへの対応

    • text = unicodedata.normalize('NFKC', text)

  2. 文字列の正規化(半角/全角、カタカナ)

    • text = mojimoji.zen_to_han(text, ascii=True, digit=True)

    • text = mojimoji.han_to_zen(text, kana=True)

  3. 異常値検出の自動化

  4. エラーハンドリングとログ管理

次のステップ

Week 2では以下を実装予定です。

  1. RAG基盤の構築

  2. LangChainの環境設定

  3. ベクトル検索の実装

#Python #ETL #SQLServer #BigQuery #AdventCalendar

いいなと思ったら応援しよう!