【コード公開】公開されていた CB Bot のコードを別言語でチューニングしてみた【勝率99.9%サーキットブレーカー】

ネットを巡回していたところ CB Bot というものが存在することを知り、公開されていたコードをリファクタリングしてみたというお話と、実際のコードも無料で公開します。

よろしければツイッターのフォローをしていただけると助かります🙇‍♂
@rarirureluis


どれぐらい早くなったか

実際にベンチマークを取り、どの程度速く注文が完了するかを見てみます。

ここでいうベンチマークはループ内の
1. 現在のUNIXタイムスタンプを取得
2. 10分前のUNIXタイムスタンプを計算
3. 10分前の約定履歴を取得
4. 指値注文を発注
5. 全ての注文をキャンセル

の一連の流れを30回ループしてかかった時間の合計を30で割り、1ループ辺りにかかる時間を算出します。(time.sleep の部分は削除しています)

紹介されていたコード

30回ループでかかった合計時間は 153s となりました。
1ループにつき 5.1s という結果になりました。

ベンチマークを測るように追加したコード:

total_duration = 0  # ループ内での処理にかかった合計時間

for _ in range(30):
    start_time = time.time()  # 処理開始時間

    ...メインロジック(注文、キャンセル)

    end_time = time.time()  # 処理終了時間
    duration = end_time - start_time  # 処理にかかった時間
    total_duration += duration  # 合計時間に追加

print(f"Total execution time: {total_duration} seconds")
sys.exit(0)

リファクタリングしたコード

合計 8.9s、1ループにつき 0.3s という結果になりました。
オリジナルコードに比べて 17倍の高速なようです。

ベンチマークを測るように追加したコード:

var totalDuration time.Duration
count := 1

// 10 min TTL で削除されたレコードから10分前の取引価格を取得し、これをベースに注文を行う
cache.OnEviction(func(ctx context.Context, reason ttlcache.EvictionReason, item *ttlcache.Item[int64, float64]) {
	if reason == ttlcache.EvictionReasonExpired {
		startTime := time.Now()

		if count == 30 {
			log.Printf("Total execution time: %v\n", totalDuration) // 合計時間を出力
			os.Exit(0)
		}

	...メインロジック(注文、キャンセル)

		endTime := time.Now()              // 処理終了時間
		duration := endTime.Sub(startTime) // 処理にかかった時間
		totalDuration += duration          // 合計時間に追加
		count += 1
		}
	})

なぜ高速化ができたか

最初に断っておくと言語の違いについては言及しません。
私は、プログラミング言語そのもの自体に詳しくないため、誤解を与えてしまうかもしれないからです。

今回の bot において重い処理とは何か

それは HTTP 通信です。

HTTP 通信の中でも初回接続時の「スリーウェイハンドシェイク」はそれなりに重いです。
クライアント内部では通信を行うために OS で利用可能なソケットを確保する必要があるためです。

オリジナルコードではループ内で CCXT を使って毎回 API 経由で 10分前の約定履歴を取得しています。
つまり毎回スリーウェイハンドシェイクをしているため、ここをどうにかできれば良さそうです。

それに対しリファクタリングしたコードは WebSocket を使い、約定履歴をメモリにキャッシュとして保存し、そのキャッシュの有効期限を10分 にしています。

キャッシュが削除されるタイミングでフックした関数が呼ばれ有効期限切れとなったキャッシュを引数に、注文、キャンセルを行っています。
つまり、引数には10分前の約定履歴が入ってくるので API を通さずに約定履歴を参照することができます。

しかし bitFlyer では Private WebSocket による注文ができなさそうなので注文、キャンセルはどうしても API 経由になります。
このとき毎回「スリーウェイハンドシェイク」をしていてはリソースの無駄があるためこれを解決する必要があります。

Keep-Alive

Keep-Alive の説明はググるとして Keep-Alive を利用することでコネクション(ソケット)を使いまわし、スリーウェイハンドシェイクをスキップすることで高速化することができます。

Go でリファクタリングを行った際に bitFlyer のライブラリが有志によって公開されていたのでこれを利用しましたが、一部問題もありました。

このライブラリはリクエストを行うごとに HTTP クライアントを作成していたため、フォークし HTTP クライアントを使い回せるようにしました。

これにより下記のように Keep-Alive を有効にした HTTP クライアントを引数に渡せるようになりました。

client := bitflyer.New(
	config.BFAPIKey,
	config.BFAPISecret,
	&http.Client{
		Transport: &http.Transport{
			DialContext: (&net.Dialer{
				Timeout:   3 * time.Second,
				KeepAlive: 3 * time.Second,
			}).DialContext,
			MaxIdleConns:        32,
			MaxIdleConnsPerHost: 16,
			IdleConnTimeout:     10 * time.Minute,
		},
	},
)

Keep-Alive の動作も確認ができます。

Keep-Alive の確認

コードの公開

バイナリファイルをダウンロードし、フラグとして

  • --bf-api-key

  • --bf-api-secret

  • --size

を指定してください。
例:./bf-cb-bot --bf-api-key "hoge" --bf-api-secret "fuga" --size 0.01

下記 zip ファイルに各環境ごとのバイナリファイルがあります。
パスワード:rarirureluis

セキュリティに心配する正しい方は下記のコードをビルドしてお使いください。
これに限らず簡単なbotなどで他人の作ったライブラリはフォークして利用することがオススメです。ライブラリ制作者が万が一裏切ってしまう可能性は無きにしもあらずです。

package main

import (
	"context"
	"flag"
	"fmt"
	"log"
	"net"
	"net/http"
	"os"
	"os/signal"
	"strconv"
	"sync"
	"syscall"
	"time"

	"github.com/jellydator/ttlcache/v3"
	"github.com/rluisr/go-bitflyer"
	"github.com/rluisr/go-bitflyer/private"
	"github.com/rluisr/go-bitflyer/realtime"
	"github.com/rluisr/go-bitflyer/types"
)

const (
	HistoryCacheTtl = 10 * time.Minute
)

var (
	mu         sync.Mutex
	inProgress bool
)

func main() {
	bfAPIKey := flag.String("bf-api-key", "", "API key for BF")
	bfAPISecret := flag.String("bf-api-secret", "", "API secret for BF")
	sizeStr := flag.String("size", "", "Size parameter (float64)")

	flag.Parse()

	if *bfAPIKey == "" || *bfAPISecret == "" || *sizeStr == "" {
		log.Fatal("All flags --bf-api-key, --bf-api-secret and --size are required.")
	}

	size, err := strconv.ParseFloat(*sizeStr, 64)
	if err != nil {
		log.Fatalf("Error parsing --size: %v", err)
	}

	sig := make(chan os.Signal)
	signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)

	client := bitflyer.New(
		*bfAPIKey,
		*bfAPISecret,
		&http.Client{
			Transport: &http.Transport{
				DialContext: (&net.Dialer{
					Timeout:   3 * time.Second,
					KeepAlive: 3 * time.Second,
				}).DialContext,
				MaxIdleConns:        32,
				MaxIdleConnsPerHost: 16,
				IdleConnTimeout:     10 * time.Minute,
			},
		},
	)

	ctx, cancel := context.WithCancel(context.Background())
	ws, err := realtime.New(ctx)
	if err != nil {
		panic(err)
	}

	defer ws.Close()

	recv := make(chan realtime.Response, 3)
	go ws.Connect(
		client.Auth,
		[]string{
			"executions",
		},
		[]string{
			types.FXBTCJPY,
		},
		recv)

	cache := ttlcache.New[int64, float64](
		ttlcache.WithTTL[int64, float64](HistoryCacheTtl),
	)

	// 10 min TTL で削除されたレコードから10分前の取引価格を取得し、これをベースに注文を行う
	cache.OnEviction(func(ctx context.Context, reason ttlcache.EvictionReason, item *ttlcache.Item[int64, float64]) {
		if reason == ttlcache.EvictionReasonExpired {
			mu.Lock()
			if inProgress {
				mu.Unlock()
				return
			}
			inProgress = true
			mu.Unlock()

			buyPrice := float64(int(item.Value() * 0.8))
			sellPrice := float64(int(item.Value() * 1.2))

			_, limit, buyErr := client.ChildOrder(&private.ChildOrder{
				ProductCode:    types.FXBTCJPY,
				ChildOrderType: types.LIMIT,
				Side:           types.BUY,
				Price:          buyPrice, // API の仕様 整数に変換する必要がある
				Size:           size,
				MinuteToExpire: 1, // 1分後にキャンセル
			})
			if buyErr != nil {
				mu.Lock()
				inProgress = false
				mu.Unlock()

				log.Printf("faild to create buy order: %v limit: %v", buyErr, limit)
				return
			}
			log.Printf("buy order was created: price: %0.f, volume: %f\n", buyPrice, size)

			_, limit, sellErr := client.ChildOrder(&private.ChildOrder{
				ProductCode:    types.FXBTCJPY,
				ChildOrderType: types.LIMIT,
				Side:           types.SELL,
				Price:          sellPrice, // API の仕様 整数に変換する必要がある
				Size:           size,
				MinuteToExpire: 1, // 1分後にキャンセル
			})
			if sellErr != nil {
				mu.Lock()
				inProgress = false
				mu.Unlock()

				log.Printf("faild to create sell order: %v limit: %v", sellErr, limit)
				return
			}
			log.Printf("sell order was created: price: %0.f, volume: %f\n", sellPrice, size)

			time.Sleep(10 * time.Second)

			_, limit, err := client.Cancel(&private.Cancel{
				ProductCode: types.FXBTCJPY,
			})
			if err != nil {
				log.Println(err, fmt.Sprintf("%+v\n", limit))
				return
			}

			mu.Lock()
			inProgress = false
			mu.Unlock()
		}
	})

	go Receiver(ctx, recv, cache)

	<-sig
	cancel()
	close(recv)

}

func Receiver(ctx context.Context, ch chan realtime.Response, cache *ttlcache.Cache[int64, float64]) {
	defer fmt.Println("func receiver was done")
Loop:
	for {
		select {
		case <-ctx.Done():
			break Loop
		case v := <-ch:
			switch v.Types {
			case realtime.ExecutionsN:
				// 明示的に呼ばないと OnEviction が発火しない
				go cache.DeleteExpired()
				cache.Set(time.Now().UnixNano(), v.Executions[0].Price, ttlcache.DefaultTTL)
			}
		}
	}
}

このプログラムにより生じた問題に私は一切の責任を負いません。

よろしければツイッターのフォローをしていただけると助かります🙇‍♂
@rarirureluis

何か不具合などあったら上記ツイッターよりご連絡ください。

いいなと思ったら応援しよう!