![見出し画像](https://assets.st-note.com/production/uploads/images/164462346/rectangle_large_type_2_be1f2d941c05a9e57714c26e98e5ff20.png?width=1200)
SQLServerからBigQueryへのデータ移行実装 - Day 4
はじめに
昨日の財務分析を行う際には、会計データの取り込み・加工にて、文字化けや例外的な情報の混入を避けるなど、データの品質向上が必要なことが分かりました。特に以下の課題に対応する必要があります。
SQLServerの文字コード(CP932)対応
勘定科目コードの標準化
金額の正規化
異常値の検出と処理
これらの課題を解決するため、今日はSQLServerからBigQueryへのETL(Extract/Transform/Load)処理を実装していきます。
1. 環境構築
SQLServerからBigQueryへのデータ移行に必要なパッケージをインストール
pip install pyodbc pandas google-cloud-bigquery mojimoji regex
2. データ抽出と文字コード処理
2.1 接続設定
import pyodbc
from google.cloud import bigquery
from google.oauth2 import service_account
import pandas as pd
import mojimoji
import regex
import unicodedata
import logging
from datetime import datetime, timedelta
from typing import Optional, Dict, List
# ログ設定
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# SQLServer接続情報
SQL_CONFIG = {
'driver': '{SQL Server}',
'server': 'your_server',
'database': 'your_database',
'uid': 'your_username',
'pwd': 'your_password',
'charset': 'CP932'
}
2.2 SQLServerからのデータ抽出
def get_sqlserver_connection():
"""SQLServer接続を取得"""
conn_str = (
f"DRIVER={SQL_CONFIG['driver']};"
f"SERVER={SQL_CONFIG['server']};"
f"DATABASE={SQL_CONFIG['database']};"
f"UID={SQL_CONFIG['uid']};"
f"PWD={SQL_CONFIG['pwd']};"
f"charset={SQL_CONFIG['charset']};"
)
return pyodbc.connect(conn_str)
def extract_data(start_date: datetime.date) -> pd.DataFrame:
"""データ抽出処理"""
query = """
SELECT
JH.JournalHeaderId,
JH.TransactionDate,
JH.AccountingDate,
JL.DebitAccountCode,
DA.AccountName as DebitAccountName,
JL.CreditAccountCode,
CA.AccountName as CreditAccountName,
JL.Amount,
JL.DepartmentCode,
D.DepartmentName,
JH.Description,
JH.CreatedAt,
JH.UpdatedAt
FROM
JournalHeaders JH
INNER JOIN JournalLines JL ON JH.JournalHeaderId = JL.JournalHeaderId
INNER JOIN Accounts DA ON JL.DebitAccountCode = DA.AccountCode
INNER JOIN Accounts CA ON JL.CreditAccountCode = CA.AccountCode
INNER JOIN Departments D ON JL.DepartmentCode = D.DepartmentCode
WHERE
JH.Status = 'POSTED'
AND JH.AccountingDate >= ?
-- 仮勘定や内部取引を除外
AND JL.DebitAccountCode NOT LIKE '9%'
AND JL.CreditAccountCode NOT LIKE '9%'
"""
with get_sqlserver_connection() as conn:
df = pd.read_sql(query, conn, params=[start_date])
return df
3. データ変換処理
データ変換処理では2つの主要なクラスを実装しています。
TextNormalizer クラス
文字列の正規化(normalize_text)
空白除去
Unicode正規化(NFKC)
英数字の半角変換
カタカナの全角変換
特殊文字の除去
コードの正規化(normalize_code)
空白除去
半角変換
英数字以外の文字除去
DataTransformer クラス
データフレーム全体の変換処理
文字列カラムの正規化
コードカラムの正規化
日付型の標準化
金額の小数点処理
異常値の検出(Z-scoreベース)
これらにより、SQLServerから取得したデータの一貫性と品質を確保します。
3.1 文字列正規化
class TextNormalizer:
@staticmethod
def normalize_text(text: Optional[str]) -> Optional[str]:
"""文字列の正規化"""
if not text or text.strip() == '':
return None
# 前後の空白を除去
text = text.strip()
# Unicode正規化
text = unicodedata.normalize('NFKC', text)
# 英数字を半角に
text = mojimoji.zen_to_han(text, ascii=True, digit=True)
# カタカナを全角に
text = mojimoji.han_to_zen(text, kana=True)
# 特殊文字を除去
text = regex.sub(r'[\p{C}]', '', text)
return text
@staticmethod
def normalize_code(code: Optional[str]) -> Optional[str]:
"""コードの正規化"""
if not code or code.strip() == '':
return None
code = mojimoji.zen_to_han(code.strip(), ascii=True, digit=True)
return regex.sub(r'[^A-Za-z0-9]', '', code)
3.2 データ変換クラス
class DataTransformer:
def __init__(self):
self.text_normalizer = TextNormalizer()
def transform(self, df: pd.DataFrame) -> pd.DataFrame:
"""データ変換の実行"""
df = df.copy()
# 文字列の正規化
text_columns = ['DebitAccountName', 'CreditAccountName',
'DepartmentName', 'Description']
for col in text_columns:
df[col] = df[col].apply(self.text_normalizer.normalize_text)
# コードの正規化
code_columns = ['DebitAccountCode', 'CreditAccountCode',
'DepartmentCode']
for col in code_columns:
df[col] = df[col].apply(self.text_normalizer.normalize_code)
# 日付とタイムスタンプの標準化
date_columns = ['TransactionDate', 'AccountingDate']
for col in date_columns:
df[col] = pd.to_datetime(df[col]).dt.date
# 金額の正規化(小数点2桁)
df['Amount'] = df['Amount'].round(2)
# タイムスタンプの標準化
df['CreatedAt'] = pd.to_datetime(df['CreatedAt'])
df['UpdatedAt'] = pd.to_datetime(df['UpdatedAt'])
# 異常値の検出
df['is_anomaly'] = self._detect_anomalies(df)
return df
def _detect_anomalies(self, df: pd.DataFrame) -> pd.Series:
"""異常値の検出"""
stats = df.groupby('DebitAccountCode')['Amount'].agg(['mean', 'std'])
def calc_zscore(row):
mu = stats.loc[row['DebitAccountCode'], 'mean']
sigma = stats.loc[row['DebitAccountCode'], 'std']
if sigma == 0:
return 0
return abs((row['Amount'] - mu) / sigma)
return df.apply(calc_zscore, axis=1) > 3
4. BigQueryへのロード
4.1 BigQuery設定
BIGQUERY_CONFIG = {
'credentials_path': 'path/to/your-key.json',
'project_id': 'your-project-id',
'dataset': 'accounting',
'table': 'journal_entries'
}
4.2 ロード処理
class BigQueryLoader:
def __init__(self, config: Dict[str, str]):
credentials = service_account.Credentials.from_service_account_file(
config['credentials_path']
)
self.client = bigquery.Client(
credentials=credentials,
project=config['project_id']
)
self.table_id = f"{config['project_id']}.{config['dataset']}.{config['table']}"
def load(self, df: pd.DataFrame) -> None:
"""BigQueryへのデータロード"""
schema = [
bigquery.SchemaField("JournalHeaderId", "STRING"),
bigquery.SchemaField("TransactionDate", "DATE"),
bigquery.SchemaField("AccountingDate", "DATE"),
bigquery.SchemaField("DebitAccountCode", "STRING"),
bigquery.SchemaField("DebitAccountName", "STRING"),
bigquery.SchemaField("CreditAccountCode", "STRING"),
bigquery.SchemaField("CreditAccountName", "STRING"),
bigquery.SchemaField("Amount", "NUMERIC"),
bigquery.SchemaField("DepartmentCode", "STRING"),
bigquery.SchemaField("DepartmentName", "STRING"),
bigquery.SchemaField("Description", "STRING"),
bigquery.SchemaField("CreatedAt", "TIMESTAMP"),
bigquery.SchemaField("UpdatedAt", "TIMESTAMP"),
bigquery.SchemaField("is_anomaly", "BOOLEAN")
]
job_config = bigquery.LoadJobConfig(
schema=schema,
write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE
)
job = self.client.load_table_from_dataframe(
df, self.table_id, job_config=job_config
)
job.result()
5. ETL実行
5.1 メイン処理
データ抽出:過去90日分のデータを取得
データ変換:TextNormalizerとDataTransformerを使用
異常値の検出と集計
BigQueryへのロード
例外処理とログ出力
def main():
try:
# 処理開始日の設定(3ヶ月前から)
start_date = datetime.now().date() - timedelta(days=90)
# データ抽出
logger.info(f"Extracting data from {start_date}...")
df = extract_data(start_date)
logger.info(f"Extracted {len(df)} records")
# データ変換
logger.info("Transforming data...")
transformer = DataTransformer()
df_transformed = transformer.transform(df)
# 異常値の確認
anomaly_count = df_transformed['is_anomaly'].sum()
logger.info(f"Detected {anomaly_count} anomalies")
# BigQueryへのロード
logger.info("Loading to BigQuery...")
loader = BigQueryLoader(BIGQUERY_CONFIG)
loader.load(df_transformed)
logger.info("ETL process completed successfully")
except Exception as e:
logger.error(f"Error occurred: {str(e)}")
raise
if __name__ == "__main__":
main()
5.2 実行結果の確認
BigQueryで変換後のデータを確認
文字化けチェック用SQLクエリ
日本語カラムの確認
レコード件数の集計
異常値確認用SQLクエリ
検出された異常値のリスト
金額順での並び替え
-- 文字化けチェック
SELECT
DebitAccountName,
CreditAccountName,
DepartmentName,
Description,
COUNT(*) as record_count
FROM `your-project.accounting.journal_entries`
GROUP BY 1,2,3,4
LIMIT 5;
-- 異常値の確認
SELECT
DebitAccountCode,
DebitAccountName,
Amount,
Description
FROM `your-project.accounting.journal_entries`
WHERE is_anomaly = TRUE
ORDER BY Amount DESC;
まとめ
実装のポイント
CP932文字コードへの対応
text = unicodedata.normalize('NFKC', text)
文字列の正規化(半角/全角、カタカナ)
text = mojimoji.zen_to_han(text, ascii=True, digit=True)
text = mojimoji.han_to_zen(text, kana=True)
異常値検出の自動化
エラーハンドリングとログ管理
次のステップ
Week 2では以下を実装予定です。
RAG基盤の構築
LangChainの環境設定
ベクトル検索の実装