pe氏pybottersinagoflyer読み解き殴り書きメモ3
続き
AbstractTimeBarにもどる
def __init__(
self,
store: "pybotters.store.DataStore",
unit_seconds: int,
maxlen: int = 9999,
callbacks: list[Callable[[AbstractTimeBar], None]] = (),
):
self._store = store
self._seconds = unit_seconds
self._rule = f"{unit_seconds}S"
self._bar = StreamArray((maxlen, 7))
self._cur_bar = np.zeros(7)
self._timestamp = deque(maxlen=maxlen)
# callback
self._callbacks = callbacks or []
self._d = {}
# 確定足取得用
self._queue = asyncio.Queue(1)
self._task = asyncio.create_task(self.auto_update())
self._barまで確認した
np.zerosは0が入った配列を作成
dequeはどちらからもappendとpopができる。つまり頭から値を入れたり後ろから入れたりできる。maxlenはオブジェクトの最大長を指定できる。
確定足取得用のプログラムは以下のサイトを参考にした
とりあえず非同期の際のデータを保持したりなんなりするらしい
self._auto_update()をタスク化してawaitが発生するたびに実行するようにしてる
async def init(self, executions: list[dict]):
[self.update(e) for e in executions]
def update(self, e: pybotters.store.StoreChange):
"""要オーバーライド"""
raise NotImplementedError
executionというリストをもらって、ひとつづつオーバーライドしてる
def new_bar(
self,
price: Union[int, float],
volume: float,
side: str,
timestamp: pd.Timestamp = None,
):
"""最新足の確定"""
check_side(side)
if side == "BUY":
buy_volume, sell_volume = volume, 0
else:
buy_volume, sell_volume = 0, volume
self._bar.append(self._cur_bar)
self._cur_bar = np.array(
[price, price, price, price, volume, buy_volume, sell_volume]
)
if timestamp is None:
timestamp = pd.Timestamp.utcnow().floor(self._rule)
self._timestamp.append(timestamp)
# 確定した最新barを格納
try:
self._queue.put_nowait(self._bar)
except asyncio.QueueFull:
self._queue.get_nowait()
self._queue.put_nowait(self._bar)
def check_side(side):
assert side in ["BUY", "SELL"]
まずassertでsideに問題がないか確認
sideの上限分岐で、引数のvolumeをbuy_volumeとsell_volumeに分けてる
_barに_cur_barをqueueして、新しい引数をもとに_cur_barを再作成
timestampがNoneの場合、utcnow()でUTCを取得し、floorで解像度を変換し追加してる
_queueにput_nowaitで追加。put_nowaitはputと同じ動きをする
asyncio.QueueFullはいっぱいの場合の処理。一度空にしてからいれる
def update_cur_bar(self, price: Union[int, float], volume: float, side: str):
"""未確定足の更新"""
# high
self._cur_bar[1] = max(price, self._cur_bar[1])
# low
self._cur_bar[2] = min(price, self._cur_bar[2])
# close
self._cur_bar[3] = price
# volume
self._cur_bar[4] += volume
if side == "BUY":
self._cur_bar[5] += volume
elif side == "SELL":
self._cur_bar[6] += volume
else:
raise RuntimeError(f"Unsupported: {side}")
未確定のデータが入ってくるたびに_cur_barにデータを突っ込んでいく
high,lowは前に入っているものと比較をしながら、クローズはそのまま、volumeは足し合わせる
async def auto_update(self):
"""約定情報の受信タスク"""
async for e in self.execution_stream():
self.update(e)
# 約定情報を受信するたびにcallback
for cb in self._callbacks:
self.d.update(cb(self))
非同期にて約定情報を受信してる。
受信するたびにupdateをオーバーライドして、コールバックする
async def execution_stream(self):
"""pybottersのwatchを使って約定情報を随時generateする。"""
with self._store.watch() as stream:
async for msg in stream:
if msg.operation == "insert":
yield msg.data
最近?pybottersに追加されたwatch機能。waitではうまくできなかった動きを改善したらしい(すみませんよくわかってません)
でもとりあえずは、storeを監視して約定情報が入ってきてるかどうか、はいってきたら処理をしてデータを返すっぽい
async def get_bar_at_settled(self) -> np.ndarray:
"""``await bar.get_bar_at_settled()``で最新足確定時に取得できる。"""
return await self._queue.get()
ここは説明のままっぽいので省略
@property
def store(self):
return self._store
@property
def bar(self):
return self._bar
@property
def timestamp(self):
return self._timestamp
@property
def cur_bar(self):
return self._cur_bar
@property
def open(self):
return self._bar[:, 0]
@property
def o(self):
return self._bar[-1, 0]
@property
def high(self):
return self._bar[:, 1]
@property
def h(self):
return self._bar[-1, 1]
@property
def low(self):
return self._bar[:, 2]
@property
def l(self):
return self._bar[-1, 2]
@property
def close(self):
return self._bar[:, 3]
@property
def c(self):
return self._bar[-1, 3]
@property
def volume(self):
return self._bar[:, 4]
@property
def v(self):
return self._bar[-1, 4]
@property
def buy_volume(self):
return self._bar[:, 5]
@property
def bv(self):
return self._bar[-1, 5]
@property
def sell_volume(self):
return self._bar[:, 6]
@property
def sv(self):
return self._bar[-1, 6]
@property
def d(self):
return self._d
@propertyについて調べる
まあ、とりあえずは値を管理するものって覚えておけばいいのかな?
あとは関数の名前で_barをスライスして扱いやすくしてるっぽい
スライスは[:,0]みたなやつで、この場合だと全ての行の0列目を取得することになる
[-1, 0]は一番後ろの0列目なので、最新データってことなのかな??読み進めていったらわかることに期待
ここまでAbstractTimeBar
まとめとしては、storeをpybottersのwatchで監視して、約定情報があったらnumpyでいろいろいじって、ohlcvを配列で作成してるclassっぽい
main関数に戻る
def log_volume(bar: AbstractTimeBar):
"""最新足のbuy/sell volumeのログを計算するcallback"""
d = dict()
d["bv_log"] = np.log1p(bar.bv)
d["sv_log"] = np.log1p(bar.sv)
return d
ここの引数barとはなんぞやっていうのを今まで調べてた
np.log1pとはなにか調べる
つまり、引数で与えられたbarの最新volumeを対数にしてdict型で保存する関数らしい。(多分)
次にbar_lを見ていく
bar_l = BitflyerTimeBar(
unit_seconds=args.bar_unit_seconds_long,
store=store.executions,
maxlen=args.bar_maxlen,
callbacks=[log_volume],
)
引数はargsで指定したもの、storeの中のもの、さっきのlog_volumeって感じ
BitflyerTimeBarをみていく
class BitflyerTimeBar(AbstractTimeBar):
"""bitFlyer用のTimeBar実装"""
def __init__(
self,
store: pybotters.models.bitflyer.Executions,
unit_seconds: int,
maxlen: int = 9999,
callbacks: list[Callable[[BitflyerTimeBar], None]] = (),
):
super(BitflyerTimeBar, self).__init__(store, unit_seconds, maxlen, callbacks)
def update(self, e: dict):
cur_ts = pd.to_datetime(e["exec_date"]).floor(self._rule)
if len(self._timestamp) == 0:
last_ts = None
else:
last_ts = self._timestamp[-1].floor(f"{self._rule}")
if last_ts is None or last_ts != cur_ts:
self.new_bar(e["price"], e["size"], e["side"], cur_ts)
else:
self.update_cur_bar(e["price"], e["size"], e["side"])
AbstractTimeBarの子クラス。初期処理は省略。
updateはオーバーライドされていて、bitflyerのwebsocketで返ってくる約定データを加工してbarを作成してるっぽい
bar_sに関しては多分ほどんど同じだから省略
ここまでは様々な方法を用いてnumpyで扱えるローソク足を作成しているとみた。
基本的にnumpyの配列とかあまり使ったことなかったから勉強になるなぁ
まだ慣れないけど、使えるようになったら計算も早く行えるしdfより便利そう
本業にもどるので今日はここまで。