見出し画像

Databento と RisingWave を活用したリアルタイム市場分析パイプラインの構築

リアルタイム市場分析パイプラインを構築する方法を知りたいですか?このチュートリアルでは、Databentoの市場データストリームとRisingWaveのストリーム処理機能を組み合わせて、ライブ市場分析システムを作成する方法を紹介します。

なぜこれが重要なのか

リアルタイム市場分析は、今日の金融業界で成功するために欠かせません。しかし、市場データの量、種類、速度を処理できるシステムを構築するのは簡単ではありません。このチュートリアルでは、Databento と RisingWave を使用して強力なリアルタイム市場分析パイプラインを作成する方法を説明します。Databento は、統一された API を通じてさまざまな取引所から正規化されたデータを提供し、RisingWave はリアルタイム処理の重い作業を担当し、VWAP のようなインサイトを最小の遅延で導き出すことができます。これらを組み合わせることで、より迅速で情報に基づいた意思決定を行い、競争優位性を得ることができます。

使用するツール

Databento: あなたの市場データソース

Databento は、現代的な API を通じてクリーンで正規化された市場データを提供します。これを市場への窓口と考えてください。どの取引所のデータでも、一貫した形式で過去のデータとリアルタイムデータを取得できます。

Databento は、ライブおよび過去の市場データに対して包括的なカバレッジを提供する強力で軽量なソリューションです。次の特徴があります:

  • 使いやすい:小さな API インターフェースと直感的なプロトコルにより、ユーザーは数分で利用を開始できます。

  • 高速:6.1 マイクロ秒の正規化遅延、FPGA ベースのキャプチャによるほぼゼロのデータギャップ。

  • 多用途:複数の資産クラスと取引所を統一されたメッセージ形式でサポートします。すべてのオーダーブックメッセージや数十万のシンボルを一度に処理できます。

RisingWave: あなたのリアルタイム処理と管理プラットフォーム

RisingWave は、SQL を使ってストリーミングデータをリアルタイムで分析するのを助けます。もし以前にデータベースを扱ったことがあれば、すぐに慣れるでしょう。Python SDK を使えば、さらに簡単に RisingWave を Python アプリケーションに統合できます。

手順

最初のステップ

始める前に、次の準備が必要です:

  1. Databento のアカウントとリアルタイムデータのライセンス。ライセンスの取得は比較的簡単で、非プロフェッショナルユーザーには即座に発行されることもあります。詳細については、Databento のライセンスガイドを参照してください。

  2. 実行中の RisingWave インスタンス。最も簡単な方法は、RisingWave Cloud アカウントを作成することです。数分で設定できます。また、オープンソース版をインストールしてローカルで実行することもできます。詳細な手順については、クイックスタートガイドをご覧ください。

  3. Python 3.7 以上。

次に、必要な Python パッケージを Python 環境にインストールします。

pip3 install databento
pip3 install risingwave-py

Databento から市場データを取得する

まず、E-mini S&P 500 先物データを取得します。これは最も流動性の高い先物契約の1つで、学習に最適です。ライブデータを取得する方法は以下の通りです:

import databento as db
   
client = db.Live()
client.subscribe(
    dataset="GLBX.MDP3",
    schema="trades",
    stype_in="parent",
    symbols="ES.FUT",
)

# テスト用に受信データを表示
for record in client:
    print(record)

RisingWave の設定

次に、RisingWave に接続します。以下のパラメータは、ローカルインスタンスへの接続に必要です。Cloud ユーザーの場合、クラスタの接続情報はクラスタカードの Connect をクリックして取得できます。

from risingwave import RisingWave, RisingWaveConnOptions

rw = RisingWave(
    RisingWaveConnOptions.from_connection_info(
        host="localhost", port=4566, user="root", password="root", database="dev"
    )
)

次に、Databento からのデータを保存するために、RisingWave に適切なスキーマを持つテーブルを作成します。

   with rw.getconn() as conn:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS es_futures_live (
                timestamp TIMESTAMPTZ,
                symbol VARCHAR,
                price NUMERIC,
                size BIGINT
            );
        """)

次に、各指標に対してリアルタイム結果を得るためのマテリアライズドビューを定義します。RisingWave のマテリアライズドビューは、新しいデータレコードが到着するたびにデータを継続的に処理します。

  with rw.getconn() as conn:
  conn.execute("""
            CREATE MATERIALIZED VIEW vwap_analysis_live AS
            SELECT 
                window_start,
                SUM(price * size) / SUM(size) as vwap,
                AVG(price) as simple_avg_price,
                SUM(size) as total_volume,
                COUNT(*) as trade_count
            FROM TUMBLE(es_futures_live, timestamp, INTERVAL '5 SECONDS')
            GROUP BY window_start;
        """)

このマテリアライズドビューは、VWAP 計算を5秒ごとに行い、取引実行の質を測るために重要です。機関投資家は、良い価格で取引をしていることを示すために、VWAP に一致させたり、上回ることを目指します。

データレコードの処理

データレコードが到着するたびに、データをフォーマットして RisingWave に挿入する関数を定義できます。

async def handle_trade(record):
    with rw.getconn() as conn:
        timestamp = datetime.fromtimestamp(record.ts_event / 1e9)
        params = {
            "timestamp": timestamp,
            "symbol": record.symbol,
            "price": float(record.price),
            "size": int(record.size)
        }
        conn.execute("""
            INSERT INTO es_futures_live 
            (timestamp, symbol, price, size)
            VALUES (:timestamp, :symbol, :price, :size)
        """, params)

データ変更の購読

マテリアライズドビューには時間ウィンドウで集計された値が含まれるため、アプリケーションをイベント駆動型にするために、その変更を購読したい場合があります。そのためには、まず変更イベントハンドラーを定義し、そのハンドラーを使用してマテリアライズドビューの変更を購読します。

# MV 変更イベントハンドラー
def handle_vwap_changes(event_df: pd.DataFrame) -> None:
    # 更新操作のみを含む
    event_df = event_df[event_df["op"].isin(["Insert", "UpdateInsert"])]
    if event_df.empty:
        return

    # 出力用にデータフレームを整形
    event_df = event_df.rename(
        {
            "window_start": "Timestamp",
            "symbol": "Symbol",
            "vwap": "VWAP",
            "simple_avg_price": "Avg Price",
            "total_volume": "Volume",
            "trade_count": "Trades",
        },
        axis=1,
    )
    event_df = event_df.drop(["op", "rw_timestamp"], axis=1)
    event_df = event_df.set_index(["Timestamp", "Symbol"])

    print()
    print("VWAP 分析の更新:")
    print(event_df)      

            
# MV 変更の購読
threading.Thread(
    target=lambda: rw.on_change(
        subscribe_from="vwap_analysis_live",
        handler=handle_vwap_changes,
        output_format=OutputFormat.DATAFRAME,
        persist_progress=False,
        max_batch_size=10
    )
).start()

すべてをまとめる

最後に、すべてをまとめて、実際にどのような結果が得られるかを見てみましょう。

from risingwave import RisingWave, RisingWaveConnOptions, OutputFormat
import databento as db
import pandas as pd
import threading

# データベース設定関数
def setup_database(rw: RisingWave) -> None:
    with rw.getconn() as conn:
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS es_futures_live (
                timestamp TIMESTAMP,
                symbol VARCHAR,
                price DOUBLE PRECISION,
                size BIGINT
            )"""
        )

        conn.execute(
            """
            CREATE MATERIALIZED VIEW IF NOT EXISTS vwap_analysis_live AS
            SELECT 
                window_start,
                symbol,
                SUM(price * size) / SUM(size) as vwap,
                AVG(price) as simple_avg_price,
                SUM(size) as total_volume,
                COUNT(*) as trade_count
            FROM TUMBLE(es_futures_live, timestamp, INTERVAL '5 SECONDS')
            GROUP BY window_start, symbol;"""
        )

# MV 変更イベントハンドラー
def handle_vwap_changes(event_df: pd.DataFrame) -> None:
    # 更新操作のみを含む
    event_df = event_df[event_df["op"].isin(["Insert", "UpdateInsert"])]
    if event_df.empty:
        return

    # 出力用にデータフレームを整形
    event_df = event_df.rename(
        {
            "window_start": "Timestamp",
            "symbol": "Symbol",
            "vwap": "VWAP",
            "simple_avg_price": "Avg Price",
            "total_volume": "Volume",
            "trade_count": "Trades",
        },
        axis=1,
    )
    event_df = event_df.drop(["op", "rw_timestamp"], axis=1)
    event_df = event_df.set_index(["Timestamp", "Symbol"])

    print()
    print("VWAP 分析の更新:")
    print(event_df)

def main() -> None:
    # RisingWave 接続の初期化
    rw = RisingWave(
        RisingWaveConnOptions.from_connection_info(
            host="localhost", port=4566, user="root", password="root", database="dev"
        )
    )
    setup_database(rw)

    # MV 変更の購読
    threading.Thread(
        target=lambda: rw.on_change(
            subscribe_from="vwap_analysis_live",
            handler=handle_vwap_changes,
            output_format=OutputFormat.DATAFRAME,
            persist_progress=False,
            max_batch_size=10,
        )
    ).start()

    # Databento を通じて CME データを購読
    db.enable_logging()
    client = db.Live()
    client.subscribe(
        dataset="GLBX.MDP3",
        schema="trades",
        stype_in="parent",
        symbols="ES.FUT",
    )

    # RisingWave に取引データを送信
    with rw.getconn() as conn:
        for record in client:
            # Trade レコードのみを処理
            if not isinstance(record, db.TradeMsg):
                continue

            # 人間が読めるシンボル名を取得
            symbol = client.symbology_map.get(record.instrument_id)
            if symbol is None:
                continue

            params = {
                "timestamp": record.pretty_ts_recv,
                "symbol": symbol,
                "price": record.pretty_price,
                "size": record.size,
            }
            conn.execute(
                """
                INSERT INTO es_futures_live 
                (timestamp, symbol, price, size)
                VALUES (:timestamp, :symbol, :price, :size)""",
                params,
            )

if __name__ == "__main__":
    main()

以下は、リアルタイムで新しい市場データが到着するたびに更新されるライブ VWAP 分析の動的出力の一例です。

| Timestamp           | Symbol        | VWAP     | Avg Price | Volume | Trades |
|---------------------|---------------|----------|-----------|--------|--------|
| 2024-12-13 20:03:30 | ESZ4-ESH5     | 69.3259  | 69.3250   | 297    | 44     |
| 2024-12-13 20:03:35 | ESZ4          | 6057.875 | 6057.875  | 22     | 22     |
| 2024-12-13 20:03:35 | ESZ4-ESH5     | 69.3313  | 69.3200   | 1265   | 55     |
| 2024-12-13 20:03:40 | ESH5          | 6127.587 | 6127.588  | 1199   | 407    |
| 2024-12-13 20:03:40 | ESZ4          | 6058.076 | 6058.034  | 3839   | 1045   |

次に進むべきこと

リアルタイム市場データがシステムを通じて流れるようになったので、次のようなことができます:

  • 注文フローの不均衡など、さらに高度な分析を追加する

  • VWAP クロスオーバーに基づいた取引シグナルを作成する

  • 指標を視覚化するダッシュボードを作成する



いいなと思ったら応援しよう!

この記事が参加している募集