pe氏pybottersinagoflyer読み解き殴り書きメモ3

続き

AbstractTimeBarにもどる

def __init__(
        self,
        store: "pybotters.store.DataStore",
        unit_seconds: int,
        maxlen: int = 9999,
        callbacks: list[Callable[[AbstractTimeBar], None]] = (),
    ):
        self._store = store
        self._seconds = unit_seconds
        self._rule = f"{unit_seconds}S"
        self._bar = StreamArray((maxlen, 7))
        self._cur_bar = np.zeros(7)
        self._timestamp = deque(maxlen=maxlen)

        # callback
        self._callbacks = callbacks or []
        self._d = {}

        # 確定足取得用
        self._queue = asyncio.Queue(1)

        self._task = asyncio.create_task(self.auto_update())

self._barまで確認した
np.zerosは0が入った配列を作成
dequeはどちらからもappendとpopができる。つまり頭から値を入れたり後ろから入れたりできる。maxlenはオブジェクトの最大長を指定できる。

確定足取得用のプログラムは以下のサイトを参考にした

複数の async メソッド同士でやり取りするのに非常に便利なクラス。
非同期な処理の完了を一方が無限ループで queue を get し続け、もう一方が完了し次第 put していく作りはよくやります。
queueで保持可能なアイテム数を指定する場合(x)

とりあえず非同期の際のデータを保持したりなんなりするらしい

async メソッドの処理を別スレッド立てるみたいに実行します。完了をawaitするのでなく、並行して実行します。

self._auto_update()をタスク化してawaitが発生するたびに実行するようにしてる

    async def init(self, executions: list[dict]):
        [self.update(e) for e in executions]
def update(self, e: pybotters.store.StoreChange):
        """要オーバーライド"""
        raise NotImplementedError

executionというリストをもらって、ひとつづつオーバーライドしてる

    def new_bar(
        self,
        price: Union[int, float],
        volume: float,
        side: str,
        timestamp: pd.Timestamp = None,
    ):
        """最新足の確定"""
        check_side(side)

        if side == "BUY":
            buy_volume, sell_volume = volume, 0
        else:
            buy_volume, sell_volume = 0, volume

        self._bar.append(self._cur_bar)
        self._cur_bar = np.array(
            [price, price, price, price, volume, buy_volume, sell_volume]
        )
        if timestamp is None:
            timestamp = pd.Timestamp.utcnow().floor(self._rule)
        self._timestamp.append(timestamp)

        # 確定した最新barを格納
        try:
            self._queue.put_nowait(self._bar)
        except asyncio.QueueFull:
            self._queue.get_nowait()
            self._queue.put_nowait(self._bar)
def check_side(side):
    assert side in ["BUY", "SELL"]

まずassertでsideに問題がないか確認
sideの上限分岐で、引数のvolumeをbuy_volumeとsell_volumeに分けてる
_barに_cur_barをqueueして、新しい引数をもとに_cur_barを再作成
timestampがNoneの場合、utcnow()でUTCを取得し、floorで解像度を変換し追加してる
_queueにput_nowaitで追加。put_nowaitはputと同じ動きをする
asyncio.QueueFullはいっぱいの場合の処理。一度空にしてからいれる

def update_cur_bar(self, price: Union[int, float], volume: float, side: str):
        """未確定足の更新"""
        # high
        self._cur_bar[1] = max(price, self._cur_bar[1])
        # low
        self._cur_bar[2] = min(price, self._cur_bar[2])
        # close
        self._cur_bar[3] = price
        # volume
        self._cur_bar[4] += volume
        if side == "BUY":
            self._cur_bar[5] += volume
        elif side == "SELL":
            self._cur_bar[6] += volume
        else:
            raise RuntimeError(f"Unsupported: {side}")

未確定のデータが入ってくるたびに_cur_barにデータを突っ込んでいく
high,lowは前に入っているものと比較をしながら、クローズはそのまま、volumeは足し合わせる

async def auto_update(self):
        """約定情報の受信タスク"""
        async for e in self.execution_stream():
            self.update(e)

            # 約定情報を受信するたびにcallback
            for cb in self._callbacks:
                self.d.update(cb(self))

非同期にて約定情報を受信してる。
受信するたびにupdateをオーバーライドして、コールバックする

async def execution_stream(self):
        """pybottersのwatchを使って約定情報を随時generateする。"""
        with self._store.watch() as stream:
            async for msg in stream:
                if msg.operation == "insert":
                    yield msg.data

最近?pybottersに追加されたwatch機能。waitではうまくできなかった動きを改善したらしい(すみませんよくわかってません)
でもとりあえずは、storeを監視して約定情報が入ってきてるかどうか、はいってきたら処理をしてデータを返すっぽい

async def get_bar_at_settled(self) -> np.ndarray:
        """``await bar.get_bar_at_settled()``で最新足確定時に取得できる。"""
        return await self._queue.get()

ここは説明のままっぽいので省略

    @property
    def store(self):
        return self._store

    @property
    def bar(self):
        return self._bar

    @property
    def timestamp(self):
        return self._timestamp

    @property
    def cur_bar(self):
        return self._cur_bar

    @property
    def open(self):
        return self._bar[:, 0]

    @property
    def o(self):
        return self._bar[-1, 0]

    @property
    def high(self):
        return self._bar[:, 1]

    @property
    def h(self):
        return self._bar[-1, 1]

    @property
    def low(self):
        return self._bar[:, 2]

    @property
    def l(self):
        return self._bar[-1, 2]

    @property
    def close(self):
        return self._bar[:, 3]

    @property
    def c(self):
        return self._bar[-1, 3]

    @property
    def volume(self):
        return self._bar[:, 4]

    @property
    def v(self):
        return self._bar[-1, 4]

    @property
    def buy_volume(self):
        return self._bar[:, 5]

    @property
    def bv(self):
        return self._bar[-1, 5]

    @property
    def sell_volume(self):
        return self._bar[:, 6]

    @property
    def sv(self):
        return self._bar[-1, 6]

    @property
    def d(self):
        return self._d

@propertyについて調べる

値をしっかり管理したいけど、インスタンス変数のように自然に値にアクセスできるようにもしたい。
この両者のいいとこ取りをしたのが、「プロパティ」。

https://qiita.com/cardene/items/8a59d576d360b7568c3a

まあ、とりあえずは値を管理するものって覚えておけばいいのかな?
あとは関数の名前で_barをスライスして扱いやすくしてるっぽい
スライスは[:,0]みたなやつで、この場合だと全ての行の0列目を取得することになる
[-1, 0]は一番後ろの0列目なので、最新データってことなのかな??読み進めていったらわかることに期待

ここまでAbstractTimeBar
まとめとしては、storeをpybottersのwatchで監視して、約定情報があったらnumpyでいろいろいじって、ohlcvを配列で作成してるclassっぽい

main関数に戻る

def log_volume(bar: AbstractTimeBar):
            """最新足のbuy/sell volumeのログを計算するcallback"""
            d = dict()
            d["bv_log"] = np.log1p(bar.bv)
            d["sv_log"] = np.log1p(bar.sv)
            return d

ここの引数barとはなんぞやっていうのを今まで調べてた
np.log1pとはなにか調べる

底をeとするa+1の対数

https://www.sejuku.net/blog/70027

つまり、引数で与えられたbarの最新volumeを対数にしてdict型で保存する関数らしい。(多分)

次にbar_lを見ていく

bar_l = BitflyerTimeBar(
            unit_seconds=args.bar_unit_seconds_long,
            store=store.executions,
            maxlen=args.bar_maxlen,
            callbacks=[log_volume],
        )

引数はargsで指定したもの、storeの中のもの、さっきのlog_volumeって感じ
BitflyerTimeBarをみていく

class BitflyerTimeBar(AbstractTimeBar):
    """bitFlyer用のTimeBar実装"""

    def __init__(
        self,
        store: pybotters.models.bitflyer.Executions,
        unit_seconds: int,
        maxlen: int = 9999,
        callbacks: list[Callable[[BitflyerTimeBar], None]] = (),
    ):
        super(BitflyerTimeBar, self).__init__(store, unit_seconds, maxlen, callbacks)

    def update(self, e: dict):
        cur_ts = pd.to_datetime(e["exec_date"]).floor(self._rule)

        if len(self._timestamp) == 0:
            last_ts = None
        else:
            last_ts = self._timestamp[-1].floor(f"{self._rule}")

        if last_ts is None or last_ts != cur_ts:
            self.new_bar(e["price"], e["size"], e["side"], cur_ts)
        else:
            self.update_cur_bar(e["price"], e["size"], e["side"])

AbstractTimeBarの子クラス。初期処理は省略。
updateはオーバーライドされていて、bitflyerのwebsocketで返ってくる約定データを加工してbarを作成してるっぽい
bar_sに関しては多分ほどんど同じだから省略

ここまでは様々な方法を用いてnumpyで扱えるローソク足を作成しているとみた。
基本的にnumpyの配列とかあまり使ったことなかったから勉強になるなぁ
まだ慣れないけど、使えるようになったら計算も早く行えるしdfより便利そう

本業にもどるので今日はここまで。

いいなと思ったら応援しよう!