見出し画像

TROCCOのプランが変わったので、Cloud FunctionsでHubSpotからBigQueryへデータ転送してみたよ

こんにちは、すずきです。
どうにも盛り上がりに欠けるバチェロレッテシーズン3を最新話まで観終わったので、バーベキュー最強決戦!シーズン3を観始めました。第六話の「牛肩バラ肉勝負」は名勝負でしたね..


はじめに

本記事では、HubSpotで管理している「コンタクト(contacts)」「会社(companies)」「取引(deals)」データを、BIツール(Looker Studioなど)のデータソースとなるBigQueryに転送する方法を紹介します。

これまでTROCCOというSaaSでHubSpotからスプレッドシートにデータを転送し、スプレッドシートからLooker Studioでデータを読み込んでいたのですが、TROCCOの料金プラン変更により、以前のように2時間に1回の頻繁なデータ更新が無料範囲で行えなくなってしまいました。加えて、スプレッドシートからのデータ読み込みが遅いという課題もあったため、TROCCOとスプレッドシートの代替としてGoogle Cloud Functions(GCF)とBigQueryを利用する新しい方法を採用しました。

GCP構成

  • プロジェクト名: hubspot-to-bigquery

  • Google Cloud Functions(GCF): sync-hubspot-to-bigquery

    • スクリプトはPythonで実装

  • Cloud Scheduler: sync-hubspot-to-bigquery-job

    • 毎時0分にジョブを実行

  • BigQuery:

    • データセット: hubspot_data

    • テーブル: contacts, companies, deals

データの流れ

  1. Cloud Schedulerが1時間に1回GCFをトリガーする

  2. GCFはHubSpotからのデータを抽出し、BigQueryに転送する
    ※データ更新のため、全レコードを削除後にレコードを新たに追加する

  3. Looker StudioはBigQueryのテーブルを参照してレポートを生成する

実装

HubSpotアクセストークンの発行

HubSpotの設定メニューから非公開アプリを作成し、必要なスコープ(crm.object.deals.read, crm.object.companies.read, crm.object.contacts.read)を設定します。非公開アプリの作成後、アクセストークンをコピーして後ほど使用します。

設定 > 連携 >非公開アプリ
非公開アプリ

BigQueryテーブルの作成

BigQueryでhubspot_dataデータセットを作成し、deals, companies, contactsテーブルをクエリから作成します。以下は`companies`テーブルの作成例です。

CREATE TABLE `hubspot-to-bigquery.hubspot_data.companies` (
  id INT64,
  created_at TIMESTAMP,
  updated_at TIMESTAMP,
  about_us STRING,
  ...
)
データセットとテーブル

GCFの作成

以下がスクリプトの全文です。

import os
import logging
from datetime import datetime, timezone, timedelta
from hubspot import HubSpot
from hubspot.crm.contacts import ApiException
from google.cloud import bigquery
from google.api_core.retry import Retry

logging.basicConfig(level=logging.INFO)

client = bigquery.Client()

contacts_table_id = "hubspot-to-bigquery.hubspot_data.contacts"
contacts_table = client.get_table("hubspot-to-bigquery.hubspot_data.contacts")
contacts_table_schema_keys = {field.name for field in contacts_table.schema}

companies_table_id = "hubspot-to-bigquery.hubspot_data.companies"
companies_table = client.get_table("hubspot-to-bigquery.hubspot_data.companies")
companies_table_schema_keys = {field.name for field in companies_table.schema}

deals_table_id = "hubspot-to-bigquery.hubspot_data.deals"
deals_table = client.get_table("hubspot-to-bigquery.hubspot_data.deals")
deals_table_schema_keys = {field.name for field in deals_table.schema}

contacts_properties = [
    "id", "created_at", "updated_at", "company_size", "date_of_birth",...
]

companies_properties = [
    "id", "created_at", "updated_at", "about_us",...
]

deals_properties = [
    "id", "created_at", "updated_at", "amount_in_home_currency",...
]


def sync_hubspot_to_bigquery(_):
    access_token = os.getenv("ACCESS_TOKEN")
    if not access_token:
        logging.error("Access token not found in environment variables")
        return "Access token not found in environment variables", 500

    # HubSpot APIクライアントの初期化
    api_client = HubSpot(access_token=access_token)

    try:
        # 全件更新のため各テーブルから全レコードを削除
        delete_table_records(contacts_table_id)
        delete_table_records(companies_table_id)
        delete_table_records(deals_table_id)

        # HubSpotからデータを取得
        contacts_fetched = api_client.crm.contacts.get_all(properties=contacts_properties)
        companies_fetched = api_client.crm.companies.get_all(properties=companies_properties)
        deals_fetched = api_client.crm.deals.get_all(properties=deals_properties)

        # 挿入レコードの作成
        contacts_rows = create_rows_to_insert(contacts_fetched, contacts_table_schema_keys)
        companies_rows = create_rows_to_insert(companies_fetched, companies_table_schema_keys)
        deals_rows = create_rows_to_insert(deals_fetched, deals_table_schema_keys)
        
        # BigQueryの各テーブルにデータ挿入
        insert_rows_bigquery(contacts_table_id, contacts_rows)
        insert_rows_bigquery(companies_table_id, companies_rows)
        insert_rows_bigquery(deals_table_id, deals_rows)

        success_message = f"Data synchronized successfully: {len(contacts_rows)} contacts, {len(companies_rows)} companies, and {len(deals_rows)} deals updated."
        logging.info(success_message)
        return success_message, 200

    except ApiException as e:
        error_message = f"Exception when requesting: {e}"
        logging.error(error_message)
        return error_message, 500

def delete_table_records(table_id):
    delete_query = f"""
    DELETE FROM `{table_id}` WHERE TRUE
    """

    try:
        query_job = client.query(delete_query)
        query_job.result()
        logging.info(f"All records have been deleted from {table_id}.")
    except Exception as e:
        logging.error(f"Failed to delete records from {table_id}: {e}")

def convert_utc_to_jst(timestamp):
    # 日本標準時(JST)に変換
    jst_zone = timezone(timedelta(hours=9))
    jst_time = timestamp.astimezone(jst_zone)
    logging.debug(f"Converted {timestamp} to {jst_time}")
    return jst_time.isoformat()

def create_rows_to_insert(fetched_data, table_schema_keys):
    rows_to_insert = []
    for data in fetched_data:
        data_properties = data.properties
        
        # BigQueryに挿入するための行データを作成
        row = {
            "id": data.id,
            "created_at": convert_utc_to_jst(data.created_at) if data.created_at else None,
            "updated_at": convert_utc_to_jst(data.updated_at) if data.updated_at else None
        }

        for key, prop in data_properties.items():
            if key in table_schema_keys:  # スキーマに存在するカラムのみ処理
                value = prop if prop != '' and prop is not None else None

                # datetimeオブジェクトだった場合、JSTに変換してISOフォーマットにする
                if isinstance(value, datetime):
                    value = convert_utc_to_jst(value)
                
                row[key] = value

        rows_to_insert.append(row)
    return rows_to_insert

def insert_rows_bigquery(table_id, rows_to_insert, batch_size=100):
    # リトライポリシーの設定
    custom_retry = Retry(
        initial=1.0,  # 初期遅延時間 1 秒
        maximum=10.0,  # 最大遅延時間 10 秒
        multiplier=2.0,  # 遅延時間の倍増係数
        deadline=1200.0  # 全体の最大試行時間 1200 秒
    )
    
    # 分割してデータを挿入
    for i in range(0, len(rows_to_insert), batch_size):
        batch = rows_to_insert[i:i + batch_size]

        try:
            errors = client.insert_rows_json(table_id, batch, retry=custom_retry)
            if errors:
                logging.error(f"Errors occurred in batch {i // batch_size + 1}: {errors}")
            else:
                logging.info(f"Batch {i // batch_size + 1} inserted successfully into {table_id}.")
        except Exception as e:
            logging.error(f"Error inserting data into {table_id}: {e}")
functions-framework==3.*
hubspot-api-client
google-cloud-bigquery

ログ設定とBigQueryクライアントの初期化

ロギングは情報レベルで設定され、BigQueryクライアントはデフォルトのプロジェクト設定で初期化されます。また、使用するBigQueryテーブルのIDとスキーマのキーを取得し、後のデータ挿入で使用します。

logging.basicConfig(level=logging.INFO)
client = bigquery.Client()

contacts_table_id = "hubspot-to-bigquery.hubspot_data.contacts"
contacts_table = client.get_table(contacts_table_id)
contacts_table_schema_keys = {field.name for field in contacts_table.schema}
# 同様に会社と取引データテーブルの設定も行います。

データ同期関数の定義

sync_hubspot_to_bigquery関数では、環境変数からHubSpotのアクセストークンを取得し、APIクライアントを初期化します。その後、指定されたプロパティでHubSpotのデータを抽出し、BigQueryに挿入する一連のステップを実行します。

def sync_hubspot_to_bigquery(_):
    access_token = os.getenv("ACCESS_TOKEN")
    if not access_token:
        logging.error("Access token not found in environment variables")
        return "Access token not found in environment variables", 500

    api_client = HubSpot(access_token=access_token)
    # データ抽出と挿入の詳細な手順は後述します。

データ挿入とリトライポリシー

抽出したデータはBigQueryにバッチで挿入されます。リトライポリシーは失敗した挿入操作を自動的に再試行するために設定され、データ整合性を保ちます。

def insert_rows_bigquery(table_id, rows_to_insert, batch_size=100):
    custom_retry = Retry(initial=1.0, maximum=10.0, multiplier=2.0, deadline=1200.0)
    for i in range(0, len(rows_to_insert), batch_size):
        batch = rows_to_insert[i:i + batch_size]
        errors = client.insert_rows_json(table_id, batch, retry=custom_retry)
        if errors:
            logging.error(f"Errors occurred in batch {i // batch_size + 1}: {errors}")

BigQueryのデータ更新と時刻変換の注意点

  • ストリーミングバッファの制限: BigQueryのストリーミングバッファ仕様により、データ更新頻度を1時間より短く設定するとエラーになってしまいました。TROCCO使用時の更新頻度は2時間に1回だったので、むしろ今回は頻度を短くできたのですが、もっと短い頻度が必要である場合は注意が必要です。

  • 時刻データの変換必要性: BigQueryではTIMESTAMP型のデータがUTCで保管されるため、日本時間に変換する処理が必要です。Looker Studioでデータを分析する際には、この時刻変換を適切に行うことで、時差の影響を受けずに正確なレポート作成が可能です。

Cloud Schedulerの作成

Cloud Schedulerを使用して、GCFを毎時0分に自動的に実行するジョブを設定します。このスケジュールにより、HubSpotのデータが定期的に更新され、常に最新の情報がBigQueryに保持されます。スケジューラのCron設定は0 * * * *で、これで毎時0分にジョブがトリガーされます。

Cloud Scheduler

参考

採用情報


いいなと思ったら応援しよう!